You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/07/19 20:44:24 UTC
svn commit: r1504961 [9/11] - in /activemq/activemq-blaze/trunk: ./
src/main/java/org/apache/activeblaze/
src/main/java/org/apache/activeblaze/cluster/
src/main/java/org/apache/activeblaze/group/
src/main/java/org/apache/activeblaze/impl/destination/ s...
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java Fri Jul 19 18:44:21 2013
@@ -14,78 +14,91 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
+
import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
import javax.jms.StreamMessage;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.jms.BlazeJmsExceptionSupport;
/**
- * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive types in the Java programming language.
- * It is filled and read sequentially. It inherits from the <CODE>Message</CODE> interface and adds a stream message
- * body. Its methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
- * <CODE>java.io.DataOutputStream</CODE>. <p/>
- * <P>
- * The primitive types can be read or written explicitly using methods for each type. They may also be read or written
- * generically as objects. For instance, a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
+ * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
+ * types in the Java programming language. It is filled and read sequentially.
+ * It inherits from the <CODE>Message</CODE> interface and adds a stream message
+ * body. Its methods are based largely on those found in
+ * <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>.
+ * <p/>
+ * <p/>
+ * The primitive types can be read or written explicitly using methods for each
+ * type. They may also be read or written generically as objects. For instance,
+ * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
* <CODE>StreamMessage.writeObject(new
* Integer(6))</CODE>. Both forms are provided, because the explicit form is
- * convenient for static programming, and the object form is needed when types are not known at compile time. <p/>
- * <P>
- * When the message is first created, and when <CODE>clearBody</CODE> is called, the body of the message is in
- * write-only mode. After the first call to <CODE>reset</CODE> has been made, the message body is in read-only mode.
- * After a message has been sent, the client that sent it can retain and modify it without affecting the message that
- * has been sent. The same message object can be sent multiple times. When a message has been received, the provider has
- * called <CODE>reset</CODE> so that the message body is in read-only mode for the client. <p/>
- * <P>
- * If <CODE>clearBody</CODE> is called on a message in read-only mode, the message body is cleared and the message
- * body is in write-only mode. <p/>
- * <P>
- * If a client attempts to read a message in write-only mode, a <CODE>MessageNotReadableException</CODE> is thrown.
+ * convenient for static programming, and the object form is needed when types
+ * are not known at compile time.
+ * <p/>
+ * <p/>
+ * When the message is first created, and when <CODE>clearBody</CODE> is called,
+ * the body of the message is in write-only mode. After the first call to
+ * <CODE>reset</CODE> has been made, the message body is in read-only mode.
+ * After a message has been sent, the client that sent it can retain and modify
+ * it without affecting the message that has been sent. The same message object
+ * can be sent multiple times. When a message has been received, the provider
+ * has called <CODE>reset</CODE> so that the message body is in read-only mode
+ * for the client.
+ * <p/>
+ * <p/>
+ * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
+ * message body is cleared and the message body is in write-only mode.
+ * <p/>
+ * <p/>
+ * If a client attempts to read a message in write-only mode, a
+ * <CODE>MessageNotReadableException</CODE> is thrown.
+ * <p/>
+ * <p/>
+ * If a client attempts to write a message in read-only mode, a
+ * <CODE>MessageNotWriteableException</CODE> is thrown.
+ * <p/>
+ * <p/>
+ * <CODE>StreamMessage</CODE> objects support the following conversion table.
+ * The marked cases must be supported. The unmarked cases must throw a
+ * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions
+ * may throw a runtime exception if the primitive's <CODE>valueOf()</CODE>
+ * method does not accept it as a valid <CODE>String</CODE> representation of
+ * the primitive.
+ * <p/>
+ * <p/>
+ * A value written as the row type can be read as the column type.
* <p/>
- * <P>
- * If a client attempts to write a message in read-only mode, a <CODE>MessageNotWriteableException</CODE> is thrown.
* <p/>
- * <P>
- * <CODE>StreamMessage</CODE> objects support the following conversion table. The marked cases must be supported. The
- * unmarked cases must throw a <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive conversions may
- * throw a runtime exception if the primitive's <CODE>valueOf()</CODE> method does not accept it as a valid
- * <CODE>String</CODE> representation of the primitive. <p/>
- * <P>
- * A value written as the row type can be read as the column type. <p/>
- *
* <PRE>
- * | | boolean byte short char int long float double String byte[]
+ * | | boolean byte short char int long float double String byte[]
* |----------------------------------------------------------------------
* |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
* |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
* X |----------------------------------------------------------------------
- *
+ * <p/>
* </PRE>
- *
* <p/>
- * <P>
- * Attempting to read a null value as a primitive type must be treated as calling the primitive's corresponding
- * <code>valueOf(String)</code> conversion method with a null value. Since <code>char</code> does not support a
- * <code>String</code> conversion, attempting to read a null value as a <code>char</code> must throw a
- * <code>NullPointerException</code>.
- *
- *
+ * <p/>
+ * <p/>
+ * Attempting to read a null value as a primitive type must be treated as
+ * calling the primitive's corresponding <code>valueOf(String)</code> conversion
+ * method with a null value. Since <code>char</code> does not support a
+ * <code>String</code> conversion, attempting to read a null value as a
+ * <code>char</code> must throw a <code>NullPointerException</code>.
+ *
* @see javax.jms.Session#createStreamMessage()
* @see javax.jms.BytesMessage
* @see javax.jms.MapMessage
@@ -96,14 +109,18 @@ import javax.jms.StreamMessage;
public class BlazeJmsStreamMessage extends BlazeJmsMessage implements StreamMessage {
protected transient DataOutputStream dataOut;
protected transient ByteArrayOutputStream bytesOut;
- protected transient DataInputStream dataIn;
+ protected transient BufferInputStream dataIn;
protected transient int remainingBytes = -1;
+ public int getPacketType() {
+ return PacketType.JMS_STREAM.getNumber();
+ }
+
public BlazeJmsStreamMessage clone() {
BlazeJmsStreamMessage copy = new BlazeJmsStreamMessage();
try {
copy(copy);
- } catch (BlazeException e) {
+ } catch (Exception e) {
throw new BlazeRuntimeException(e);
}
return copy;
@@ -113,30 +130,28 @@ public class BlazeJmsStreamMessage exten
* @param copy
* @throws BlazeException
*/
- protected void copy(BlazeJmsStreamMessage copy) throws BlazeException {
+ protected void copy(BlazeJmsStreamMessage copy) throws IOException {
storeContent();
super.copy(copy);
copy.dataOut = null;
copy.bytesOut = null;
copy.dataIn = null;
}
-
- /**
+
+ /**
* @return the type
* @see org.apache.activeblaze.BlazeMessage#getType()
*/
- public int getType(){
+ public int getType() {
return JmsMessageType.STREAM.ordinal();
}
- public void storeContent() {
+ protected void storeContent() throws IOException {
super.storeContent();
- if (this.dataOut != null && getContent() != null) {
+ if (this.dataOut != null) {
try {
this.dataOut.close();
- Buffer buffer = new Buffer(bytesOut.toByteArray());
- BlazeDataBean data = (BlazeDataBean) getContent();
- data.setPayload(buffer);
+ this.payload = new Buffer(bytesOut.toByteArray());
this.bytesOut = null;
this.dataOut = null;
} catch (IOException ioe) {
@@ -145,33 +160,24 @@ public class BlazeJmsStreamMessage exten
}
}
- protected void loadContent() {
+ protected void loadContent() throws IOException {
super.loadContent();
if (this.dataIn == null) {
- BlazeData bd = getContent();
- Buffer data = null;
- if (bd == null || bd.getPayload() == null) {
- data = bd.getPayload();
- if (data == null) {
- data = new Buffer(new byte[0]);
- }
- }
- if (data == null) {
- data = bd.getPayload();
- }
- BufferInputStream is = new BufferInputStream(data);
- this.dataIn = new DataInputStream(is);
+ this.dataIn = new BufferInputStream(this.payload);
}
}
/**
- * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
- * <P>
- * If this message body was read-only, calling this method leaves the message body in the same state as an empty
- * body in a newly created message.
- *
- * @throws JMSException
- * if the JMS provider fails to clear the message body due to some internal error.
+ * Clears out the message body. Clearing a message's body does not clear its
+ * header values or property entries.
+ * <p/>
+ * <p/>
+ * If this message body was read-only, calling this method leaves the
+ * message body in the same state as an empty body in a newly created
+ * message.
+ *
+ * @throws JMSException if the JMS provider fails to clear the message body due to
+ * some internal error.
*/
public void clearBody() throws JMSException {
super.clearBody();
@@ -183,16 +189,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a <code>boolean</code> from the stream message.
- *
+ *
* @return the <code>boolean</code> value read
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public boolean readBoolean() throws JMSException {
initializeReading();
@@ -223,16 +226,14 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a <code>byte</code> value from the stream message.
- *
- * @return the next byte from the stream message as a 8-bit <code>byte</code>
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ *
+ * @return the next byte from the stream message as a 8-bit
+ * <code>byte</code>
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public byte readByte() throws JMSException {
initializeReading();
@@ -256,11 +257,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a byte type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -271,16 +268,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a 16-bit integer from the stream message.
- *
+ *
* @return a 16-bit integer from the stream message
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public short readShort() throws JMSException {
initializeReading();
@@ -307,11 +301,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a short type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -322,16 +312,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a Unicode character value from the stream message.
- *
+ *
* @return a Unicode character from the stream message
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public char readChar() throws JMSException {
initializeReading();
@@ -352,31 +339,21 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a char type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
- } catch (EOFException e) {
- throw BlazeJmsExceptionSupport.createMessageEOFException(e);
- } catch (IOException e) {
- throw BlazeJmsExceptionSupport.createMessageFormatException(e);
}
}
/**
* Reads a 32-bit integer from the stream message.
- *
- * @return a 32-bit integer value from the stream message, interpreted as an <code>int</code>
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ *
+ * @return a 32-bit integer value from the stream message, interpreted as an
+ * <code>int</code>
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public int readInt() throws JMSException {
initializeReading();
@@ -406,11 +383,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not an int type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -421,16 +394,14 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a 64-bit integer from the stream message.
- *
- * @return a 64-bit integer value from the stream message, interpreted as a <code>long</code>
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ *
+ * @return a 64-bit integer value from the stream message, interpreted as a
+ * <code>long</code>
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public long readLong() throws JMSException {
initializeReading();
@@ -463,11 +434,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a long type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -478,16 +445,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a <code>float</code> from the stream message.
- *
+ *
* @return a <code>float</code> value from the stream message
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public float readFloat() throws JMSException {
initializeReading();
@@ -511,11 +475,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a float type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -526,16 +486,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a <code>double</code> from the stream message.
- *
+ *
* @return a <code>double</code> value from the stream message
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public double readDouble() throws JMSException {
initializeReading();
@@ -562,11 +519,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a double type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -577,16 +530,13 @@ public class BlazeJmsStreamMessage exten
/**
* Reads a <CODE>String</CODE> from the stream message.
- *
+ *
* @return a Unicode string from the stream message
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
*/
public String readString() throws JMSException {
initializeReading();
@@ -633,11 +583,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException(" not a String type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
throw BlazeJmsExceptionSupport.createMessageEOFException(e);
@@ -647,104 +593,104 @@ public class BlazeJmsStreamMessage exten
}
/**
- * Reads a byte array field from the stream message into the specified <CODE>byte[]</CODE> object (the read
- * buffer). <p/>
- * <P>
- * To read the field value, <CODE>readBytes</CODE> should be successively called until it returns a value less
- * than the length of the read buffer. The value of the bytes in the buffer following the last byte read is
- * undefined. <p/>
- * <P>
- * If <CODE>readBytes</CODE> returns a value equal to the length of the buffer, a subsequent
- * <CODE>readBytes</CODE> call must be made. If there are no more bytes to be read, this call returns -1. <p/>
- * <P>
- * If the byte array field value is null, <CODE>readBytes</CODE> returns -1. <p/>
- * <P>
- * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0. <p/>
- * <P>
- * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field value has been made, the full value
- * of the field must be read before it is valid to read the next field. An attempt to read the next field before
- * that has been done will throw a <CODE>MessageFormatException</CODE>. <p/>
- * <P>
- * To read the byte field value into a new <CODE>byte[]</CODE> object, use the <CODE>readObject</CODE> method.
- *
- * @param value
- * the buffer into which the data is read
- * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of the
- * byte field has been reached
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * Reads a byte array field from the stream message into the specified
+ * <CODE>byte[]</CODE> object (the read buffer).
+ * <p/>
+ * <p/>
+ * To read the field value, <CODE>readBytes</CODE> should be successively
+ * called until it returns a value less than the length of the read buffer.
+ * The value of the bytes in the buffer following the last byte read is
+ * undefined.
+ * <p/>
+ * <p/>
+ * If <CODE>readBytes</CODE> returns a value equal to the length of the
+ * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
+ * are no more bytes to be read, this call returns -1.
+ * <p/>
+ * <p/>
+ * If the byte array field value is null, <CODE>readBytes</CODE> returns -1.
+ * <p/>
+ * <p/>
+ * If the byte array field value is empty, <CODE>readBytes</CODE> returns 0.
+ * <p/>
+ * <p/>
+ * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE> field
+ * value has been made, the full value of the field must be read before it
+ * is valid to read the next field. An attempt to read the next field before
+ * that has been done will throw a <CODE>MessageFormatException</CODE>.
+ * <p/>
+ * <p/>
+ * To read the byte field value into a new <CODE>byte[]</CODE> object, use
+ * the <CODE>readObject</CODE> method.
+ *
+ * @param value the buffer into which the data is read
+ * @return the total number of bytes read into the buffer, or -1 if there is
+ * no more data because the end of the byte field has been reached
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
* @see #readObject()
*/
public int readBytes(byte[] value) throws JMSException {
initializeReading();
- try {
- if (value == null) {
- throw new NullPointerException();
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ if (remainingBytes == -1) {
+ this.dataIn.mark(value.length + 1);
+ int type = this.dataIn.read();
+ if (type == -1) {
+ throw new MessageEOFException("reached end of data");
}
- if (remainingBytes == -1) {
- this.dataIn.mark(value.length + 1);
- int type = this.dataIn.read();
- if (type == -1) {
- throw new MessageEOFException("reached end of data");
- }
- if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
- throw new MessageFormatException("Not a byte array");
- }
- remainingBytes = this.dataIn.readInt();
- } else if (remainingBytes == 0) {
- remainingBytes = -1;
- return -1;
- }
- if (value.length <= remainingBytes) {
- // small buffer
- remainingBytes -= value.length;
- this.dataIn.readFully(value);
- return value.length;
- } else {
- // big buffer
- int rc = this.dataIn.read(value, 0, remainingBytes);
- remainingBytes = 0;
- return rc;
+ if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+ throw new MessageFormatException("Not a byte array");
}
- } catch (EOFException e) {
- JMSException jmsEx = new MessageEOFException(e.getMessage());
- jmsEx.setLinkedException(e);
- throw jmsEx;
- } catch (IOException e) {
- JMSException jmsEx = new MessageFormatException(e.getMessage());
- jmsEx.setLinkedException(e);
- throw jmsEx;
+ remainingBytes = this.dataIn.readInt();
+ } else if (remainingBytes == 0) {
+ remainingBytes = -1;
+ return -1;
+ }
+ if (value.length <= remainingBytes) {
+ // small buffer
+ remainingBytes -= value.length;
+ this.dataIn.readFully(value);
+ return value.length;
+ } else {
+ // big buffer
+ int rc = this.dataIn.read(value, 0, remainingBytes);
+ remainingBytes = 0;
+ return rc;
}
}
/**
- * Reads an object from the stream message. <p/>
- * <P>
- * This method can be used to return, in objectified format, an object in the Java programming language ("Java
- * object") that has been written to the stream with the equivalent <CODE>writeObject</CODE> method call, or its
- * equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
- * <P>
- * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>. <p/>
- * <P>
- * An attempt to call <CODE>readObject</CODE> to read a byte field value into a new <CODE>byte[]</CODE> object
- * before the full value of the byte field has been read will throw a <CODE>MessageFormatException</CODE>.
- *
- * @return a Java object from the stream message, in objectified format (for example, if the object was written as
- * an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned)
- * @throws JMSException
- * if the JMS provider fails to read the message due to some internal error.
- * @throws MessageEOFException
- * if unexpected end of message stream has been reached.
- * @throws MessageFormatException
- * if this type conversion is invalid.
- * @throws MessageNotReadableException
- * if the message is in write-only mode.
+ * Reads an object from the stream message.
+ * <p/>
+ * <p/>
+ * This method can be used to return, in objectified format, an object in
+ * the Java programming language ("Java object") that has been written to
+ * the stream with the equivalent <CODE>writeObject</CODE> method call, or
+ * its equivalent primitive <CODE>write<I>type</I></CODE> method.
+ * <p/>
+ * <p/>
+ * Note that byte values are returned as <CODE>byte[]</CODE>, not
+ * <CODE>Byte[]</CODE>.
+ * <p/>
+ * <p/>
+ * An attempt to call <CODE>readObject</CODE> to read a byte field value
+ * into a new <CODE>byte[]</CODE> object before the full value of the byte
+ * field has been read will throw a <CODE>MessageFormatException</CODE>.
+ *
+ * @return a Java object from the stream message, in objectified format (for
+ * example, if the object was written as an <CODE>int</CODE>, an
+ * <CODE>Integer</CODE> is returned)
+ * @throws JMSException if the JMS provider fails to read the message due to some
+ * internal error.
+ * @throws MessageEOFException if unexpected end of message stream has been reached.
+ * @throws MessageFormatException if this type conversion is invalid.
+ * @throws MessageNotReadableException if the message is in write-only mode.
* @see #readBytes(byte[] value)
*/
public Object readObject() throws JMSException {
@@ -798,11 +744,7 @@ public class BlazeJmsStreamMessage exten
throw new MessageFormatException("unknown type");
}
} catch (NumberFormatException mfe) {
- try {
- this.dataIn.reset();
- } catch (IOException ioe) {
- throw BlazeJmsExceptionSupport.create(ioe);
- }
+ this.dataIn.reset();
throw mfe;
} catch (EOFException e) {
JMSException jmsEx = new MessageEOFException(e.getMessage());
@@ -816,15 +758,14 @@ public class BlazeJmsStreamMessage exten
}
/**
- * Writes a <code>boolean</code> to the stream message. The value <code>true</code> is written as the value
- * <code>(byte)1</code>; the value <code>false</code> is written as the value <code>(byte)0</code>.
- *
- * @param value
- * the <code>boolean</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ * Writes a <code>boolean</code> to the stream message. The value
+ * <code>true</code> is written as the value <code>(byte)1</code>; the value
+ * <code>false</code> is written as the value <code>(byte)0</code>.
+ *
+ * @param value the <code>boolean</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeBoolean(boolean value) throws JMSException {
initializeWriting();
@@ -837,13 +778,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>byte</code> to the stream message.
- *
- * @param value
- * the <code>byte</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>byte</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeByte(byte value) throws JMSException {
initializeWriting();
@@ -856,13 +795,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>short</code> to the stream message.
- *
- * @param value
- * the <code>short</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>short</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeShort(short value) throws JMSException {
initializeWriting();
@@ -875,13 +812,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>char</code> to the stream message.
- *
- * @param value
- * the <code>char</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>char</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeChar(char value) throws JMSException {
initializeWriting();
@@ -894,13 +829,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes an <code>int</code> to the stream message.
- *
- * @param value
- * the <code>int</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>int</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeInt(int value) throws JMSException {
initializeWriting();
@@ -913,13 +846,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>long</code> to the stream message.
- *
- * @param value
- * the <code>long</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>long</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeLong(long value) throws JMSException {
initializeWriting();
@@ -932,13 +863,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>float</code> to the stream message.
- *
- * @param value
- * the <code>float</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>float</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeFloat(float value) throws JMSException {
initializeWriting();
@@ -951,13 +880,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>double</code> to the stream message.
- *
- * @param value
- * the <code>double</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>double</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeDouble(double value) throws JMSException {
initializeWriting();
@@ -970,13 +897,11 @@ public class BlazeJmsStreamMessage exten
/**
* Writes a <code>String</code> to the stream message.
- *
- * @param value
- * the <code>String</code> value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ *
+ * @param value the <code>String</code> value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeString(String value) throws JMSException {
initializeWriting();
@@ -992,38 +917,37 @@ public class BlazeJmsStreamMessage exten
}
/**
- * Writes a byte array field to the stream message. <p/>
- * <P>
- * The byte array <code>value</code> is written to the message as a byte array field. Consecutively written byte
- * array fields are treated as two distinct fields when the fields are read.
- *
- * @param value
- * the byte array value to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ * Writes a byte array field to the stream message.
+ * <p/>
+ * <p/>
+ * The byte array <code>value</code> is written to the message as a byte
+ * array field. Consecutively written byte array fields are treated as two
+ * distinct fields when the fields are read.
+ *
+ * @param value the byte array value to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeBytes(byte[] value) throws JMSException {
writeBytes(value, 0, value.length);
}
/**
- * Writes a portion of a byte array as a byte array field to the stream message. <p/>
- * <P>
- * The a portion of the byte array <code>value</code> is written to the message as a byte array field.
- * Consecutively written byte array fields are treated as two distinct fields when the fields are read.
- *
- * @param value
- * the byte array value to be written
- * @param offset
- * the initial offset within the byte array
- * @param length
- * the number of bytes to use
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ * Writes a portion of a byte array as a byte array field to the stream
+ * message.
+ * <p/>
+ * <p/>
+ * The a portion of the byte array <code>value</code> is written to the
+ * message as a byte array field. Consecutively written byte array fields
+ * are treated as two distinct fields when the fields are read.
+ *
+ * @param value the byte array value to be written
+ * @param offset the initial offset within the byte array
+ * @param length the number of bytes to use
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
initializeWriting();
@@ -1035,19 +959,18 @@ public class BlazeJmsStreamMessage exten
}
/**
- * Writes an object to the stream message. <p/>
- * <P>
- * This method works only for the objectified primitive object types (<code>Integer</code>, <code>Double</code>,
- * <code>Long</code> ...), <code>String</code> objects, and byte arrays.
- *
- * @param value
- * the Java object to be written
- * @throws JMSException
- * if the JMS provider fails to write the message due to some internal error.
- * @throws MessageFormatException
- * if the object is invalid.
- * @throws MessageNotWriteableException
- * if the message is in read-only mode.
+ * Writes an object to the stream message.
+ * <p/>
+ * <p/>
+ * This method works only for the objectified primitive object types (
+ * <code>Integer</code>, <code>Double</code>, <code>Long</code> ...),
+ * <code>String</code> objects, and byte arrays.
+ *
+ * @param value the Java object to be written
+ * @throws JMSException if the JMS provider fails to write the message due to some
+ * internal error.
+ * @throws MessageFormatException if the object is invalid.
+ * @throws MessageNotWriteableException if the message is in read-only mode.
*/
public void writeObject(Object value) throws JMSException {
initializeWriting();
@@ -1083,10 +1006,17 @@ public class BlazeJmsStreamMessage exten
}
/**
- * Puts the message body in read-only mode and repositions the stream of bytes to the beginning.
+ * Puts the message body in read-only mode and repositions the stream of
+ * bytes to the beginning.
*/
- public void reset() {
- storeContent();
+ public void reset() throws JMSException {
+ try {
+ storeContent();
+ } catch (Exception e) {
+ JMSException ex = new JMSException(e.getMessage());
+ ex.initCause(e);
+ ex.setLinkedException(ex);
+ }
this.bytesOut = null;
this.dataIn = null;
this.dataOut = null;
@@ -1104,15 +1034,7 @@ public class BlazeJmsStreamMessage exten
protected void initializeReading() {
if (this.dataIn == null) {
if (this.dataIn == null) {
- BlazeData bd = getContent();
- Buffer data = null;
- if (bd != null && bd.getPayload() != null) {
- data = bd.getPayload();
- } else {
- data = new Buffer(new byte[0]);
- }
- BufferInputStream is = new BufferInputStream(data);
- this.dataIn = new DataInputStream(is);
+ this.dataIn = new BufferInputStream(this.payload);
}
}
}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsStreamMessage.java
------------------------------------------------------------------------------
svn:executable = *
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java (from r755483, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java&r1=755483&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java Fri Jul 19 18:44:21 2013
@@ -14,33 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.jms.message;
+package org.apache.activeblaze.wire;
-import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.BlazeRuntimeException;
-import org.apache.activeblaze.util.BufferOutputStream;
-import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
+
import javax.jms.JMSException;
import javax.jms.TextMessage;
-
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeRuntimeException;
/**
- *
* @version $Revision$
*/
public class BlazeJmsTextMessage extends BlazeJmsMessage implements TextMessage {
protected String text;
+ public int getPacketType() {
+ return PacketType.JMS_TEXT.getNumber();
+ }
+
public BlazeJmsTextMessage clone() {
BlazeJmsTextMessage copy = new BlazeJmsTextMessage();
try {
copy(copy);
- } catch (BlazeException e) {
+ } catch (Exception e) {
throw new BlazeRuntimeException(e);
}
return copy;
@@ -50,17 +47,17 @@ public class BlazeJmsTextMessage extends
* @param copy
* @throws BlazeException
*/
- protected void copy(BlazeJmsTextMessage copy) throws BlazeException {
+ protected void copy(BlazeJmsTextMessage copy) throws IOException {
storeContent();
super.copy(copy);
copy.text = this.text;
}
-
- /**
+
+ /**
* @return the type
* @see org.apache.activeblaze.BlazeMessage#getType()
*/
- public int getType(){
+ public int getType() {
return JmsMessageType.TEXT.ordinal();
}
@@ -69,53 +66,42 @@ public class BlazeJmsTextMessage extends
setContent(null);
}
- public void storeContent() {
+ protected void storeContent() throws IOException {
super.storeContent();
try {
BufferOutputStream os = new BufferOutputStream(this.text != null ? this.text.length() : 10);
- DataOutputStream dataOut = new DataOutputStream(os);
- MarshallingSupport.writeUTF8(dataOut, this.text);
- BlazeDataBean data = (BlazeDataBean) getContent();
- data.setPayload(os.toBuffer());
- dataOut.close();
+ MarshallingSupport.writeUTF8(os, this.text);
+ this.payload = os.toBuffer();
} catch (IOException e) {
throw new BlazeRuntimeException(e);
}
}
- public String getText() {
- if (this.text == null && getContent() != null) {
- Buffer data = null;
- if (getContent() == null || getContent().getPayload() == null) {
- data = getContent().getPayload();
- if (data == null) {
- data = new Buffer(new byte[0]);
- }
- }
- if (data == null) {
- data = getContent().getPayload();
- }
+ public String getText() throws JMSException {
+ if (this.text == null) {
try {
- BufferInputStream is = new BufferInputStream(data);
- DataInputStream dataIn = new DataInputStream(is);
- dataIn.close();
- setContent(null);
- this.text = MarshallingSupport.readUTF8(dataIn);
+ this.text = MarshallingSupport.readUTF8(new BufferInputStream(this.payload));
} catch (IOException e) {
- throw new BlazeRuntimeException(e);
+ JMSException ex = new JMSException(e.getMessage());
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
}
}
return this.text;
}
/**
- * Clears out the message body. Clearing a message's body does not clear its header values or property entries. <p/>
- * <P>
- * If this message body was read-only, calling this method leaves the message body in the same state as an empty
- * body in a newly created message.
- *
- * @throws JMSException
- * if the JMS provider fails to clear the message body due to some internal error.
+ * Clears out the message body. Clearing a message's body does not clear its
+ * header values or property entries.
+ * <p/>
+ * <p/>
+ * If this message body was read-only, calling this method leaves the
+ * message body in the same state as an empty body in a newly created
+ * message.
+ *
+ * @throws JMSException if the JMS provider fails to clear the message body due to
+ * some internal error.
*/
public void clearBody() throws JMSException {
super.clearBody();
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BlazeJmsTextMessage.java
------------------------------------------------------------------------------
svn:executable = *
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java Fri Jul 19 18:44:21 2013
@@ -14,90 +14,248 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.util;
+package org.apache.activeblaze.wire;
-import java.io.EOFException;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
-import org.apache.activemq.protobuf.Buffer;
-
+import java.io.UTFDataFormatException;
/**
- * Very similar to the java.io.ByteArrayOutputStream but this version
- * is not thread safe and the resulting data is returned in a Buffer
- * to avoid an extra byte[] allocation.
+ * Very similar to the java.io.ByteArrayOutputStream but this version is not
+ * thread safe and the resulting data is returned in a Buffer to avoid an extra
+ * byte[] allocation.
*/
-final public class BufferOutputStream extends OutputStream {
-
- byte buffer[];
- int offset;
- int limit;
- int pos;
+final public class BufferOutputStream extends OutputStream implements DataOutput {
+ private byte writeBuffer[] = new byte[8];
+ private byte buffer[];
+ private int offset;
+ private int pos;
public BufferOutputStream(int size) {
this(new byte[size]);
- }
-
+ }
+
public BufferOutputStream(byte[] buffer) {
this.buffer = buffer;
- this.limit = buffer.length;
- }
-
+ }
+
public BufferOutputStream(Buffer data) {
this.buffer = data.data;
this.pos = this.offset = data.offset;
- this.limit = data.offset+data.length;
}
-
-
- public void write(int b) throws IOException {
- int newPos = pos + 1;
+
+ /**
+ * Get the buffer
+ *
+ * @return the buffer
+ */
+ public byte[] getBuffer() {
+ return this.buffer;
+ }
+
+ /**
+ * Get the offset
+ *
+ * @return the offset
+ */
+ public int getOffset() {
+ return this.offset;
+ }
+
+ /**
+ * Get the pos
+ *
+ * @return the pos
+ */
+ public int getPos() {
+ return this.pos;
+ }
+
+ public void skip(int skip) {
+ int newPos = this.pos + skip;
checkCapacity(newPos);
- buffer[pos] = (byte) b;
- pos = newPos;
+ this.pos = newPos;
}
- public void write(byte b[], int off, int len) throws IOException {
- int newPos = pos + len;
+ public void position(int pos) {
+ int newPos = this.offset + pos;
checkCapacity(newPos);
- System.arraycopy(b, off, buffer, pos, len);
- pos = newPos;
+ this.pos = newPos;
}
-
- public Buffer getNextBuffer(int len) throws IOException {
- int newPos = pos + len;
+
+ public void write(int b) {
+ int newPos = this.pos + 1;
checkCapacity(newPos);
- return new Buffer(buffer, pos, len);
+ this.buffer[this.pos] = (byte) b;
+ this.pos = newPos;
}
-
- /**
- * Ensures the the buffer has at least the minimumCapacity specified.
- * @param i
- * @throws EOFException
- */
- private void checkCapacity(int minimumCapacity) throws IOException {
- if (minimumCapacity > buffer.length) {
- byte b[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
- System.arraycopy(buffer, 0, b, 0, buffer.length);
- buffer = b;
+
+ public void write(byte b[], int off, int len) {
+ int newPos = this.pos + len;
+ checkCapacity(newPos);
+ System.arraycopy(b, off, this.buffer, this.pos, len);
+ this.pos = newPos;
+ }
+
+ public void write(Buffer buffer) {
+ if (buffer != null) {
+ writeInt(buffer.length);
+ write(buffer.data, buffer.offset, buffer.length);
+ } else {
+ writeInt(0);
}
}
+ public void writeObject(Object object) throws IOException {
+ IOUtils.writeObject(this, object);
+ }
+
public void reset() {
- pos = offset;
+ this.pos = this.offset;
}
public Buffer toBuffer() {
- return new Buffer(buffer, offset, pos);
+ return new Buffer(this.buffer, this.offset, this.pos);
}
-
+
public byte[] toByteArray() {
return toBuffer().toByteArray();
}
-
- public int size() {
- return offset-pos;
+
+ public int length() {
+ return pos - offset;
+ }
+
+ public void write(byte[] b) {
+ write(b, 0, b.length);
+ }
+
+ public void writeBoolean(boolean v) {
+ write(v ? 1 : 0);
+ }
+
+ public void writeByte(int v) {
+ write(v);
+ }
+
+ public void writeBytes(String s) {
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ write((byte) s.charAt(i));
+ }
+ }
+
+ public void writeString(String s) {
+ if (s == null) {
+ writeShort(0);
+ } else {
+ int len = s.length();
+ writeShort(len);
+ for (int i = 0; i < len; i++) {
+ write((byte) s.charAt(i));
+ }
+ }
+ }
+
+ public void writeChar(int v) {
+ write((v >>> 8) & 0xFF);
+ write((v >>> 0) & 0xFF);
+ }
+
+ public void writeChars(String s) {
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ write((v >>> 8) & 0xFF);
+ write((v >>> 0) & 0xFF);
+ }
}
-
+ public void writeDouble(double v) {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeFloat(float v) {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeInt(int v) {
+ write((v >>> 24) & 0xFF);
+ write((v >>> 16) & 0xFF);
+ write((v >>> 8) & 0xFF);
+ write((v >>> 0) & 0xFF);
+ }
+
+ /**
+ * @see java.io.DataOutput#writeLong(long)
+ */
+ public void writeLong(long v) {
+ writeBuffer[0] = (byte) (v >>> 56);
+ writeBuffer[1] = (byte) (v >>> 48);
+ writeBuffer[2] = (byte) (v >>> 40);
+ writeBuffer[3] = (byte) (v >>> 32);
+ writeBuffer[4] = (byte) (v >>> 24);
+ writeBuffer[5] = (byte) (v >>> 16);
+ writeBuffer[6] = (byte) (v >>> 8);
+ writeBuffer[7] = (byte) (v >>> 0);
+ write(writeBuffer, 0, 8);
+ }
+
+ public void writeShort(int v) {
+ write((v >>> 8) & 0xFF);
+ write((v >>> 0) & 0xFF);
+ }
+
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int utflen = 0;
+ int c, count = 0;
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+ if (utflen > 65535)
+ throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+ byte[] bytearr = null;
+ bytearr = new byte[utflen + 2];
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F)))
+ break;
+ bytearr[count++] = (byte) c;
+ }
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ write(bytearr, 0, utflen + 2);
+ }
+
+ private final void checkCapacity(int minimumCapacity) {
+ if (minimumCapacity > this.buffer.length) {
+ byte b[] = new byte[Math.max(this.buffer.length << 1, minimumCapacity)];
+ System.arraycopy(this.buffer, 0, b, 0, this.buffer.length);
+ this.buffer = b;
+ }
+ }
}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/BufferOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java (from r752825, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java&r1=752825&r2=1504961&rev=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java Fri Jul 19 18:44:21 2013
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activeblaze.util;
+package org.apache.activeblaze.wire;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -23,20 +25,19 @@ import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
/**
* Utilities for ByteBuffers
- *
*/
public class IOUtils {
/**
* Create an InputStream to read a ByteBuffer
- *
- * @param buf
- * @return
+ *
+ * @return the InputStream
*/
public static InputStream getByteBufferInputStream(final ByteBuffer buf) {
return new InputStream() {
@@ -57,9 +58,6 @@ public class IOUtils {
/**
* Create an OutputStream for a ByteBuffer
- *
- * @param buf
- * @return
*/
public static OutputStream getByteBufferOutputStream(final ByteBuffer buf) {
return new OutputStream() {
@@ -75,12 +73,8 @@ public class IOUtils {
/**
* Create a Buffer from an Object
- *
- * @param object
- * @return
- * @throws Exception
*/
- public static Buffer getBuffer(Object object) throws Exception {
+ public static Buffer getBuffer(Object object) throws IOException {
if (object != null) {
BufferOutputStream bufferOut = new BufferOutputStream(512);
DataOutputStream dataOut = new DataOutputStream(bufferOut);
@@ -95,19 +89,144 @@ public class IOUtils {
}
/**
+ * @param out
+ * @param object
+ * @throws IOException
+ */
+ public static void writeObject(BufferOutputStream out, Object object) throws IOException {
+ if (object != null) {
+ ByteArrayOutputStream bufferOut = new ByteArrayOutputStream(512);
+ ObjectOutputStream objOut = new ObjectOutputStream(bufferOut);
+ objOut.writeObject(object);
+ objOut.flush();
+ objOut.close();
+ byte[] data = bufferOut.toByteArray();
+ out.writeInt(data.length);
+ out.write(data);
+ } else {
+ out.writeInt(0);
+ }
+ }
+
+ /**
+ * @return an Object
+ */
+ public static Object readObject(BufferInputStream in) throws IOException {
+ Object result = null;
+ int len = in.readInt();
+ if (len > 0) {
+ byte[] rawData = new byte[len];
+ in.readFully(rawData);
+ InputStream is = new ByteArrayInputStream(rawData);
+ DataInputStream dataIn = new DataInputStream(is);
+ ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+ try {
+ result = objIn.readObject();
+ } catch (ClassNotFoundException e) {
+ IOException ex = new IOException("Class not Found " + e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+ return result;
+ }
+
+ /**
* Create an Object from a Buffer
- *
- * @param buffer
- * @return
- * @throws Exception
+ *
+ * @return the Object
*/
- public static Object getObject(Buffer buffer) throws Exception {
+ public static Object getObject(Buffer buffer) throws IOException {
if (buffer != null) {
- InputStream is = new BufferInputStream(buffer);
+ InputStream is = new ByteArrayInputStream(buffer.data, buffer.offset, buffer.length);
DataInputStream dataIn = new DataInputStream(is);
ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
- return objIn.readObject();
+ try {
+ return objIn.readObject();
+ } catch (ClassNotFoundException e) {
+ IOException ex = new IOException("Class not Found " + e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
}
return null;
}
+
+ /**
+ * Write a Packet to a buffer
+ *
+ * @return the Buffer
+ */
+ public static Buffer writePacket(Packet packet) throws IOException {
+ BufferOutputStream out = new BufferOutputStream(1024);
+ out.writeByte(packet.getPacketType());
+ packet.write(out);
+ return out.toBuffer();
+ }
+
+ /**
+ * Read a packet from a buffer
+ *
+ * @return the Packet
+ */
+ public static Packet readPacket(Buffer buffer) throws IOException {
+ BufferInputStream in = new BufferInputStream(buffer);
+ int type = in.readByte();
+ Packet packet = PacketType.valueOf(type).createPacket();
+ packet.read(in);
+ return packet;
+ }
+
+ /**
+ * Compress the buffer
+ *
+ * @return compressed Buffer
+ */
+ public static Buffer compress(Buffer buffer) throws IOException {
+ Buffer result = buffer;
+ if (buffer != null) {
+ BufferOutputStream bytesOut = new BufferOutputStream(buffer.length);
+ GZIPOutputStream gzipOut = new GZIPOutputStream(bytesOut, buffer.length);
+ gzipOut.write(buffer.toByteArray());
+ gzipOut.close();
+ bytesOut.close();
+ result = bytesOut.toBuffer();
+ }
+ return result;
+ }
+
+ /**
+ * Inflate a compressed buffer
+ *
+ * @return inflated buffer
+ */
+ public static Buffer inflate(Buffer buffer) throws IOException {
+ Buffer result = buffer;
+ if (isCompressed(buffer)) {
+ InputStream bytesIn = new BufferInputStream(buffer);
+ GZIPInputStream gzipIn = new GZIPInputStream(bytesIn);
+ BufferOutputStream bytesOut = new BufferOutputStream(buffer.length);
+ byte[] data = new byte[4096];
+ int bytesRead = 0;
+ while ((bytesRead = gzipIn.read(data, 0, data.length)) > 0) {
+ bytesOut.write(data, 0, bytesRead);
+ }
+ gzipIn.close();
+ bytesIn.close();
+ result = bytesOut.toBuffer();
+ bytesOut.close();
+ }
+ return result;
+ }
+
+ static boolean isCompressed(Buffer data) {
+ boolean result = false;
+ if (data != null && data.length > 2) {
+ int ch1 = (int) (data.byteAt(0) & 0xff);
+ int ch2 = (int) (data.byteAt(1) & 0xff);
+ int magic = (ch1 | (ch2 << 8));
+ result = (magic == GZIPInputStream.GZIP_MAGIC);
+ }
+ return result;
+ }
}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/wire/IOUtils.java
------------------------------------------------------------------------------
svn:eol-style = native