You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC

svn commit: r1504961 [9/11] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/destination/ s...

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java Fri Jul 19 18:44:21 2013
@@ -14,78 +14,91 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
 
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 import javax.jms.StreamMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
 
 /**
- * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive types in the Java programming language.
- * It is filled and read sequentially. It inherits from the <CODE>Message</CODE> interface and adds a stream message
- * body. Its methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
- * <CODE>java.io.DataOutputStream</CODE>. <p/>
- * <P>
- * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
- * generically as objects. For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
+ * types in the Java programming language. It is filled and read sequentially.
+ * It inherits from the <CODE>Message</CODE> interface and adds a stream message
+ * body. Its methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>.
+ * <p/>
+ * <p/>
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
  * <CODE>StreamMessage.writeObject(new
  * Integer(6))</CODE>. Both forms are provided, because the explicit form is
- * convenient for static programming, and the object form is needed when types are not known at compile time. <p/>
- * <P>
- * When the message is first created, and when <CODE>clearBody</CODE> is called, the body of the message is in
- * write-only mode. After the first call to <CODE>reset</CODE> has been made, the message body is in read-only mode.
- * After a message has been sent, the client that sent it can retain and modify it without affecting the message that
- * has been sent. The same message object can be sent multiple times. When a message has been received, the provider has
- * called <CODE>reset</CODE> so that the message body is in read-only mode for the client. <p/>
- * <P>
- * If <CODE>clearBody</CODE> is called on a message in read-only mode, the message body is cleared and the message
- * body is in write-only mode. <p/>
- * <P>
- * If a client attempts to read a message in write-only mode, a <CODE>MessageNotReadableException</CODE> is thrown.
+ * convenient for static programming, and the object form is needed when types
+ * are not known at compile time.
+ * <p/>
+ * <p/>
+ * When the message is first created, and when <CODE>clearBody</CODE> is called,
+ * the body of the message is in write-only mode. After the first call to
+ * <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client.
+ * <p/>
+ * <p/>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message body is in write-only mode.
+ * <p/>
+ * <p/>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown.
+ * <p/>
+ * <p/>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown.
+ * <p/>
+ * <p/>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table.
+ * The marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions
+ * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
+ * method does not accept it as a valid <CODE>String</CODE> representation of
+ * the primitive.
+ * <p/>
+ * <p/>
+ * A value written as the row type can be read as the column type.
  * <p/>
- * <P>
- * If a client attempts to write a message in read-only mode, a <CODE>MessageNotWriteableException</CODE> is thrown.
  * <p/>
- * <P>
- * <CODE>StreamMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
- * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions may
- * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
- * <CODE>String</CODE> representation of the primitive. <p/>
- * <P>
- * A value written as the row type can be read as the column type. <p/>
- * 
  * <PRE>
- *  | | boolean byte short char int long float double String byte[]
+ * | | boolean byte short char int long float double String byte[]
  * |----------------------------------------------------------------------
  * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
  * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
  * X |----------------------------------------------------------------------
- * 
+ * <p/>
  * </PRE>
- * 
  * <p/>
- * <P>
- * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
- * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
- * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
- * <code>NullPointerException</code>.
- * 
- * 
+ * <p/>
+ * <p/>
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code> conversion
+ * method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a
+ * <code>char</code> must throw a <code>NullPointerException</code>.
+ *
  * @see javax.jms.Session#createStreamMessage()
  * @see javax.jms.BytesMessage
  * @see javax.jms.MapMessage
@@ -96,14 +109,18 @@ import javax.jms.StreamMessage;
 public class BlazeJmsStreamMessage extends BlazeJmsMessage implements StreamMessage {
     protected transient DataOutputStream dataOut;
     protected transient ByteArrayOutputStream bytesOut;
-    protected transient DataInputStream dataIn;
+    protected transient BufferInputStream dataIn;
     protected transient int remainingBytes = -1;
 
+    public int getPacketType() {
+        return PacketType.JMS_STREAM.getNumber();
+    }
+
     public BlazeJmsStreamMessage clone() {
         BlazeJmsStreamMessage copy = new BlazeJmsStreamMessage();
         try {
             copy(copy);
-        } catch (BlazeException e) {
+        } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
         return copy;
@@ -113,30 +130,28 @@ public class BlazeJmsStreamMessage exten
      * @param copy
      * @throws BlazeException
      */
-    protected void copy(BlazeJmsStreamMessage copy) throws BlazeException {
+    protected void copy(BlazeJmsStreamMessage copy) throws IOException {
         storeContent();
         super.copy(copy);
         copy.dataOut = null;
         copy.bytesOut = null;
         copy.dataIn = null;
     }
-    
-    /** 
+
+    /**
      * @return the type
      * @see org.apache.activeblaze.BlazeMessage#getType()
      */
-    public int getType(){
+    public int getType() {
         return JmsMessageType.STREAM.ordinal();
     }
 
-    public void storeContent() {
+    protected void storeContent() throws IOException {
         super.storeContent();
-        if (this.dataOut != null && getContent() != null) {
+        if (this.dataOut != null) {
             try {
                 this.dataOut.close();
-                Buffer buffer = new Buffer(bytesOut.toByteArray());
-                BlazeDataBean data = (BlazeDataBean) getContent();
-                data.setPayload(buffer);
+                this.payload = new Buffer(bytesOut.toByteArray());
                 this.bytesOut = null;
                 this.dataOut = null;
             } catch (IOException ioe) {
@@ -145,33 +160,24 @@ public class BlazeJmsStreamMessage exten
         }
     }
 
-    protected void loadContent() {
+    protected void loadContent() throws IOException {
         super.loadContent();
         if (this.dataIn == null) {
-            BlazeData bd = getContent();
-            Buffer data = null;
-            if (bd == null || bd.getPayload() == null) {
-                data = bd.getPayload();
-                if (data == null) {
-                    data = new Buffer(new byte[0]);
-                }
-            }
-            if (data == null) {
-                data = bd.getPayload();
-            }
-            BufferInputStream is = new BufferInputStream(data);
-            this.dataIn = new DataInputStream(is);
+            this.dataIn = new BufferInputStream(this.payload);
         }
     }
 
     /**
-     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
-     * <P>
-     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
-     * body in a newly created message.
-     * 
-     * @throws JMSException
-     *             if the JMS provider fails to clear the message body due to some internal error.
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries.
+     * <p/>
+     * <p/>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     *
+     * @throws JMSException if the JMS provider fails to clear the message body due to
+     *                      some internal error.
      */
     public void clearBody() throws JMSException {
         super.clearBody();
@@ -183,16 +189,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a <code>boolean</code> from the stream message.
-     * 
+     *
      * @return the <code>boolean</code> value read
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public boolean readBoolean() throws JMSException {
         initializeReading();
@@ -223,16 +226,14 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a <code>byte</code> value from the stream message.
-     * 
-     * @return the next byte from the stream message as a 8-bit <code>byte</code>
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     *
+     * @return the next byte from the stream message as an 8-bit
+     *         <code>byte</code>
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public byte readByte() throws JMSException {
         initializeReading();
@@ -256,11 +257,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a byte type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -271,16 +268,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a 16-bit integer from the stream message.
-     * 
+     *
      * @return a 16-bit integer from the stream message
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public short readShort() throws JMSException {
         initializeReading();
@@ -307,11 +301,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a short type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -322,16 +312,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a Unicode character value from the stream message.
-     * 
+     *
      * @return a Unicode character from the stream message
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public char readChar() throws JMSException {
         initializeReading();
@@ -352,31 +339,21 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a char type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
-        } catch (EOFException e) {
-            throw BlazeJmsExceptionSupport.createMessageEOFException(e);
-        } catch (IOException e) {
-            throw BlazeJmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
     /**
      * Reads a 32-bit integer from the stream message.
-     * 
-     * @return a 32-bit integer value from the stream message, interpreted as an <code>int</code>
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     *
+     * @return a 32-bit integer value from the stream message, interpreted as an
+     *         <code>int</code>
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public int readInt() throws JMSException {
         initializeReading();
@@ -406,11 +383,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not an int type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -421,16 +394,14 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a 64-bit integer from the stream message.
-     * 
-     * @return a 64-bit integer value from the stream message, interpreted as a <code>long</code>
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     *
+     * @return a 64-bit integer value from the stream message, interpreted as a
+     *         <code>long</code>
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public long readLong() throws JMSException {
         initializeReading();
@@ -463,11 +434,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a long type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -478,16 +445,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a <code>float</code> from the stream message.
-     * 
+     *
      * @return a <code>float</code> value from the stream message
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public float readFloat() throws JMSException {
         initializeReading();
@@ -511,11 +475,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a float type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -526,16 +486,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a <code>double</code> from the stream message.
-     * 
+     *
      * @return a <code>double</code> value from the stream message
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public double readDouble() throws JMSException {
         initializeReading();
@@ -562,11 +519,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a double type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -577,16 +530,13 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Reads a <CODE>String</CODE> from the stream message.
-     * 
+     *
      * @return a Unicode string from the stream message
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      */
     public String readString() throws JMSException {
         initializeReading();
@@ -633,11 +583,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException(" not a String type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -647,104 +593,104 @@ public class BlazeJmsStreamMessage exten
     }
 
     /**
-     * Reads a byte array field from the stream message into the specified <CODE>byte[]</CODE> object (the read
-     * buffer). <p/>
-     * <P>
-     * To read the field value, <CODE>readBytes</CODE> should be successively called until it returns a value less
-     * than the length of the read buffer. The value of the bytes in the buffer following the last byte read is
-     * undefined. <p/>
-     * <P>
-     * If <CODE>readBytes</CODE> returns a value equal to the length of the buffer, a subsequent
-     * <CODE>readBytes</CODE> call must be made. If there are no more bytes to be read, this call returns -1. <p/>
-     * <P>
-     * If the byte array field value is null, <CODE>readBytes</CODE> returns -1. <p/>
-     * <P>
-     * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0. <p/>
-     * <P>
-     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field value has been made, the full value
-     * of the field must be read before it is valid to read the next field. An attempt to read the next field before
-     * that has been done will throw a <CODE>MessageFormatException</CODE>. <p/>
-     * <P>
-     * To read the byte field value into a new <CODE>byte[]</CODE> object, use the <CODE>readObject</CODE> method.
-     * 
-     * @param value
-     *            the buffer into which the data is read
-     * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of the
-     *         byte field has been reached
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * Reads a byte array field from the stream message into the specified
+     * <CODE>byte[]</CODE> object (the read buffer).
+     * <p/>
+     * <p/>
+     * To read the field value, <CODE>readBytes</CODE> should be successively
+     * called until it returns a value less than the length of the read buffer.
+     * The value of the bytes in the buffer following the last byte read is
+     * undefined.
+     * <p/>
+     * <p/>
+     * If <CODE>readBytes</CODE> returns a value equal to the length of the
+     * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
+     * are no more bytes to be read, this call returns -1.
+     * <p/>
+     * <p/>
+     * If the byte array field value is null, <CODE>readBytes</CODE> returns -1.
+     * <p/>
+     * <p/>
+     * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0.
+     * <p/>
+     * <p/>
+     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field
+     * value has been made, the full value of the field must be read before it
+     * is valid to read the next field. An attempt to read the next field before
+     * that has been done will throw a <CODE>MessageFormatException</CODE>.
+     * <p/>
+     * <p/>
+     * To read the byte field value into a new <CODE>byte[]</CODE> object, use
+     * the <CODE>readObject</CODE> method.
+     *
+     * @param value the buffer into which the data is read
+     * @return the total number of bytes read into the buffer, or -1 if there is
+     *         no more data because the end of the byte field has been reached
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      * @see #readObject()
      */
     public int readBytes(byte[] value) throws JMSException {
         initializeReading();
-        try {
-            if (value == null) {
-                throw new NullPointerException();
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        if (remainingBytes == -1) {
+            this.dataIn.mark(value.length + 1);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new MessageEOFException("reached end of data");
             }
-            if (remainingBytes == -1) {
-                this.dataIn.mark(value.length + 1);
-                int type = this.dataIn.read();
-                if (type == -1) {
-                    throw new MessageEOFException("reached end of data");
-                }
-                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
-                    throw new MessageFormatException("Not a byte array");
-                }
-                remainingBytes = this.dataIn.readInt();
-            } else if (remainingBytes == 0) {
-                remainingBytes = -1;
-                return -1;
-            }
-            if (value.length <= remainingBytes) {
-                // small buffer
-                remainingBytes -= value.length;
-                this.dataIn.readFully(value);
-                return value.length;
-            } else {
-                // big buffer
-                int rc = this.dataIn.read(value, 0, remainingBytes);
-                remainingBytes = 0;
-                return rc;
+            if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+                throw new MessageFormatException("Not a byte array");
             }
-        } catch (EOFException e) {
-            JMSException jmsEx = new MessageEOFException(e.getMessage());
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        } catch (IOException e) {
-            JMSException jmsEx = new MessageFormatException(e.getMessage());
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
+            remainingBytes = this.dataIn.readInt();
+        } else if (remainingBytes == 0) {
+            remainingBytes = -1;
+            return -1;
+        }
+        if (value.length <= remainingBytes) {
+            // small buffer
+            remainingBytes -= value.length;
+            this.dataIn.readFully(value);
+            return value.length;
+        } else {
+            // big buffer
+            int rc = this.dataIn.read(value, 0, remainingBytes);
+            remainingBytes = 0;
+            return rc;
         }
     }
 
     /**
-     * Reads an object from the stream message. <p/>
-     * <P>
-     * This method can be used to return, in objectified format, an object in the Java programming language ("Java
-     * object") that has been written to the stream with the equivalent <CODE>writeObject</CODE> method call, or its
-     * equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
-     * <P>
-     * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>. <p/>
-     * <P>
-     * An attempt to call <CODE>readObject</CODE> to read a byte field value into a new <CODE>byte[]</CODE> object
-     * before the full value of the byte field has been read will throw a <CODE>MessageFormatException</CODE>.
-     * 
-     * @return a Java object from the stream message, in objectified format (for example, if the object was written as
-     *         an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned)
-     * @throws JMSException
-     *             if the JMS provider fails to read the message due to some internal error.
-     * @throws MessageEOFException
-     *             if unexpected end of message stream has been reached.
-     * @throws MessageFormatException
-     *             if this type conversion is invalid.
-     * @throws MessageNotReadableException
-     *             if the message is in write-only mode.
+     * Reads an object from the stream message.
+     * <p/>
+     * <p/>
+     * This method can be used to return, in objectified format, an object in
+     * the Java programming language ("Java object") that has been written to
+     * the stream with the equivalent <CODE>writeObject</CODE> method call, or
+     * its equivalent primitive <CODE>write<I>type</I></CODE> method.
+     * <p/>
+     * <p/>
+     * Note that byte values are returned as <CODE>byte[]</CODE>, not
+     * <CODE>Byte[]</CODE>.
+     * <p/>
+     * <p/>
+     * An attempt to call <CODE>readObject</CODE> to read a byte field value
+     * into a new <CODE>byte[]</CODE> object before the full value of the byte
+     * field has been read will throw a <CODE>MessageFormatException</CODE>.
+     *
+     * @return a Java object from the stream message, in objectified format (for
+     *         example, if the object was written as an <CODE>int</CODE>, an
+     *         <CODE>Integer</CODE> is returned)
+     * @throws JMSException                if the JMS provider fails to read the message due to some
+     *                                     internal error.
+     * @throws MessageEOFException         if unexpected end of message stream has been reached.
+     * @throws MessageFormatException      if this type conversion is invalid.
+     * @throws MessageNotReadableException if the message is in write-only mode.
      * @see #readBytes(byte[] value)
      */
     public Object readObject() throws JMSException {
@@ -798,11 +744,7 @@ public class BlazeJmsStreamMessage exten
                 throw new MessageFormatException("unknown type");
             }
         } catch (NumberFormatException mfe) {
-            try {
-                this.dataIn.reset();
-            } catch (IOException ioe) {
-                throw BlazeJmsExceptionSupport.create(ioe);
-            }
+            this.dataIn.reset();
             throw mfe;
         } catch (EOFException e) {
             JMSException jmsEx = new MessageEOFException(e.getMessage());
@@ -816,15 +758,14 @@ public class BlazeJmsStreamMessage exten
     }
 
     /**
-     * Writes a <code>boolean</code> to the stream message. The value <code>true</code> is written as the value
-     * <code>(byte)1</code>; the value <code>false</code> is written as the value <code>(byte)0</code>.
-     * 
-     * @param value
-     *            the <code>boolean</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     * Writes a <code>boolean</code> to the stream message. The value
+     * <code>true</code> is written as the value <code>(byte)1</code>; the value
+     * <code>false</code> is written as the value <code>(byte)0</code>.
+     *
+     * @param value the <code>boolean</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeBoolean(boolean value) throws JMSException {
         initializeWriting();
@@ -837,13 +778,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>byte</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>byte</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>byte</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeByte(byte value) throws JMSException {
         initializeWriting();
@@ -856,13 +795,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>short</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>short</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>short</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeShort(short value) throws JMSException {
         initializeWriting();
@@ -875,13 +812,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>char</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>char</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>char</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeChar(char value) throws JMSException {
         initializeWriting();
@@ -894,13 +829,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes an <code>int</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>int</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>int</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeInt(int value) throws JMSException {
         initializeWriting();
@@ -913,13 +846,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>long</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>long</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>long</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeLong(long value) throws JMSException {
         initializeWriting();
@@ -932,13 +863,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>float</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>float</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>float</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeFloat(float value) throws JMSException {
         initializeWriting();
@@ -951,13 +880,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>double</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>double</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>double</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeDouble(double value) throws JMSException {
         initializeWriting();
@@ -970,13 +897,11 @@ public class BlazeJmsStreamMessage exten
 
     /**
      * Writes a <code>String</code> to the stream message.
-     * 
-     * @param value
-     *            the <code>String</code> value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     *
+     * @param value the <code>String</code> value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeString(String value) throws JMSException {
         initializeWriting();
@@ -992,38 +917,37 @@ public class BlazeJmsStreamMessage exten
     }
 
     /**
-     * Writes a byte array field to the stream message. <p/>
-     * <P>
-     * The byte array <code>value</code> is written to the message as a byte array field. Consecutively written byte
-     * array fields are treated as two distinct fields when the fields are read.
-     * 
-     * @param value
-     *            the byte array value to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     * Writes a byte array field to the stream message.
+     * <p/>
+     * <p/>
+     * The byte array <code>value</code> is written to the message as a byte
+     * array field. Consecutively written byte array fields are treated as two
+     * distinct fields when the fields are read.
+     *
+     * @param value the byte array value to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeBytes(byte[] value) throws JMSException {
         writeBytes(value, 0, value.length);
     }
 
     /**
-     * Writes a portion of a byte array as a byte array field to the stream message. <p/>
-     * <P>
-     * The a portion of the byte array <code>value</code> is written to the message as a byte array field.
-     * Consecutively written byte array fields are treated as two distinct fields when the fields are read.
-     * 
-     * @param value
-     *            the byte array value to be written
-     * @param offset
-     *            the initial offset within the byte array
-     * @param length
-     *            the number of bytes to use
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     * Writes a portion of a byte array as a byte array field to the stream
+     * message.
+     * <p/>
+     * <p/>
+     * A portion of the byte array <code>value</code> is written to the
+     * message as a byte array field. Consecutively written byte array fields
+     * are treated as two distinct fields when the fields are read.
+     *
+     * @param value  the byte array value to be written
+     * @param offset the initial offset within the byte array
+     * @param length the number of bytes to use
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeBytes(byte[] value, int offset, int length) throws JMSException {
         initializeWriting();
@@ -1035,19 +959,18 @@ public class BlazeJmsStreamMessage exten
     }
 
     /**
-     * Writes an object to the stream message. <p/>
-     * <P>
-     * This method works only for the objectified primitive object types (<code>Integer</code>, <code>Double</code>,
-     * <code>Long</code>&nbsp;...), <code>String</code> objects, and byte arrays.
-     * 
-     * @param value
-     *            the Java object to be written
-     * @throws JMSException
-     *             if the JMS provider fails to write the message due to some internal error.
-     * @throws MessageFormatException
-     *             if the object is invalid.
-     * @throws MessageNotWriteableException
-     *             if the message is in read-only mode.
+     * Writes an object to the stream message.
+     * <p/>
+     * <p/>
+     * This method works only for the objectified primitive object types
+     * (<code>Integer</code>, <code>Double</code>, <code>Long</code>&nbsp;...),
+     * <code>String</code> objects, and byte arrays.
+     *
+     * @param value the Java object to be written
+     * @throws JMSException                 if the JMS provider fails to write the message due to some
+     *                                      internal error.
+     * @throws MessageFormatException       if the object is invalid.
+     * @throws MessageNotWriteableException if the message is in read-only mode.
      */
     public void writeObject(Object value) throws JMSException {
         initializeWriting();
@@ -1083,10 +1006,17 @@ public class BlazeJmsStreamMessage exten
     }
 
     /**
-     * Puts the message body in read-only mode and repositions the stream of bytes to the beginning.
+     * Puts the message body in read-only mode and repositions the stream of
+     * bytes to the beginning.
      */
-    public void reset() {
-        storeContent();
+    public void reset() throws JMSException {
+        try {
+            storeContent();
+        } catch (Exception e) {
+            JMSException ex = new JMSException(e.getMessage());
+            ex.initCause(e);
+            ex.setLinkedException(ex);
+        }
         this.bytesOut = null;
         this.dataIn = null;
         this.dataOut = null;
@@ -1104,15 +1034,7 @@ public class BlazeJmsStreamMessage exten
     protected void initializeReading() {
         if (this.dataIn == null) {
             if (this.dataIn == null) {
-                BlazeData bd = getContent();
-                Buffer data = null;
-                if (bd != null && bd.getPayload() != null) {
-                    data = bd.getPayload();
-                } else {
-                    data = new Buffer(new byte[0]);
-                }
-                BufferInputStream is = new BufferInputStream(data);
-                this.dataIn = new DataInputStream(is);
+                this.dataIn = new BufferInputStream(this.payload);
             }
         }
     }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java (from r755483, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java&r1=755483&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java Fri Jul 19 18:44:21 2013
@@ -14,33 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
 
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.util.BufferOutputStream;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
+
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
-
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
 
 /**
- * 
  * @version $Revision$
  */
 public class BlazeJmsTextMessage extends BlazeJmsMessage implements TextMessage {
     protected String text;
 
+    public int getPacketType() {
+        return PacketType.JMS_TEXT.getNumber();
+    }
+
     public BlazeJmsTextMessage clone() {
         BlazeJmsTextMessage copy = new BlazeJmsTextMessage();
         try {
             copy(copy);
-        } catch (BlazeException e) {
+        } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
         return copy;
@@ -50,17 +47,17 @@ public class BlazeJmsTextMessage extends
      * @param copy
      * @throws IOException
      */
-    protected void copy(BlazeJmsTextMessage copy) throws BlazeException {
+    protected void copy(BlazeJmsTextMessage copy) throws IOException {
         storeContent();
         super.copy(copy);
         copy.text = this.text;
     }
-    
-    /** 
+
+    /**
      * @return the type
      * @see org.apache.activeblaze.BlazeMessage#getType()
      */
-    public int getType(){
+    public int getType() {
         return JmsMessageType.TEXT.ordinal();
     }
 
@@ -69,53 +66,42 @@ public class BlazeJmsTextMessage extends
         setContent(null);
     }
 
-    public void storeContent() {
+    protected void storeContent() throws IOException {
         super.storeContent();
         try {
             BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
-            DataOutputStream dataOut = new DataOutputStream(os);
-            MarshallingSupport.writeUTF8(dataOut, this.text);
-            BlazeDataBean data = (BlazeDataBean) getContent();
-            data.setPayload(os.toBuffer());
-            dataOut.close();
+            MarshallingSupport.writeUTF8(os, this.text);
+            this.payload = os.toBuffer();
         } catch (IOException e) {
             throw new BlazeRuntimeException(e);
         }
     }
 
-    public String getText() {
-        if (this.text == null && getContent() != null) {
-            Buffer data = null;
-            if (getContent() == null || getContent().getPayload() == null) {
-                data = getContent().getPayload();
-                if (data == null) {
-                    data = new Buffer(new byte[0]);
-                }
-            }
-            if (data == null) {
-                data = getContent().getPayload();
-            }
+    public String getText() throws JMSException {
+        if (this.text == null) {
             try {
-                BufferInputStream is = new BufferInputStream(data);
-                DataInputStream dataIn = new DataInputStream(is);
-                dataIn.close();
-                setContent(null);
-                this.text = MarshallingSupport.readUTF8(dataIn);
+                this.text = MarshallingSupport.readUTF8(new BufferInputStream(this.payload));
             } catch (IOException e) {
-                throw new BlazeRuntimeException(e);
+                JMSException ex = new JMSException(e.getMessage());
+                ex.initCause(e);
+                ex.setLinkedException(e);
+                throw ex;
             }
         }
         return this.text;
     }
 
     /**
-     * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
-     * <P>
-     * If this message body was read-only, calling this method leaves the message body in the same state as an empty
-     * body in a newly created message.
-     * 
-     * @throws JMSException
-     *             if the JMS provider fails to clear the message body due to some internal error.
+     * Clears out the message body. Clearing a message's body does not clear its
+     * header values or property entries.
+     * <p/>
+     * <p/>
+     * If this message body was read-only, calling this method leaves the
+     * message body in the same state as an empty body in a newly created
+     * message.
+     *
+     * @throws JMSException if the JMS provider fails to clear the message body due to
+     *                      some internal error.
      */
     public void clearBody() throws JMSException {
         super.clearBody();

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java Fri Jul 19 18:44:21 2013
@@ -14,90 +14,248 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.util;
+package org.apache.activeblaze.wire;
 
-import java.io.EOFException;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
-import org.apache.activemq.protobuf.Buffer;
-
+import java.io.UTFDataFormatException;
 
 /**
- * Very similar to the java.io.ByteArrayOutputStream but this version 
- * is not thread safe and the resulting data is returned in a Buffer
- * to avoid an extra byte[] allocation.  
+ * Very similar to java.io.ByteArrayOutputStream, but this version is not thread
+ * safe, also implements DataOutput, and returns the resulting data in a Buffer
+ * to avoid an extra byte[] allocation.
  */
-final public class BufferOutputStream extends OutputStream {
-
-    byte buffer[];
-    int offset;
-    int limit;
-    int pos;
+final public class BufferOutputStream extends OutputStream implements DataOutput {
+    private byte writeBuffer[] = new byte[8];
+    private byte buffer[];
+    private int offset;
+    private int pos;
 
     public BufferOutputStream(int size) {
         this(new byte[size]);
-    }   
-    
+    }
+
     public BufferOutputStream(byte[] buffer) {
         this.buffer = buffer;
-        this.limit = buffer.length;
-    }   
-    
+    }
+
     public BufferOutputStream(Buffer data) {
         this.buffer = data.data;
         this.pos = this.offset = data.offset;
-        this.limit = data.offset+data.length;
     }
-    
-    
-    public void write(int b) throws IOException {
-        int newPos = pos + 1;
+
+    /**
+     * Get the backing byte array. This is the live array, not a copy; growing the stream replaces it.
+     *
+     * @return the backing byte array
+     */
+    public byte[] getBuffer() {
+        return this.buffer;
+    }
+
+    /**
+     * Get the index in the backing array at which this stream's data starts.
+     *
+     * @return the start offset
+     */
+    public int getOffset() {
+        return this.offset;
+    }
+
+    /**
+     * Get the current write position, as an absolute index into the backing array.
+     *
+     * @return the current write position
+     */
+    public int getPos() {
+        return this.pos;
+    }
+
+    public void skip(int skip) {
+        int newPos = this.pos + skip;
         checkCapacity(newPos);
-        buffer[pos] = (byte) b;
-        pos = newPos;
+        this.pos = newPos;
     }
 
-    public void write(byte b[], int off, int len) throws IOException {
-        int newPos = pos + len;
+    public void position(int pos) {
+        int newPos = this.offset + pos;
         checkCapacity(newPos);
-        System.arraycopy(b, off, buffer, pos, len);
-        pos = newPos;
+        this.pos = newPos;
     }
-    
-    public Buffer getNextBuffer(int len) throws IOException {
-        int newPos = pos + len;
+
+    public void write(int b) {
+        int newPos = this.pos + 1;
         checkCapacity(newPos);
-        return new Buffer(buffer, pos, len);
+        this.buffer[this.pos] = (byte) b;
+        this.pos = newPos;
     }
-    
-    /**
-     * Ensures the the buffer has at least the minimumCapacity specified. 
-     * @param i
-     * @throws EOFException 
-     */
-    private void checkCapacity(int minimumCapacity) throws IOException {
-        if (minimumCapacity > buffer.length) {
-            byte b[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
-            System.arraycopy(buffer, 0, b, 0, buffer.length);
-            buffer = b;
+
+    public void write(byte b[], int off, int len) {
+        int newPos = this.pos + len;
+        checkCapacity(newPos);
+        System.arraycopy(b, off, this.buffer, this.pos, len);
+        this.pos = newPos;
+    }
+
+    public void write(Buffer buffer) {
+        if (buffer != null) {
+            writeInt(buffer.length);
+            write(buffer.data, buffer.offset, buffer.length);
+        } else {
+            writeInt(0);
         }
     }
 
+    public void writeObject(Object object) throws IOException {
+        IOUtils.writeObject(this, object);
+    }
+
     public void reset() {
-        pos = offset;
+        this.pos = this.offset;
     }
 
     public Buffer toBuffer() {
-        return new Buffer(buffer, offset, pos);
+        return new Buffer(this.buffer, this.offset, this.pos);
     }
-    
+
     public byte[] toByteArray() {
         return toBuffer().toByteArray();
     }
-    
-    public int size() {
-        return offset-pos;
+
+    public int length() {
+        return pos - offset;
+    }
+
+    public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+
+    public void writeBoolean(boolean v) {
+        write(v ? 1 : 0);
+    }
+
+    public void writeByte(int v) {
+        write(v);
+    }
+
+    public void writeBytes(String s) {
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            write((byte) s.charAt(i));
+        }
+    }
+
+    public void writeString(String s) {
+        if (s == null) {
+            writeShort(0);
+        } else {
+            int len = s.length();
+            writeShort(len);
+            for (int i = 0; i < len; i++) {
+                write((byte) s.charAt(i));
+            }
+        }
+    }
+
+    public void writeChar(int v) {
+        write((v >>> 8) & 0xFF);
+        write((v >>> 0) & 0xFF);
+    }
+
+    public void writeChars(String s) {
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            int v = s.charAt(i);
+            write((v >>> 8) & 0xFF);
+            write((v >>> 0) & 0xFF);
+        }
     }
-    
 
+    public void writeDouble(double v) {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    public void writeFloat(float v) {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    public void writeInt(int v) {
+        write((v >>> 24) & 0xFF);
+        write((v >>> 16) & 0xFF);
+        write((v >>> 8) & 0xFF);
+        write((v >>> 0) & 0xFF);
+    }
+
+    /**
+     * @see java.io.DataOutput#writeLong(long)
+     */
+    public void writeLong(long v) {
+        writeBuffer[0] = (byte) (v >>> 56);
+        writeBuffer[1] = (byte) (v >>> 48);
+        writeBuffer[2] = (byte) (v >>> 40);
+        writeBuffer[3] = (byte) (v >>> 32);
+        writeBuffer[4] = (byte) (v >>> 24);
+        writeBuffer[5] = (byte) (v >>> 16);
+        writeBuffer[6] = (byte) (v >>> 8);
+        writeBuffer[7] = (byte) (v >>> 0);
+        write(writeBuffer, 0, 8);
+    }
+
+    public void writeShort(int v) {
+        write((v >>> 8) & 0xFF);
+        write((v >>> 0) & 0xFF);
+    }
+
+    public void writeUTF(String str) throws IOException {
+        int strlen = str.length();
+        int utflen = 0;
+        int c, count = 0;
+        /* use charAt instead of copying String to char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+        if (utflen > 65535)
+            throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+        byte[] bytearr = null;
+        bytearr = new byte[utflen + 2];
+        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+        int i = 0;
+        for (i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if (!((c >= 0x0001) && (c <= 0x007F)))
+                break;
+            bytearr[count++] = (byte) c;
+        }
+        for (; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte) c;
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+            } else {
+                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+            }
+        }
+        write(bytearr, 0, utflen + 2);
+    }
+
+    private final void checkCapacity(int minimumCapacity) {
+        if (minimumCapacity > this.buffer.length) {
+            byte b[] = new byte[Math.max(this.buffer.length << 1, minimumCapacity)];
+            System.arraycopy(this.buffer, 0, b, 0, this.buffer.length);
+            this.buffer = b;
+        }
+    }
 }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java Fri Jul 19 18:44:21 2013
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activeblaze.util;
+package org.apache.activeblaze.wire;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -23,20 +25,19 @@ import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
 
 /**
  * Utilities for ByteBuffers
- * 
  */
 public class IOUtils {
     /**
      * Create an InputStream to read a ByteBuffer
-     * 
-     * @param buf
-     * @return
+     *
+     * @return the InputStream
      */
     public static InputStream getByteBufferInputStream(final ByteBuffer buf) {
         return new InputStream() {
@@ -57,9 +58,6 @@ public class IOUtils {
 
     /**
      * Create an OutputStream for a ByteBuffer
-     * 
-     * @param buf
-     * @return
      */
     public static OutputStream getByteBufferOutputStream(final ByteBuffer buf) {
         return new OutputStream() {
@@ -75,12 +73,8 @@ public class IOUtils {
 
     /**
      * Create a Buffer from an Object
-     * 
-     * @param object
-     * @return
-     * @throws Exception
      */
-    public static Buffer getBuffer(Object object) throws Exception {
+    public static Buffer getBuffer(Object object) throws IOException {
         if (object != null) {
             BufferOutputStream bufferOut = new BufferOutputStream(512);
             DataOutputStream dataOut = new DataOutputStream(bufferOut);
@@ -95,19 +89,144 @@ public class IOUtils {
     }
 
     /**
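+     * @param out    the stream that receives a 4-byte length followed by the serialized bytes
+     * @param object the object to serialize; null is written as a length of 0
+     * @throws IOException if serializing the object fails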
+     */
+    public static void writeObject(BufferOutputStream out, Object object) throws IOException {
+        if (object != null) {
+            ByteArrayOutputStream bufferOut = new ByteArrayOutputStream(512);
+            ObjectOutputStream objOut = new ObjectOutputStream(bufferOut);
+            objOut.writeObject(object);
+            objOut.flush();
+            objOut.close();
+            byte[] data = bufferOut.toByteArray();
+            out.writeInt(data.length);
+            out.write(data);
+        } else {
+            out.writeInt(0);
+        }
+    }
+
+    /**
+     * @return the Object read from the stream, or null if its length prefix is not positive
+     */
+    public static Object readObject(BufferInputStream in) throws IOException {
+        Object result = null;
+        int len = in.readInt();
+        if (len > 0) {
+            byte[] rawData = new byte[len];
+            in.readFully(rawData);
+            InputStream is = new ByteArrayInputStream(rawData);
+            DataInputStream dataIn = new DataInputStream(is);
+            ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+            try {
+                result = objIn.readObject();
+            } catch (ClassNotFoundException e) {
+                IOException ex = new IOException("Class not Found " + e.getMessage());
+                ex.initCause(e);
+                throw ex;
+            }
+        }
+        return result;
+    }
+
+    /**
      * Create an Object from a Buffer
-     * 
-     * @param buffer
-     * @return
-     * @throws Exception
+     *
+     * @return the Object
      */
-    public static Object getObject(Buffer buffer) throws Exception {
+    public static Object getObject(Buffer buffer) throws IOException {
         if (buffer != null) {
-            InputStream is = new BufferInputStream(buffer);
+            InputStream is = new ByteArrayInputStream(buffer.data, buffer.offset, buffer.length);
             DataInputStream dataIn = new DataInputStream(is);
             ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
-            return objIn.readObject();
+            try {
+                return objIn.readObject();
+            } catch (ClassNotFoundException e) {
+                IOException ex = new IOException("Class not Found " + e.getMessage());
+                ex.initCause(e);
+                throw ex;
+            }
         }
         return null;
     }
+
+    /**
+     * Write a Packet to a new buffer: its packet type byte followed by the packet's own data
+     *
+     * @return the Buffer
+     */
+    public static Buffer writePacket(Packet packet) throws IOException {
+        BufferOutputStream out = new BufferOutputStream(1024);
+        out.writeByte(packet.getPacketType());
+        packet.write(out);
+        return out.toBuffer();
+    }
+
+    /**
+     * Read a packet from a buffer: the first byte selects the packet type, which reads the rest
+     *
+     * @return the Packet
+     */
+    public static Packet readPacket(Buffer buffer) throws IOException {
+        BufferInputStream in = new BufferInputStream(buffer);
+        int type = in.readByte();
+        Packet packet = PacketType.valueOf(type).createPacket();
+        packet.read(in);
+        return packet;
+    }
+
+    /**
+     * GZIP-compress the buffer
+     *
+     * @return the compressed Buffer, or null if the argument is null
+     */
+    public static Buffer compress(Buffer buffer) throws IOException {
+        Buffer result = buffer;
+        if (buffer != null) {
+            BufferOutputStream bytesOut = new BufferOutputStream(buffer.length);
+            GZIPOutputStream gzipOut = new GZIPOutputStream(bytesOut, buffer.length);
+            gzipOut.write(buffer.toByteArray());
+            gzipOut.close();
+            bytesOut.close();
+            result = bytesOut.toBuffer();
+        }
+        return result;
+    }
+
+    /**
+     * Inflate a GZIP-compressed buffer
+     *
+     * @return the inflated buffer, or the argument itself when isCompressed(buffer) is false
+     */
+    public static Buffer inflate(Buffer buffer) throws IOException {
+        Buffer result = buffer;
+        if (isCompressed(buffer)) {
+            InputStream bytesIn = new BufferInputStream(buffer);
+            GZIPInputStream gzipIn = new GZIPInputStream(bytesIn);
+            BufferOutputStream bytesOut = new BufferOutputStream(buffer.length);
+            byte[] data = new byte[4096];
+            int bytesRead = 0;
+            while ((bytesRead = gzipIn.read(data, 0, data.length)) > 0) {
+                bytesOut.write(data, 0, bytesRead);
+            }
+            gzipIn.close();
+            bytesIn.close();
+            result = bytesOut.toBuffer();
+            bytesOut.close();
+        }
+        return result;
+    }
+
+    static boolean isCompressed(Buffer data) {
+        boolean result = false;
+        if (data != null && data.length > 2) {
+            int ch1 = (int) (data.byteAt(0) & 0xff);
+            int ch2 = (int) (data.byteAt(1) & 0xff);
+            int magic = (ch1 | (ch2 << 8));
+            result = (magic == GZIPInputStream.GZIP_MAGIC);
+        }
+        return result;
+    }
 }

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native