You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/10/03 22:13:24 UTC

git commit: Refactor of BytesMessage handling to make the message facade more opaque to the JmsBytesMessage instance which allows for handling of message content scenarios being fully contained in the facade implementation.

Repository: qpid-jms
Updated Branches:
  refs/heads/master dbdfb5b83 -> 516d91087


Refactor of BytesMessage handling to make the message facade more opaque
to the JmsBytesMessage instance which allows for handling of message
content scenarios being fully contained in the facade implementation.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/516d9108
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/516d9108
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/516d9108

Branch: refs/heads/master
Commit: 516d91087db8850f403f75e1159670b5fb2dcc1b
Parents: dbdfb5b
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 3 16:13:13 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 3 16:13:13 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/message/JmsBytesMessage.java       | 474 ++-----------------
 .../message/facade/JmsBytesMessageFacade.java   |  78 ++-
 .../amqp/message/AmqpJmsBytesMessageFacade.java | 134 +++++-
 .../qpid/jms/message/JmsBytesMessageTest.java   |  58 ++-
 .../apache/qpid/jms/message/JmsMessageTest.java |   6 +-
 .../defaults/JmsDefaultBytesMessageFacade.java  | 100 +++-
 6 files changed, 339 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
index eaf101e..6de4180 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
@@ -16,80 +16,22 @@
  */
 package org.apache.qpid.jms.message;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
-
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
 
-/**
- * A <CODE>BytesMessage</CODE> object is used to send a message containing a
- * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
- * interface and adds a bytes message body. The receiver of the message supplies
- * the interpretation of the bytes.
- * <p/>
- * The <CODE>BytesMessage</CODE> methods are based largely on those found in
- * <CODE>java.io.DataInputStream</CODE> and
- * <CODE>java.io.DataOutputStream</CODE>.
- * <p/>
- * This message type is for client encoding of existing message formats. If
- * possible, one of the other self-defining message types should be used
- * instead.
- * <p/>
- * Although the JMS API allows the use of message properties with byte messages,
- * they are typically not used, since the inclusion of properties may affect the
- * format.
- * <p/>
- * The primitive types can be written explicitly using methods for each type.
- * They may also be written generically as objects. For instance, a call to
- * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
- * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
- * provided, because the explicit form is convenient for static programming, and
- * the object form is needed when types are not known at compile time.
- * <p/>
- * When the message is first created, and when <CODE>clearBody</CODE> is
- * called, the body of the message is in write-only mode. After the first call
- * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
- * After a message has been sent, the client that sent it can retain and modify
- * it without affecting the message that has been sent. The same message object
- * can be sent multiple times. When a message has been received, the provider
- * has called <CODE>reset</CODE> so that the message body is in read-only mode
- * for the client.
- * <p/>
- * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
- * message body is cleared and the message is in write-only mode.
- * <p/>
- * If a client attempts to read a message in write-only mode, a
- * <CODE>MessageNotReadableException</CODE> is thrown.
- * <p/>
- * If a client attempts to write a message in read-only mode, a
- * <CODE>MessageNotWriteableException</CODE> is thrown.
- *
- * @see javax.jms.Session#createBytesMessage()
- * @see javax.jms.MapMessage
- * @see javax.jms.Message
- * @see javax.jms.ObjectMessage
- * @see javax.jms.StreamMessage
- * @see javax.jms.TextMessage
- */
 public class JmsBytesMessage extends JmsMessage implements BytesMessage {
 
-    protected transient ByteBufOutputStream bytesOut;
+    protected transient DataOutputStream dataOut;
     protected transient DataInputStream dataIn;
-    protected transient int length;
 
     private final JmsBytesMessageFacade facade;
 
@@ -100,7 +42,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
 
     @Override
     public JmsBytesMessage copy() throws JMSException {
-        storeContent();
         JmsBytesMessage other = new JmsBytesMessage(facade.copy());
         other.copy(this);
         return other;
@@ -108,61 +49,33 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
 
     private void copy(JmsBytesMessage other) throws JMSException {
         super.copy(other);
-        this.bytesOut = null;
+        this.dataOut = null;
         this.dataIn = null;
     }
 
     @Override
-    public void onSend(boolean disableMsgId, boolean disableTimestamp, long producerTtl) throws JMSException {
-        this.storeContent();
-        super.onSend(disableMsgId, disableTimestamp, producerTtl);
+    public boolean equals(Object other) {
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
     }
 
-    /**
-     * Clears out the message body. Clearing a message's body does not clear its
-     * header values or property entries.
-     * <p/>
-     * If this message body was read-only, calling this method leaves the
-     * message body in the same state as an empty body in a newly created
-     * message.
-     *
-     * @throws JMSException if the JMS provider fails to clear the message body
-     *                      due to some internal error.
-     */
     @Override
     public void clearBody() throws JMSException {
         super.clearBody();
+        this.dataOut = null;
         this.dataIn = null;
-        this.bytesOut = null;
     }
 
-    /**
-     * Gets the number of bytes of the message body when the message is in
-     * read-only mode. The value returned can be used to allocate a byte array.
-     * The value returned is the entire length of the message body, regardless
-     * of where the pointer for reading the message is currently located.
-     *
-     * @return number of bytes in the message
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     * @since 1.1
-     */
-
     @Override
     public long getBodyLength() throws JMSException {
         initializeReading();
-        return length;
+        return facade.getBodyLength();
     }
 
-    /**
-     * Reads a <code>boolean</code> from the bytes message stream.
-     *
-     * @return the <code>boolean</code> value read
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public boolean readBoolean() throws JMSException {
         initializeReading();
@@ -175,15 +88,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a signed 8-bit value from the bytes message stream.
-     *
-     * @return the next byte from the bytes message stream as a signed 8-bit
-     *         <code>byte</code>
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public byte readByte() throws JMSException {
         initializeReading();
@@ -196,17 +100,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads an unsigned 8-bit number from the bytes message stream.
-     *
-     * @return the next byte from the bytes message stream, interpreted as an
-     *         unsigned 8-bit number
-     * @throws JMSException                  if the JMS provider fails to read the message due to
-     *                                       some internal error.
-     * @throws javax.jms.MessageEOFException if unexpected end of bytes stream has been
-     *                                       reached.
-     * @throws MessageNotReadableException   if the message is in write-only mode.
-     */
     @Override
     public int readUnsignedByte() throws JMSException {
         initializeReading();
@@ -219,17 +112,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a signed 16-bit number from the bytes message stream.
-     *
-     * @return the next two bytes from the bytes message stream, interpreted as
-     *         a signed 16-bit number
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public short readShort() throws JMSException {
         initializeReading();
@@ -242,17 +124,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads an unsigned 16-bit number from the bytes message stream.
-     *
-     * @return the next two bytes from the bytes message stream, interpreted as
-     *         an unsigned 16-bit integer
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public int readUnsignedShort() throws JMSException {
         initializeReading();
@@ -265,17 +136,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a Unicode character value from the bytes message stream.
-     *
-     * @return the next two bytes from the bytes message stream as a Unicode
-     *         character
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public char readChar() throws JMSException {
         initializeReading();
@@ -288,17 +148,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a signed 32-bit integer from the bytes message stream.
-     *
-     * @return the next four bytes from the bytes message stream, interpreted as
-     *         an <code>int</code>
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public int readInt() throws JMSException {
         initializeReading();
@@ -311,17 +160,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a signed 64-bit integer from the bytes message stream.
-     *
-     * @return the next eight bytes from the bytes message stream, interpreted
-     *         as a <code>long</code>
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public long readLong() throws JMSException {
         initializeReading();
@@ -334,17 +172,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a <code>float</code> from the bytes message stream.
-     *
-     * @return the next four bytes from the bytes message stream, interpreted as
-     *         a <code>float</code>
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public float readFloat() throws JMSException {
         initializeReading();
@@ -357,17 +184,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a <code>double</code> from the bytes message stream.
-     *
-     * @return the next eight bytes from the bytes message stream, interpreted
-     *         as a <code>double</code>
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public double readDouble() throws JMSException {
         initializeReading();
@@ -380,22 +196,6 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a string that has been encoded using a modified UTF-8 format from
-     * the bytes message stream.
-     * <p/>
-     * For more information on the UTF-8 format, see "File System Safe UCS
-     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
-     * X/Open Company Ltd., Document Number: P316. This information also appears
-     * in ISO/IEC 10646, Annex P.
-     *
-     * @return a Unicode string from the bytes message stream
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageEOFException         if unexpected end of bytes stream has been
-     *                                     reached.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public String readUTF() throws JMSException {
         initializeReading();
@@ -408,57 +208,11 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Reads a byte array from the bytes message stream.
-     * <p/>
-     * If the length of array <code>value</code> is less than the number of
-     * bytes remaining to be read from the stream, the array should be filled. A
-     * subsequent call reads the next increment, and so on.
-     * <p/>
-     * If the number of bytes remaining in the stream is less than the length of
-     * array <code>value</code>, the bytes should be read into the array. The
-     * return value of the total number of bytes read will be less than the
-     * length of the array, indicating that there are no more bytes left to be
-     * read from the stream. The next read of the stream returns -1.
-     *
-     * @param value the buffer into which the data is read
-     * @return the total number of bytes read into the buffer, or -1 if there is
-     *         no more data because the end of the stream has been reached
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public int readBytes(byte[] value) throws JMSException {
         return readBytes(value, value.length);
     }
 
-    /**
-     * Reads a portion of the bytes message stream.
-     * <p/>
-     * If the length of array <code>value</code> is less than the number of
-     * bytes remaining to be read from the stream, the array should be filled. A
-     * subsequent call reads the next increment, and so on.
-     * <p/>
-     * If the number of bytes remaining in the stream is less than the length of
-     * array <code>value</code>, the bytes should be read into the array. The
-     * return value of the total number of bytes read will be less than the
-     * length of the array, indicating that there are no more bytes left to be
-     * read from the stream. The next read of the stream returns -1. <p/> If
-     * <code>length</code> is negative, or <code>length</code> is greater
-     * than the length of the array <code>value</code>, then an
-     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
-     * from the stream for this exception case.
-     *
-     * @param value  the buffer into which the data is read
-     * @param length the number of bytes to read; must be less than or equal to
-     *               <code>value.length</code>
-     * @return the total number of bytes read into the buffer, or -1 if there is
-     *         no more data because the end of the stream has been reached
-     * @throws JMSException                if the JMS provider fails to read the message due to
-     *                                     some internal error.
-     * @throws MessageNotReadableException if the message is in write-only mode.
-     */
     @Override
     public int readBytes(byte[] value, int length) throws JMSException {
         initializeReading();
@@ -488,242 +242,116 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
-     * value. The value <code>true</code> is written as the value
-     * <code>(byte)1</code>; the value <code>false</code> is written as the
-     * value <code>(byte)0</code>.
-     *
-     * @param value the <code>boolean</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeBoolean(boolean value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeBoolean(value);
+            this.dataOut.writeBoolean(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
-     * value.
-     *
-     * @param value the <code>byte</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeByte(byte value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeByte(value);
+            this.dataOut.writeByte(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a <code>short</code> to the bytes message stream as two bytes,
-     * high byte first.
-     *
-     * @param value the <code>short</code> to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeShort(short value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeShort(value);
+            this.dataOut.writeShort(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a <code>char</code> to the bytes message stream as a 2-byte
-     * value, high byte first.
-     *
-     * @param value the <code>char</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeChar(char value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeChar(value);
+            this.dataOut.writeChar(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes an <code>int</code> to the bytes message stream as four bytes,
-     * high byte first.
-     *
-     * @param value the <code>int</code> to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeInt(int value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeInt(value);
+            this.dataOut.writeInt(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a <code>long</code> to the bytes message stream as eight bytes,
-     * high byte first.
-     *
-     * @param value the <code>long</code> to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeLong(long value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeLong(value);
+            this.dataOut.writeLong(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Converts the <code>float</code> argument to an <code>int</code> using
-     * the <code>floatToIntBits</code> method in class <code>Float</code>,
-     * and then writes that <code>int</code> value to the bytes message stream
-     * as a 4-byte quantity, high byte first.
-     *
-     * @param value the <code>float</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeFloat(float value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeFloat(value);
+            this.dataOut.writeFloat(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Converts the <code>double</code> argument to a <code>long</code>
-     * using the <code>doubleToLongBits</code> method in class
-     * <code>Double</code>, and then writes that <code>long</code> value to
-     * the bytes message stream as an 8-byte quantity, high byte first.
-     *
-     * @param value the <code>double</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeDouble(double value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeDouble(value);
+            this.dataOut.writeDouble(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a string to the bytes message stream using UTF-8 encoding in a
-     * machine-independent manner.
-     * <p/>
-     * For more information on the UTF-8 format, see "File System Safe UCS
-     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
-     * X/Open Company Ltd., Document Number: P316. This information also appears
-     * in ISO/IEC 10646, Annex P.
-     *
-     * @param value the <code>String</code> value to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeUTF(String value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.writeUTF(value);
+            this.dataOut.writeUTF(value);
         } catch (IOException ioe) {
             throw JmsExceptionSupport.create(ioe);
         }
     }
 
-    /**
-     * Writes a byte array to the bytes message stream.
-     *
-     * @param value the byte array to be written
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeBytes(byte[] value) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.write(value);
+            this.dataOut.write(value);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes a portion of a byte array to the bytes message stream.
-     *
-     * @param value  the byte array value to be written
-     * @param offset the initial offset within the byte array
-     * @param length the number of bytes to use
-     * @throws JMSException                 if the JMS provider fails to write the message due
-     *                                      to some internal error.
-     * @throws MessageNotWriteableException if the message is in read-only mode.
-     */
     @Override
     public void writeBytes(byte[] value, int offset, int length) throws JMSException {
         initializeWriting();
         try {
-            this.bytesOut.write(value, offset, length);
+            this.dataOut.write(value, offset, length);
         } catch (IOException e) {
             throw JmsExceptionSupport.createMessageFormatException(e);
         }
     }
 
-    /**
-     * Writes an object to the bytes message stream.
-     * <p/>
-     * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
-     * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
-     * arrays.
-     *
-     * @param value the object in the Java programming language ("Java object")
-     *              to be written; it must not be null
-     * @throws JMSException                   if the JMS provider fails to write the message due
-     *                                        to some internal error.
-     * @throws MessageFormatException         if the object is of an invalid type.
-     * @throws MessageNotWriteableException   if the message is in read-only mode.
-     * @throws java.lang.NullPointerException if the parameter
-     *                                        <code>value</code> is null.
-     */
     @Override
     public void writeObject(Object value) throws JMSException {
         if (value == null) {
@@ -755,16 +383,10 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         }
     }
 
-    /**
-     * Puts the message body in read-only mode and repositions the stream of
-     * bytes to the beginning.
-     *
-     * @throws JMSException if an internal error occurs
-     */
     @Override
     public void reset() throws JMSException {
-        storeContent();
-        this.bytesOut = null;
+        this.facade.reset();
+        this.dataOut = null;
         this.dataIn = null;
         setReadOnlyBody(true);
     }
@@ -777,58 +399,20 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
 
     @Override
     public String toString() {
-        return super.toString() + " JmsBytesMessage{ " + "bytesOut = " + bytesOut + ", dataIn = " + dataIn + " }";
-    }
-
-    /**
-     * Direct view of the underlying message contents.
-     *
-     * @return a ByteBuf holding the bytes contained in this message.
-     */
-    public ByteBuf getContent() {
-        return this.facade.getContent();
-    }
-
-    /**
-     * A direct write method to the underlying message content ByteBuf.
-     *
-     * @param content
-     *        the new content to assign to this message.
-     */
-    public void setContent(ByteBuf content) {
-        this.facade.setContent(content);
+        return super.toString() + " JmsBytesMessage{ " + "bytesOut = " + dataOut + ", dataIn = " + dataIn + " }";
     }
 
     private void initializeWriting() throws JMSException {
         checkReadOnlyBody();
-        if (this.bytesOut == null) {
-            this.bytesOut = new ByteBufOutputStream(Unpooled.buffer());
+        if (this.dataOut == null) {
+            this.dataOut = new DataOutputStream(this.facade.getOutputStream());
         }
     }
 
     private void initializeReading() throws JMSException {
         checkWriteOnlyBody();
         if (dataIn == null) {
-            ByteBuf buffer = facade.getContent();
-            if (buffer == null) {
-                buffer = Unpooled.EMPTY_BUFFER;
-            } else {
-                buffer.resetReaderIndex();
-            }
-            dataIn = new DataInputStream(new ByteBufInputStream(buffer));
-            this.length = buffer.readableBytes();
-        }
-    }
-
-    private void storeContent() throws JMSException {
-        try {
-            if (bytesOut != null) {
-                bytesOut.close();
-                facade.setContent(bytesOut.buffer());
-                bytesOut = null;
-            }
-        } catch (IOException ioe) {
-            throw JmsExceptionSupport.create(ioe);
+            dataIn = new DataInputStream(this.facade.getInputStream());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
index a382ff8..fd3adde 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
@@ -16,8 +16,10 @@
  */
 package org.apache.qpid.jms.message.facade;
 
-import io.netty.buffer.ByteBuf;
+import java.io.InputStream;
+import java.io.OutputStream;
 
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 
 /**
@@ -27,6 +29,10 @@ import javax.jms.JMSException;
 public interface JmsBytesMessageFacade extends JmsMessageFacade {
 
     /**
+     * Performs a copy of this message facade into a new instance.  Calling this method
+     * results in a call to reset() prior to the message copy meaning any in use streams
+     * will be closed on return.
+     *
      * @returns a deep copy of this Message Facade including a complete copy
      * of the byte contents of the wrapped message.
      */
@@ -34,21 +40,73 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade {
     JmsBytesMessageFacade copy() throws JMSException;
 
     /**
-     * Retrieves the contents of this message either wrapped in or copied
-     * into a Buffer instance.  If the message contents are empty a null
-     * Buffer instance may be returned.
+     * Create and return an InputStream instance that can be used to read the contents
+     * of this message.  If an OutputStream was previously created and no call to reset
+     * has yet been made then this method will throw an exception.
+     *
+     * Multiple calls to this method should return the same InputStream instance, only
+     * when the message has been reset should the current input stream instance be discarded
+     * and a new one created on demand.  While this means the multiple concurrent readers
+     * is possible it is strongly discouraged.
+     *
+     * If the message body contains data that has been compressed and can be determined
+     * to be so by the implementation then this method will return an InputStream instance
+     * that can inflate the compressed data.
+     *
+     * @return an InputStream instance to read the message body.
+     *
+     * @throws JMSException if an error occurs creating the stream.
+     * @throws IllegalStateException if there is a current OutputStream in use.
+     */
+    InputStream getInputStream() throws JMSException;
+
+    /**
+     * Create and return a new OuputStream used to populate the body of the message. If an
+     * InputStream was previously requested this method will fail until such time as a call
+     * to reset has been requested.
+     *
+     * If an existing OuputStream has already been created then this method will return
+     * that stream until such time as the reset method has been called.
+     *
+     * @return an OutputStream instance to write the message body.
+     *
+     * @throws JMSException if an error occurs creating the stream.
+     * @throws IllegalStateException if there is a current OutputStream in use.
+     */
+    OutputStream getOutputStream() throws JMSException;
+
+    /**
+     * Reset the message state such that a call to getInputStream or getOutputStream
+     * will succeed.  If an OutputStream instance exists it is closed an the current
+     * contents are stored into the message body.
+     */
+    void reset();
+
+    /**
+     * @return the number of bytes contained in the body of the message.
+     */
+    int getBodyLength();
+
+    /**
+     * Returns a copy of the body of the message in a new byte[] instance.  If the
+     * message body is empty an empty byte[] will be returned.
      *
-     * @returns a new ByteBuf that contains the contents of this message.
+     * @returns a new byte[] that contains a copy of the contents of this message.
+     *
+     * @throws JMSException if an error occurs accessing the message body.
      */
-    ByteBuf getContent();
+    byte[] getBody() throws JMSException;
 
     /**
-     * Sets the contents of the message to the new value based on the bytes
-     * stored in the passed in ByteBuf.
+     * Sets the contents of the message from the given byte[] the given bytes are
+     * copied into the body of the message.  Any previous message body content is
+     * discarded.
      *
-     * @param contents
+     * @param content
      *        the new bytes to store in this message.
+     *
+     * @throws JMSException if an error occurs accessing the message body.
      */
-    void setContent(ByteBuf content);
+    void setBody(byte[] content) throws JMSException;
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
index ea29b06..e4aebf2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -19,8 +19,18 @@ package org.apache.qpid.jms.provider.amqp.message;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
 import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
@@ -37,7 +47,11 @@ import org.apache.qpid.proton.message.Message;
 public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements JmsBytesMessageFacade {
 
     private static final String CONTENT_TYPE = "application/octet-stream";
-    private static final Data EMPTY_DATA = new Data(new Binary(new byte[0]));
+    private static final Binary EMPTY_BODY = new Binary(new byte[0]);
+    private static final Data EMPTY_DATA = new Data(EMPTY_BODY);
+
+    private transient ByteBufInputStream bytesIn;
+    private transient ByteBufOutputStream bytesOut;
 
     /**
      * Creates a new facade instance
@@ -66,9 +80,19 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
 
     @Override
     public AmqpJmsBytesMessageFacade copy() {
+        reset();
         AmqpJmsBytesMessageFacade copy = new AmqpJmsBytesMessageFacade(connection);
         copyInto(copy);
-        copy.setContent(getContent());
+
+        Binary payload = getBinaryFromBody();
+        if (payload != null && payload.getLength() > 0) {
+            byte[] result = new byte[payload.getLength()];
+            System.arraycopy(payload.getArray(), payload.getArrayOffset(), result, 0, payload.getLength());
+            copy.message.setBody(new Data(new Binary(result)));
+        } else {
+            copy.message.setBody(EMPTY_DATA);
+        }
+
         return copy;
     }
 
@@ -84,35 +108,119 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
     }
 
     @Override
-    public ByteBuf getContent() {
-        ByteBuf result = Unpooled.EMPTY_BUFFER;
+    public void clearBody() {
+        if (bytesIn != null) {
+            try {
+                bytesIn.close();
+            } catch (IOException e) {
+            }
+            bytesIn = null;
+        }
+        if (bytesOut != null) {
+            try {
+                bytesOut.close();
+            } catch (IOException e) {
+            }
+
+            bytesOut = null;
+        }
+
+        message.setBody(EMPTY_DATA);
+    }
+
+    @Override
+    public byte[] getBody() throws JMSException {
+        if (bytesIn != null || bytesOut != null) {
+            throw new JMSException("Body cannot be read until message is reset()");
+        }
+
+        byte[] result = new byte[0];
         Binary payload = getBinaryFromBody();
         if (payload != null && payload.getLength() > 0) {
-            result = Unpooled.wrappedBuffer(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+            result = new byte[payload.getLength()];
+            System.arraycopy(payload.getArray(), payload.getArrayOffset(), result, 0, payload.getLength());
         }
 
         return result;
     }
 
     @Override
-    public void setContent(ByteBuf content) {
+    public void setBody(byte[] content) throws JMSException {
+        if (bytesIn != null || bytesOut != null) {
+            throw new JMSException("Body cannot be read until message is reset()");
+        }
+
         Data body = EMPTY_DATA;
         if (content != null) {
-            body = new Data(new Binary(content.array(), content.arrayOffset(), content.readableBytes()));
+            byte[] copy = Arrays.copyOf(content, content.length);
+            body = new Data(new Binary(copy, 0, copy.length));
         }
 
         getAmqpMessage().setBody(body);
     }
 
+    @Override
+    public InputStream getInputStream() throws JMSException {
+        if (bytesOut != null) {
+            throw new IllegalStateException("Body is being written to, cannot perform a read.");
+        }
+
+        if (bytesIn == null) {
+            Binary body = getBinaryFromBody();
+            // Duplicate the content buffer to allow for getBodyLength() validity.
+            bytesIn = new ByteBufInputStream(
+                Unpooled.wrappedBuffer(body.getArray(), body.getArrayOffset(), body.getLength()));
+        }
+
+        return bytesIn;
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws JMSException {
+        if (bytesIn != null) {
+            throw new IllegalStateException("Body is being read from, cannot perform a write.");
+        }
+
+        if (bytesOut == null) {
+            bytesOut = new ByteBufOutputStream(Unpooled.buffer());
+            message.setBody(EMPTY_DATA);
+        }
+
+        return bytesOut;
+    }
+
+    @Override
+    public void reset() {
+        if (bytesOut != null) {
+            ByteBuf writeBuf = bytesOut.buffer();
+            Binary body = new Binary(writeBuf.array(), writeBuf.arrayOffset(), writeBuf.readableBytes());
+            message.setBody(new Data(body));
+            try {
+                bytesOut.close();
+            } catch (IOException e) {
+            }
+            bytesOut = null;
+        } else if (bytesIn != null) {
+            try {
+                bytesIn.close();
+            } catch (IOException e) {
+            }
+            bytesIn = null;
+        }
+    }
+
+    @Override
+    public int getBodyLength() {
+        return getBinaryFromBody().getLength();
+    }
+
     private Binary getBinaryFromBody() {
         Section body = getAmqpMessage().getBody();
-        Binary result = null;
+        Binary result = EMPTY_BODY;
 
         if (body == null) {
             return result;
-        }
-
-        if (body instanceof Data) {
+        } else if (body instanceof Data) {
             Binary payload = ((Data) body).getValue();
             if (payload != null && payload.getLength() != 0) {
                 result = payload;
@@ -129,10 +237,10 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
                     result = payload;
                 }
             } else {
-                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+                throw new java.lang.IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
             }
         } else {
-            throw new IllegalStateException("Unexpected body content type: " + body.getClass().getSimpleName());
+            throw new java.lang.IllegalStateException("Unexpected body content type: " + body.getClass().getSimpleName());
         }
 
         return result;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java
index 7a103a1..acb8ffa 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsBytesMessageTest.java
@@ -18,12 +18,8 @@ package org.apache.qpid.jms.message;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import java.util.Arrays;
 
@@ -79,23 +75,23 @@ public class JmsBytesMessageTest {
 
     @Test
     public void testReadBytesUsingReceivedMessageWithBodyReturnsBytes() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("myBytesData".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
 
         // retrieve the expected bytes, check they match
-        byte[] receivedBytes = new byte[content.array().length];
+        byte[] receivedBytes = new byte[content.length];
         bytesMessage.readBytes(receivedBytes);
-        assertTrue(Arrays.equals(content.array(), receivedBytes));
+        assertTrue(Arrays.equals(content, receivedBytes));
 
         // verify no more bytes remain, i.e EOS
         assertEquals("Expected input stream to be at end but data was returned",
                      END_OF_STREAM, bytesMessage.readBytes(new byte[1]));
 
-        assertEquals("Message reports unexpected length", content.array().length, bytesMessage.getBodyLength());
+        assertEquals("Message reports unexpected length", content.length, bytesMessage.getBodyLength());
     }
 
     /**
@@ -104,13 +100,13 @@ public class JmsBytesMessageTest {
      */
     @Test(expected = MessageNotWriteableException.class)
     public void testReceivedBytesMessageThrowsMessageNotWriteableExceptionOnWriteBytes() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("myBytesData".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
-        bytesMessage.writeBytes(content.array());
+        bytesMessage.writeBytes(content);
     }
 
     /**
@@ -130,9 +126,9 @@ public class JmsBytesMessageTest {
      */
     @Test
     public void testClearBodyOnReceivedBytesMessageMakesMessageWritable() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("myBytesData".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
@@ -147,16 +143,16 @@ public class JmsBytesMessageTest {
      */
     @Test
     public void testClearBodyOnReceivedBytesMessageClearsUnderlyingMessageBody() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("myBytesData".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
 
-        assertNotNull("Expected message content but none was present", facade.getContent());
+        assertTrue("Expected message content but none was present", facade.getBody().length > 0);
         bytesMessage.clearBody();
-        assertNull("Expected no message content but was present", facade.getContent());
+        assertTrue("Expected no message content but was present", facade.getBody().length == 0);
     }
 
     /**
@@ -165,13 +161,13 @@ public class JmsBytesMessageTest {
      */
     @Test
     public void testGetBodyLengthOnClearedReceivedMessageThrowsMessageNotReadableException() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("myBytesData".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
-        assertEquals("Unexpected message length", content.array().length, bytesMessage.getBodyLength());
+        assertEquals("Unexpected message length", content.length, bytesMessage.getBodyLength());
         bytesMessage.clearBody();
 
         try {
@@ -188,9 +184,9 @@ public class JmsBytesMessageTest {
      */
     @Test
     public void testResetOnReceivedBytesMessageResetsMarker() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("resetTestBytes".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
         bytesMessage.onDispatch();
@@ -198,15 +194,15 @@ public class JmsBytesMessageTest {
         // retrieve a few bytes, check they match the first few expected bytes
         byte[] partialBytes = new byte[3];
         bytesMessage.readBytes(partialBytes);
-        byte[] partialOriginalBytes = Arrays.copyOf(content.array(), 3);
+        byte[] partialOriginalBytes = Arrays.copyOf(content, 3);
         assertTrue(Arrays.equals(partialOriginalBytes, partialBytes));
 
         bytesMessage.reset();
 
         // retrieve all the expected bytes, check they match
-        byte[] resetBytes = new byte[content.array().length];
+        byte[] resetBytes = new byte[content.length];
         bytesMessage.readBytes(resetBytes);
-        assertTrue(Arrays.equals(content.array(), resetBytes));
+        assertTrue(Arrays.equals(content, resetBytes));
     }
 
     /**
@@ -215,21 +211,21 @@ public class JmsBytesMessageTest {
      */
     @Test
     public void testResetOnNewlyPopulatedBytesMessageResetsMarkerAndMakesReadable() throws Exception {
-        ByteBuf content = Unpooled.wrappedBuffer("newResetTestBytes".getBytes());
+        byte[] content = "myBytesData".getBytes();
         JmsDefaultBytesMessageFacade facade = new JmsDefaultBytesMessageFacade();
-        facade.setContent(content);
+        facade.setBody(content);
 
         JmsBytesMessage bytesMessage = new JmsBytesMessage(facade);
 
         assertFalse("Message should be writable", bytesMessage.isReadOnlyBody());
-        bytesMessage.writeBytes(content.array());
+        bytesMessage.writeBytes(content);
         bytesMessage.reset();
         assertTrue("Message should not be writable", bytesMessage.isReadOnlyBody());
 
         // retrieve the bytes, check they match
-        byte[] resetBytes = new byte[content.array().length];
+        byte[] resetBytes = new byte[content.length];
         bytesMessage.readBytes(resetBytes);
-        assertTrue(Arrays.equals(content.array(), resetBytes));
+        assertTrue(Arrays.equals(content, resetBytes));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessageTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessageTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessageTest.java
index a60b676..144cee8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessageTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessageTest.java
@@ -33,9 +33,6 @@ import javax.jms.MessageNotWriteableException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsTopic;
-import org.apache.qpid.jms.message.JmsBytesMessage;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFactory;
 import org.junit.Before;
 import org.junit.Test;
@@ -511,7 +508,8 @@ public class JmsMessageTest {
         JmsBytesMessage message = factory.createBytesMessage();
         message.clearBody();
         assertFalse(message.isReadOnlyBody());
-        assertNull(message.getContent());
+        message.reset();
+        assertEquals(0, message.getBodyLength());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/516d9108/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultBytesMessageFacade.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultBytesMessageFacade.java
index 23f1817..04b949c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultBytesMessageFacade.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultBytesMessageFacade.java
@@ -17,6 +17,16 @@
 package org.apache.qpid.jms.message.facade.defaults;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
 
 import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
 
@@ -25,7 +35,9 @@ import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
  */
 public final class JmsDefaultBytesMessageFacade extends JmsDefaultMessageFacade implements JmsBytesMessageFacade {
 
-    private ByteBuf content;
+    private ByteBuf content = Unpooled.EMPTY_BUFFER;
+    private ByteBufOutputStream bytesOut;
+    private ByteBufInputStream bytesIn;
 
     @Override
     public JmsMsgType getMsgType() {
@@ -34,10 +46,11 @@ public final class JmsDefaultBytesMessageFacade extends JmsDefaultMessageFacade
 
     @Override
     public JmsDefaultBytesMessageFacade copy() {
+        reset();
         JmsDefaultBytesMessageFacade copy = new JmsDefaultBytesMessageFacade();
         copyInto(copy);
         if (this.content != null) {
-            copy.setContent(this.content.copy());
+            copy.content = this.content.copy();
         }
 
         return copy;
@@ -54,16 +67,87 @@ public final class JmsDefaultBytesMessageFacade extends JmsDefaultMessageFacade
 
     @Override
     public void clearBody() {
-        this.content = null;
+        if (bytesIn != null) {
+            try {
+                bytesIn.close();
+            } catch (IOException e) {
+            }
+            bytesIn = null;
+        }
+        if (bytesOut != null) {
+            try {
+                bytesOut.close();
+            } catch (IOException e) {
+            }
+            bytesOut = null;
+        }
+
+        content = Unpooled.EMPTY_BUFFER;
     }
 
     @Override
-    public ByteBuf getContent() {
-        return content;
+    public InputStream getInputStream() throws JMSException {
+        if (bytesOut != null) {
+            throw new IllegalStateException("Body is being written to, cannot perform a read.");
+        }
+
+        if (bytesIn == null) {
+            // Duplicate the content buffer to allow for getBodyLength() validity.
+            bytesIn = new ByteBufInputStream(content.duplicate());
+        }
+
+        return bytesIn;
     }
 
     @Override
-    public void setContent(ByteBuf content) {
-        this.content = content;
+    public OutputStream getOutputStream() throws JMSException {
+        if (bytesIn != null) {
+            throw new IllegalStateException("Body is being read from, cannot perform a write.");
+        }
+
+        if (bytesOut == null) {
+            bytesOut = new ByteBufOutputStream(Unpooled.buffer());
+            content = Unpooled.EMPTY_BUFFER;
+        }
+
+        return bytesOut;
     }
-}
+
+    @Override
+    public void reset() {
+        if (bytesOut != null) {
+            content = bytesOut.buffer();
+            try {
+                bytesOut.close();
+            } catch (IOException e) {
+            }
+            bytesOut = null;
+        } else if (bytesIn != null) {
+            try {
+                bytesIn.close();
+            } catch (IOException e) {
+            }
+            bytesIn = null;
+        }
+    }
+
+    @Override
+    public int getBodyLength() {
+        return content.readableBytes();
+    }
+
+    @Override
+    public byte[] getBody() throws JMSException {
+        if (bytesIn != null || bytesOut != null) {
+            throw new JMSException("Body is in use, call reset before attempting to access it.");
+        }
+        return content.copy().array();
+    }
+
+    @Override
+    public void setBody(byte[] content) throws JMSException {
+        if (bytesIn != null || bytesOut != null) {
+            throw new JMSException("Body is in use, call reset before attempting to access it.");
+        }
+        this.content = Unpooled.copiedBuffer(content);
+    }}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org