You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/12/15 16:45:48 UTC

svn commit: r1720183 [1/3] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/s...

Author: rgodfrey
Date: Tue Dec 15 15:45:46 2015
New Revision: 1720183

URL: http://svn.apache.org/viewvc?rev=1720183&view=rev
Log:
QPID-6953 : Remove use of DataOutput for AMQP 0-8/9/9-1 encoding

Removed:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/util/BytesDataOutput.java
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
    qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicAckBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicNackBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldArray.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectBody.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Dec 15 15:45:46 2015
@@ -1123,14 +1123,49 @@ public abstract class AbstractBDBMessage
             return data;
         }
 
+
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent()
+        public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
         {
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
             Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+            int pos = 0;
             for (QpidByteBuffer buf : bufs)
             {
-                content.add(buf.duplicate());
+                if(length > 0)
+                {
+                    int bufRemaining = buf.remaining();
+                    if (pos + bufRemaining <= offset)
+                    {
+                        pos += bufRemaining;
+                    }
+                    else if (pos >= offset)
+                    {
+                        buf = buf.duplicate();
+                        if (bufRemaining <= length)
+                        {
+                            length -= bufRemaining;
+                        }
+                        else
+                        {
+                            buf.limit(length);
+                            length = 0;
+                        }
+                        content.add(buf);
+                        pos += buf.remaining();
+
+                    }
+                    else
+                    {
+                        int offsetInBuf = offset - pos;
+                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                        content.add(bufView);
+                        length -= limit;
+                        pos+=limit+offsetInBuf;
+                    }
+                }
+
             }
             return content;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Tue Dec 15 15:45:46 2015
@@ -168,13 +168,13 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public Collection<QpidByteBuffer> getContent()
+    final public Collection<QpidByteBuffer> getContent(int offset, int length)
     {
         StoredMessage<T> storedMessage = getStoredMessage();
         boolean wasInMemory = storedMessage.isInMemory();
         try
         {
-            return storedMessage.getContent();
+            return storedMessage.getContent(offset, length);
         }
         finally
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Tue Dec 15 15:45:46 2015
@@ -28,7 +28,7 @@ import org.apache.qpid.bytebuffer.QpidBy
 
 public interface MessageContentSource
 {
-    Collection<QpidByteBuffer> getContent();
+    Collection<QpidByteBuffer> getContent(int offset, int length);
 
     long getSize();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Tue Dec 15 15:45:46 2015
@@ -64,7 +64,7 @@ public class InternalMessage extends Abs
     {
         super(msg, null);
         _contentSize = msg.getMetaData().getContentSize();
-        Collection<QpidByteBuffer> bufs = msg.getContent();
+        Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
 
         try(ObjectInputStream is = new ObjectInputStream(new ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
@@ -223,9 +223,9 @@ public class InternalMessage extends Abs
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent()
+                    public Collection<QpidByteBuffer> getContent(final int offset, final int length)
                     {
-                        return Collections.singleton(QpidByteBuffer.wrap(bytes));
+                        return Collections.singleton(QpidByteBuffer.wrap(bytes, offset, length));
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Dec 15 15:45:46 2015
@@ -2596,7 +2596,8 @@ public abstract class AbstractQueue<X ex
         @Override
         public void write(OutputStream outputStream) throws IOException
         {
-            Collection<QpidByteBuffer> content = _messageReference.getMessage().getContent();
+            ServerMessage message = _messageReference.getMessage();
+            Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
             try
             {
                 for (QpidByteBuffer b : content)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Dec 15 15:45:46 2015
@@ -1506,13 +1506,48 @@ public abstract class AbstractJDBCMessag
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent()
+        public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
         {
             Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
             Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
+
+            int pos = 0;
             for (QpidByteBuffer buf : bufs)
             {
-                content.add(buf.duplicate());
+                if (length > 0)
+                {
+                    int bufRemaining = buf.remaining();
+                    if (pos + bufRemaining <= offset)
+                    {
+                        pos += bufRemaining;
+                    }
+                    else if (pos >= offset)
+                    {
+                        buf = buf.duplicate();
+                        if (bufRemaining <= length)
+                        {
+                            length -= bufRemaining;
+                        }
+                        else
+                        {
+                            buf.limit(length);
+                            length = 0;
+                        }
+                        content.add(buf);
+                        pos += buf.remaining();
+
+                    }
+                    else
+                    {
+                        int offsetInBuf = offset - pos;
+                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                        content.add(bufView);
+                        length -= limit;
+                        pos+=limit+offsetInBuf;
+                    }
+                }
+
             }
             return content;
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Tue Dec 15 15:45:46 2015
@@ -83,14 +83,15 @@ public class StoredMemoryMessage<T exten
         return this;
     }
 
+
     @Override
-    public Collection<QpidByteBuffer> getContent()
+    public Collection<QpidByteBuffer> getContent(int offset, int length)
     {
         if(_content == null)
         {
             return null;
         }
-        return Collections.singleton(_content.duplicate());
+        return Collections.singleton(_content.view(offset, length));
     }
 
     public T getMetaData()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java Tue Dec 15 15:45:46 2015
@@ -31,7 +31,8 @@ public interface StoredMessage<M extends
 
     long getMessageNumber();
 
-    Collection<QpidByteBuffer> getContent();
+    Collection<QpidByteBuffer> getContent(int offset, int length);
+
 
     void remove();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Tue Dec 15 15:45:46 2015
@@ -106,7 +106,6 @@ public class NetworkConnectionScheduler
         {
             rerun = false;
             boolean closed = connection.doWork();
-
             if (!closed && connection.getScheduler() == this)
             {
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java Tue Dec 15 15:45:46 2015
@@ -184,7 +184,14 @@ public class NonBlockingNetworkTransport
                 else
                 {
                     LOGGER.error("No Engine available.");
-                }
+                    try
+                    {
+                        socketChannel.close();
+                    }
+                    catch (IOException e)
+                    {
+                        LOGGER.debug("Failed to close socket " + socketChannel, e);
+                    }                }
             }
         }
         catch (IOException e)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Tue Dec 15 15:45:46 2015
@@ -100,7 +100,7 @@ public class TrustStoreMessageSourceTest
     {
         final int bodySize = (int) message.getSize();
         byte[] msgContent = new byte[bodySize];
-        final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent();
+        final Collection<QpidByteBuffer> allData = message.getStoredMessage().getContent(0, bodySize);
         int total = 0;
         for(QpidByteBuffer b : allData)
         {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Tue Dec 15 15:45:46 2015
@@ -20,16 +20,11 @@
  */
 package org.apache.qpid.server.store;
 
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.EncodingUtils;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.util.ByteBufferOutputStream;
 
 public class TestMessageMetaData implements StorableMessageMetaData
 {
@@ -96,17 +91,8 @@ public class TestMessageMetaData impleme
     public int writeToBuffer(QpidByteBuffer dest)
     {
         int oldPosition = dest.position();
-        try
-        {
-            DataOutput dataOutputStream = dest.asDataOutput();
-            EncodingUtils.writeLong(dataOutputStream, _messageId);
-            EncodingUtils.writeInteger(dataOutputStream, _contentSize);
-        }
-        catch (IOException e)
-        {
-            // This shouldn't happen as we are not actually using anything that can throw an IO Exception
-            throw new RuntimeException(e);
-        }
+        dest.putLong(_messageId);
+        dest.putInt(_contentSize);
 
         return dest.position() - oldPosition;
     };

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Tue Dec 15 15:45:46 2015
@@ -99,11 +99,12 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public Collection<QpidByteBuffer> getContent()
+        public Collection<QpidByteBuffer> getContent(int offset, int length)
         {
             return null;
         }
 
+
         @Override
         public Object getConnectionReference()
         {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Tue Dec 15 15:45:46 2015
@@ -109,7 +109,7 @@ class MockServerMessage implements Serve
     }
 
     @Override
-    public Collection<QpidByteBuffer> getContent()
+    public Collection<QpidByteBuffer> getContent(int offset, int length)
     {
         throw new UnsupportedOperationException();
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Tue Dec 15 15:45:46 2015
@@ -78,9 +78,9 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent()
+                    public Collection<QpidByteBuffer> getContent(final int offset, final int length)
                     {
-                        return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+                        return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Tue Dec 15 15:45:46 2015
@@ -85,9 +85,9 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public Collection<QpidByteBuffer> getContent()
+                    public Collection<QpidByteBuffer> getContent(final int offset, final int length)
                     {
-                        return serverMsg.getContent();
+                        return serverMsg.getContent(offset, length);
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Tue Dec 15 15:45:46 2015
@@ -63,7 +63,7 @@ public class MessageConverter_v0_10_to_I
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
         int total = 0;
-        for(QpidByteBuffer b : serverMessage.getContent())
+        for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
         {
             int len = b.remaining();
             b.get(data, total, len);

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Tue Dec 15 15:45:46 2015
@@ -83,6 +83,6 @@ public class MessageTransferMessage exte
 
     public Collection<QpidByteBuffer> getBody()
     {
-        return  getContent();
+        return  getContent(0, (int) getSize());
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Tue Dec 15 15:45:46 2015
@@ -392,14 +392,7 @@ public class AMQPConnection_0_8
             _logger.debug("SEND: " + frame);
         }
 
-        try
-        {
-            frame.writePayload(_sender);
-        }
-        catch (IOException e)
-        {
-            throw new ServerScopedRuntimeException(e);
-        }
+        frame.writePayload(_sender);
 
 
         updateLastWriteTime();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Tue Dec 15 15:45:46 2015
@@ -91,9 +91,9 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent()
+            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Tue Dec 15 15:45:46 2015
@@ -63,7 +63,7 @@ public class MessageConverter_v0_8_to_In
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
         int total = 0;
-        for(QpidByteBuffer b : serverMessage.getContent())
+        for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
         {
             int len = b.remaining();
             b.get(data, total, len);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Tue Dec 15 15:45:46 2015
@@ -110,32 +110,24 @@ public class MessageMetaData implements
     public int writeToBuffer(final QpidByteBuffer dest)
     {
         int oldPosition = dest.position();
-        try
-        {
-
-            dest.putInt(_contentHeaderBody.getSize());
-            _contentHeaderBody.writePayload(dest);
 
-            EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange());
-            EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey());
-            byte flags = 0;
-            if(_messagePublishInfo.isMandatory())
-            {
-                flags |= MANDATORY_FLAG;
-            }
-            if(_messagePublishInfo.isImmediate())
-            {
-                flags |= IMMEDIATE_FLAG;
-            }
-            dest.put(flags);
-            dest.putLong(_arrivalTime);
+        dest.putInt(_contentHeaderBody.getSize());
+        _contentHeaderBody.writePayload(dest);
 
+        EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getExchange());
+        EncodingUtils.writeShortStringBytes(dest, _messagePublishInfo.getRoutingKey());
+        byte flags = 0;
+        if(_messagePublishInfo.isMandatory())
+        {
+            flags |= MANDATORY_FLAG;
         }
-        catch (IOException e)
+        if(_messagePublishInfo.isImmediate())
         {
-            // This shouldn't happen as we are not actually using anything that can throw an IO Exception
-            throw new ConnectionScopedRuntimeException(e);
+            flags |= IMMEDIATE_FLAG;
         }
+        dest.put(flags);
+        dest.putLong(_arrivalTime);
+
 
         return dest.position()-oldPosition;
     }
@@ -143,55 +135,47 @@ public class MessageMetaData implements
     @Override
     public Collection<QpidByteBuffer> asByteBuffers()
     {
-        try
-        {
-            final List<QpidByteBuffer> buffers = new ArrayList<>();
-            QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4);
-            buffers.add(buf);
-            buf.putInt(0, _contentHeaderBody.getSize());
-            _contentHeaderBody.writePayload(new ByteBufferSender()
+        final List<QpidByteBuffer> buffers = new ArrayList<>();
+        QpidByteBuffer buf = QpidByteBuffer.allocateDirect(4);
+        buffers.add(buf);
+        buf.putInt(0, _contentHeaderBody.getSize());
+        _contentHeaderBody.writePayload(new ByteBufferSender()
+                                        {
+                                            @Override
+                                            public void send(final QpidByteBuffer msg)
+                                            {
+                                                buffers.add(msg.duplicate());
+                                            }
+
+                                            @Override
+                                            public void flush()
+                                            {
+
+                                            }
+
+                                            @Override
+                                            public void close()
                                             {
-                                                @Override
-                                                public void send(final QpidByteBuffer msg)
-                                                {
-                                                    buffers.add(msg.duplicate());
-                                                }
-
-                                                @Override
-                                                public void flush()
-                                                {
-
-                                                }
-
-                                                @Override
-                                                public void close()
-                                                {
-
-                                                }
-                                            });
-            buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey()));
-            EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange());
-            EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey());
-            byte flags = 0;
-            if(_messagePublishInfo.isMandatory())
-            {
-                flags |= MANDATORY_FLAG;
-            }
-            if(_messagePublishInfo.isImmediate())
-            {
-                flags |= IMMEDIATE_FLAG;
-            }
-            buf.put(flags);
-            buf.putLong(_arrivalTime);
-            buf.flip();
-            buffers.add(buf);
-            return buffers;
+
+                                            }
+                                        });
+        buf = QpidByteBuffer.allocateDirect(9+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getExchange())+EncodingUtils.encodedShortStringLength(_messagePublishInfo.getRoutingKey()));
+        EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getExchange());
+        EncodingUtils.writeShortStringBytes(buf, _messagePublishInfo.getRoutingKey());
+        byte flags = 0;
+        if(_messagePublishInfo.isMandatory())
+        {
+            flags |= MANDATORY_FLAG;
         }
-        catch (IOException e)
+        if(_messagePublishInfo.isImmediate())
         {
-            // This shouldn't happen as we are not actually using anything that can throw an IO Exception
-            throw new ConnectionScopedRuntimeException(e);
+            flags |= IMMEDIATE_FLAG;
         }
+        buf.put(flags);
+        buf.putLong(_arrivalTime);
+        buf.flip();
+        buffers.add(buf);
+        return buffers;
     }
 
     public int getContentSize()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Tue Dec 15 15:45:46 2015
@@ -20,11 +20,9 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +36,6 @@ import org.apache.qpid.framing.AMQMethod
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicGetOkBody;
-import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
@@ -49,7 +45,6 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferUtils;
 import org.apache.qpid.util.GZIPUtils;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
@@ -100,73 +95,64 @@ public class ProtocolOutputConverterImpl
         return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
     }
 
+    interface DisposableMessageContentSource extends MessageContentSource
+    {
+        void dispose();
+    }
+
     private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
     {
 
         int bodySize = (int) message.getSize();
         boolean msgCompressed = isCompressed(contentHeaderBody);
-        Collection<QpidByteBuffer> modifiedContentBuffers = null;
+        DisposableMessageContentSource modifiedContent = null;
 
         boolean compressionSupported = _connection.isCompressionSupported();
 
-        Collection<QpidByteBuffer> contentBuffers = message.getContent();
 
         long length;
         if(msgCompressed
            && !compressionSupported
-           && (contentBuffers != null)
-           && (modifiedContentBuffers = inflateIfPossible(contentBuffers)) != null)
+           && (modifiedContent = inflateIfPossible(message)) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
             modifiedProps.setEncoding((String)null);
 
-            length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+            length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps);
        }
         else if(!msgCompressed
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && (contentBuffers != null)
-                && (modifiedContentBuffers = deflateIfPossible(contentBuffers)) != null)
+                && (modifiedContent = deflateIfPossible(message)) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
             modifiedProps.setEncoding(GZIP_ENCODING);
 
-            length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+            length = writeMessageDeliveryModified(modifiedContent, channelId, deliverBody, modifiedProps);
         }
         else
         {
-            writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, contentHeaderBody, bodySize);
+            writeMessageDeliveryUnchanged(message, channelId, deliverBody, contentHeaderBody, bodySize);
 
             length = bodySize;
         }
 
-        if (contentBuffers != null)
+        if (modifiedContent != null)
         {
-            for (QpidByteBuffer buf : contentBuffers)
-            {
-                buf.dispose();
-            }
-        }
-
-        if (modifiedContentBuffers != null)
-        {
-            for(QpidByteBuffer buf : modifiedContentBuffers)
-            {
-                buf.dispose();
-            }
+            modifiedContent.dispose();
         }
 
         return length;
     }
 
-    private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    private DisposableMessageContentSource deflateIfPossible(MessageContentSource source)
     {
         try
         {
-            return QpidByteBuffer.deflate(buffers);
+            return new ModifiedContentSource(QpidByteBuffer.deflate(source.getContent(0, (int) source.getSize())));
         }
         catch (IOException e)
         {
@@ -175,11 +161,12 @@ public class ProtocolOutputConverterImpl
         }
     }
 
-    private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+
+    private DisposableMessageContentSource inflateIfPossible(MessageContentSource source)
     {
         try
         {
-            return QpidByteBuffer.inflate(buffers);
+            return new ModifiedContentSource(QpidByteBuffer.inflate(source.getContent(0, (int) source.getSize())));
         }
         catch (IOException e)
         {
@@ -188,18 +175,19 @@ public class ProtocolOutputConverterImpl
         }
     }
 
-    private int writeMessageDeliveryModified(final Collection<QpidByteBuffer> contentBuffers, final int channelId,
+
+    private int writeMessageDeliveryModified(final MessageContentSource content, final int channelId,
                                              final AMQBody deliverBody,
                                              final BasicContentHeaderProperties modifiedProps)
     {
-        final int bodySize = ByteBufferUtils.remaining(contentBuffers);
+        final int bodySize = (int) content.getSize();
         ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize);
-        writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, modifiedHeaderBody, bodySize);
+        writeMessageDeliveryUnchanged(content, channelId, deliverBody, modifiedHeaderBody, bodySize);
         return bodySize;
     }
 
 
-    private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> contentBuffers,
+    private void writeMessageDeliveryUnchanged(MessageContentSource content,
                                                int channelId, AMQBody deliverBody, ContentHeaderBody contentHeaderBody,
                                                int bodySize)
     {
@@ -219,7 +207,7 @@ public class ProtocolOutputConverterImpl
 
             int writtenSize = capacity;
 
-            AMQBody firstContentBody = new MessageContentSourceBody(contentBuffers, 0, capacity);
+            AMQBody firstContentBody = new MessageContentSourceBody(content, 0, capacity);
 
             CompositeAMQBodyBlock
                     compositeBlock =
@@ -229,7 +217,7 @@ public class ProtocolOutputConverterImpl
             while (writtenSize < bodySize)
             {
                 capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(contentBuffers, writtenSize, capacity);
+                AMQBody body = new MessageContentSourceBody(content, writtenSize, capacity);
                 writtenSize += capacity;
 
                 writeFrame(new AMQFrame(channelId, body));
@@ -246,47 +234,12 @@ public class ProtocolOutputConverterImpl
     {
         public static final byte TYPE = 3;
         private final int _length;
-        private final Collection<QpidByteBuffer> _contentBuffers;
+        private final MessageContentSource _content;
         private final int _offset;
 
-        public MessageContentSourceBody(Collection<QpidByteBuffer> bufs, int offset, int length)
+        public MessageContentSourceBody(MessageContentSource content, int offset, int length)
         {
-            int pos = 0;
-            int added = 0;
-
-            List<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-            for(QpidByteBuffer buf : bufs)
-            {
-                if(pos < offset)
-                {
-                    final int remaining = buf.remaining();
-                    if(pos + remaining > offset)
-                    {
-                        buf = buf.view(offset-pos,length);
-
-                        content.add(buf);
-                        added += buf.remaining();
-                    }
-                    pos += remaining;
-
-                }
-                else
-                {
-                    buf = buf.slice();
-                    if(buf.remaining() > (length-added))
-                    {
-                        buf.limit(length-added);
-                    }
-                    content.add(buf);
-                    added += buf.remaining();
-                }
-                if(added >= length)
-                {
-                    break;
-                }
-            }
-
-            _contentBuffers = content;
+            _content = content;
             _offset = offset;
             _length = length;
         }
@@ -301,32 +254,11 @@ public class ProtocolOutputConverterImpl
             return _length;
         }
 
-        public void writePayload(DataOutput buffer) throws IOException
-        {
-            for(QpidByteBuffer buf : _contentBuffers)
-            {
-                if (buf.hasArray())
-                {
-                    buffer.write(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
-                }
-                else
-                {
-
-                    byte[] data = new byte[_length];
-
-                    buf.get(data);
-
-                    buffer.write(data);
-                }
-                buf.dispose();
-            }
-        }
-
         @Override
-        public long writePayload(final ByteBufferSender sender) throws IOException
+        public long writePayload(final ByteBufferSender sender)
         {
-            long size = 0l;
-            for(QpidByteBuffer buf : _contentBuffers)
+            long size = 0L;
+            for(QpidByteBuffer buf : _content.getContent(_offset, _length))
             {
                 size += buf.remaining();
 
@@ -373,8 +305,7 @@ public class ProtocolOutputConverterImpl
         exchangeName = pb.getExchange();
         routingKey = pb.getRoutingKey();
 
-        final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
-        return returnBlock;
+        return new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
     }
 
     private class EncodedDeliveryBody implements AMQBody
@@ -418,16 +349,7 @@ public class ProtocolOutputConverterImpl
             return _underlyingBody.getSize();
         }
 
-        public void writePayload(DataOutput buffer) throws IOException
-        {
-            if(_underlyingBody == null)
-            {
-                _underlyingBody = createAMQBody();
-            }
-            _underlyingBody.writePayload(buffer);
-        }
-
-        public long writePayload(ByteBufferSender sender) throws IOException
+        public long writePayload(ByteBufferSender sender)
         {
             if(_underlyingBody == null)
             {
@@ -461,14 +383,11 @@ public class ProtocolOutputConverterImpl
 
         final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
 
-        BasicGetOkBody getOkBody =
-                _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag,
-                                                                          isRedelivered,
-                                                                          exchangeName,
-                                                                          routingKey,
-                                                                          queueSize);
-
-        return getOkBody;
+        return _connection.getMethodRegistry().createBasicGetOkBody(deliveryTag,
+                                                            isRedelivered,
+                                                            exchangeName,
+                                                            routingKey,
+                                                            queueSize);
     }
 
     private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
@@ -476,14 +395,11 @@ public class ProtocolOutputConverterImpl
                                              AMQShortString replyText)
     {
 
-        BasicReturnBody basicReturnBody =
-                _connection.getMethodRegistry().createBasicReturnBody(replyCode,
-                                                                           replyText,
-                                                                           messagePublishInfo.getExchange(),
-                                                                           messagePublishInfo.getRoutingKey());
 
-
-        return basicReturnBody;
+        return _connection.getMethodRegistry().createBasicReturnBody(replyCode,
+                                                             replyText,
+                                                             messagePublishInfo.getExchange(),
+                                                             messagePublishInfo.getRoutingKey());
     }
 
     public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
@@ -533,13 +449,8 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
         }
 
-        public void writePayload(DataOutput buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-
         @Override
-        public long writePayload(final ByteBufferSender sender) throws IOException
+        public long writePayload(final ByteBufferSender sender)
         {
             long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
 
@@ -586,13 +497,8 @@ public class ProtocolOutputConverterImpl
             return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
         }
 
-        public void writePayload(DataOutput buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-
         @Override
-        public long writePayload(final ByteBufferSender sender) throws IOException
+        public long writePayload(final ByteBufferSender sender)
         {
             long size = (new AMQFrame(_channel, _methodBody)).writePayload(sender);
             size += (new AMQFrame(_channel, _headerBody)).writePayload(sender);
@@ -611,4 +517,81 @@ public class ProtocolOutputConverterImpl
         }
     }
 
+    private static class ModifiedContentSource implements DisposableMessageContentSource
+    {
+        private final Collection<QpidByteBuffer> _buffers;
+        private final int _size;
+
+        public ModifiedContentSource(final Collection<QpidByteBuffer> buffers)
+        {
+            _buffers = buffers;
+            int size = 0;
+            for(QpidByteBuffer buf : buffers)
+            {
+                size += buf.remaining();
+            }
+            _size = size;
+        }
+
+        @Override
+        public void dispose()
+        {
+            for(QpidByteBuffer buffer : _buffers)
+            {
+                buffer.dispose();
+            }
+        }
+
+        @Override
+        public Collection<QpidByteBuffer> getContent(final int offset, int length)
+        {
+            Collection<QpidByteBuffer> content = new ArrayList<>(_buffers.size());
+            int pos = 0;
+            for (QpidByteBuffer buf : _buffers)
+            {
+                if (length > 0)
+                {
+                    int bufRemaining = buf.remaining();
+                    if (pos + bufRemaining <= offset)
+                    {
+                        pos += bufRemaining;
+                    }
+                    else if (pos >= offset)
+                    {
+                        buf = buf.duplicate();
+                        if (bufRemaining <= length)
+                        {
+                            length -= bufRemaining;
+                        }
+                        else
+                        {
+                            buf.limit(length);
+                            length = 0;
+                        }
+                        content.add(buf);
+                        pos+=buf.remaining();
+
+                    }
+                    else
+                    {
+                        int offsetInBuf = offset - pos;
+                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
+                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
+                        content.add(bufView);
+                        length -= limit;
+                        pos+=limit+offsetInBuf;
+                    }
+                }
+
+            }
+            return content;
+
+        }
+
+        @Override
+        public long getSize()
+        {
+            return _size;
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Tue Dec 15 15:45:46 2015
@@ -51,6 +51,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.codec.BBEncoder;
 import org.apache.qpid.typedmessage.TypedBytesContentWriter;
@@ -63,7 +64,7 @@ public class MessageConverter_from_1_0
     public static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
         byte[] data = new byte[(int) serverMessage.getSize()];
-        final Collection<QpidByteBuffer> allData = serverMessage.getStoredMessage().getContent();
+        final Collection<QpidByteBuffer> allData = serverMessage.getContent(0, (int) serverMessage.getSize());
         int offset = 0;
         for(QpidByteBuffer buf : allData)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Tue Dec 15 15:45:46 2015
@@ -209,7 +209,7 @@ public abstract class MessageConverter_t
         final String mimeType = serverMessage.getMessageHeader().getMimeType();
         byte[] data = new byte[(int) serverMessage.getSize()];
         int total = 0;
-        for(QpidByteBuffer b : serverMessage.getContent())
+        for(QpidByteBuffer b : serverMessage.getContent(0, (int) serverMessage.getSize()))
         {
             int len = b.remaining();
             b.get(data, total, len);
@@ -245,9 +245,9 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public Collection<QpidByteBuffer> getContent()
+                        public Collection<QpidByteBuffer> getContent(int offset, int length)
                         {
-                            return Collections.singleton(allData.duplicate());
+                            return Collections.singleton(allData.view(offset, length));
                         }
 
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Dec 15 15:45:46 2015
@@ -100,7 +100,7 @@ public class Message_1_0 extends Abstrac
 
     public Collection<QpidByteBuffer> getFragments()
     {
-        return getContent();
+        return getContent(0, (int) getSize());
     }
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Tue Dec 15 15:45:46 2015
@@ -89,9 +89,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent()
+            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Tue Dec 15 15:45:46 2015
@@ -193,9 +193,9 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent()
+            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
             {
-                return message.getContent();
+                return message.getContent(offset, length);
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Tue Dec 15 15:45:46 2015
@@ -82,9 +82,9 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent()
+            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
             {
-                return message_0_8.getContent();
+                return message_0_8.getContent(offset, length);
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Tue Dec 15 15:45:46 2015
@@ -91,9 +91,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent()
+            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
             {
-                return Collections.singleton(QpidByteBuffer.wrap(messageContent));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offset, length));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java Tue Dec 15 15:45:46 2015
@@ -218,7 +218,8 @@ public class ReportRunner<T>
     private static ReportableMessage convertMessage(QueueEntry entry)
     {
         final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
-        final Collection<QpidByteBuffer> contentBuffers = entry.getMessage().getContent();
+        ServerMessage message = entry.getMessage();
+        final Collection<QpidByteBuffer> contentBuffers = message.getContent(0, (int) message.getSize());
         final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
         for(QpidByteBuffer buf : contentBuffers)
         {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Tue Dec 15 15:45:46 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.slf4j.Logger;
@@ -47,7 +46,6 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.codec.ClientDecoder;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
@@ -71,7 +69,6 @@ import org.apache.qpid.transport.Excepti
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity
 {
@@ -142,8 +139,6 @@ public class AMQProtocolHandler implemen
 
     private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
     private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
-    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
-    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
 
     private int _queueId = 1;
     private final Object _queueIdLock = new Object();
@@ -557,10 +552,9 @@ public class AMQProtocolHandler implemen
 
     public  synchronized void writeFrame(AMQDataBlock frame, boolean flush)
     {
-        final ByteBuffer buf = asByteBuffer(frame);
         _lastWriteTime = System.currentTimeMillis();
-        _writtenBytes += buf.remaining();
-        _sender.send(QpidByteBuffer.wrap(buf));
+        _writtenBytes += frame.getSize();
+        frame.writePayload(_sender);
         if(flush)
         {
             _sender.flush();
@@ -581,49 +575,6 @@ public class AMQProtocolHandler implemen
 
     }
 
-    private ByteBuffer asByteBuffer(AMQDataBlock block)
-    {
-        final int size = (int) block.getSize();
-
-        final byte[] data;
-
-
-        if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
-        {
-            data= new byte[size];
-        }
-        else
-        {
-
-            data = _reusableBytes;
-        }
-        _reusableDataOutput.setBuffer(data);
-
-        try
-        {
-            block.writePayload(_reusableDataOutput);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        final ByteBuffer buf;
-
-        if(size < REUSABLE_BYTE_BUFFER_CAPACITY)
-        {
-            buf = _reusableByteBuffer;
-            buf.position(0);
-        }
-        else
-        {
-            buf = ByteBuffer.wrap(data);
-        }
-        buf.limit(_reusableDataOutput.length());
-
-        return buf;
-    }
-
 
     /**
      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Tue Dec 15 15:45:46 2015
@@ -58,7 +58,6 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.codec.BBEncoder;
-import org.apache.qpid.util.BytesDataOutput;
 import org.apache.qpid.util.GZIPUtils;
 import org.apache.qpid.util.Strings;
 
@@ -251,9 +250,6 @@ public class BasicMessageProducer_0_10 e
 
                 final int headerLength = buf.remaining();
                 byte[] unencryptedBytes = new byte[headerLength + (data == null ? 0 : data.remaining())];
-                BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
-
-                output.write(buf.array(), buf.arrayOffset()+buf.position(), buf.remaining());
 
                 if (data != null)
                 {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Tue Dec 15 15:45:46 2015
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -59,7 +60,6 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.util.BytesDataOutput;
 import org.apache.qpid.util.GZIPUtils;
 
 public class BasicMessageProducer_0_8 extends BasicMessageProducer
@@ -222,8 +222,8 @@ public class BasicMessageProducer_0_8 ex
 
                 final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
                 byte[] unencryptedBytes = new byte[headerLength + size];
-                BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
-                output.writeShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
+                QpidByteBuffer output = QpidByteBuffer.wrap(unencryptedBytes);
+                output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
                 contentHeaderProperties.writePropertyListPayload(output);
 
                 if (size != 0)

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Tue Dec 15 15:45:46 2015
@@ -29,6 +29,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
  * the content body/ies.
@@ -40,6 +43,7 @@ public class UnprocessedMessage_0_8 exte
 {
     private long _bytesReceived = 0;
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(UnprocessedMessage_0_8.class);
 
     private AMQShortString _exchange;
     private AMQShortString _routingKey;
@@ -124,6 +128,7 @@ public class UnprocessedMessage_0_8 exte
 
     public boolean isAllBodyDataReceived()
     {
+        LOGGER.debug("Received {} of {} bytes for message body", _bytesReceived, getContentHeader().getBodySize());
         return _bytesReceived == getContentHeader().getBodySize();
     }
 

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java Tue Dec 15 15:45:46 2015
@@ -38,13 +38,13 @@ import javax.crypto.spec.IvParameterSpec
 import javax.crypto.spec.SecretKeySpec;
 import javax.security.auth.x500.X500Principal;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.test.utils.TestSSLConstants;
 import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class Encrypted091MessageFactoryTest extends QpidTestCase
 {
@@ -78,8 +78,8 @@ public class Encrypted091MessageFactoryT
 
             final int headerLength = _props.getPropertyListSize() + 2;
             _unencrypted = new byte[headerLength + _data.length];
-            BytesDataOutput output = new BytesDataOutput(_unencrypted);
-            output.writeShort((short) (_props.getPropertyFlags() & 0xffff));
+            QpidByteBuffer output = QpidByteBuffer.wrap(_unencrypted);
+            output.putShort((short) (_props.getPropertyFlags() & 0xffff));
             _props.writePropertyListPayload(output);
 
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1720183&r1=1720182&r2=1720183&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Tue Dec 15 15:45:46 2015
@@ -21,7 +21,6 @@
 package org.apache.qpid.bytebuffer;
 
 import java.io.BufferedOutputStream;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -440,11 +439,6 @@ public final class QpidByteBuffer
     }
 
 
-    public DataOutput asDataOutput()
-    {
-        return new BufferDataOutput();
-    }
-
     public static QpidByteBuffer allocate(int size)
     {
         return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
@@ -910,118 +904,4 @@ public final class QpidByteBuffer
         }
     }
 
-    private final class BufferDataOutput implements DataOutput
-    {
-        public void write(int b)
-        {
-            _buffer.put((byte) b);
-        }
-
-        public void write(byte[] b)
-        {
-            _buffer.put(b);
-        }
-
-
-        public void write(byte[] b, int off, int len)
-        {
-            _buffer.put(b, off, len);
-
-        }
-
-        public void writeBoolean(boolean v)
-        {
-            _buffer.put(v ? (byte) 1 : (byte) 0);
-        }
-
-        public void writeByte(int v)
-        {
-            _buffer.put((byte) v);
-        }
-
-        public void writeShort(int v)
-        {
-            _buffer.putShort((short) v);
-        }
-
-        public void writeChar(int v)
-        {
-            _buffer.put((byte) (v >>> 8));
-            _buffer.put((byte) v);
-        }
-
-        public void writeInt(int v)
-        {
-            _buffer.putInt(v);
-        }
-
-        public void writeLong(long v)
-        {
-            _buffer.putLong(v);
-        }
-
-        public void writeFloat(float v)
-        {
-            writeInt(Float.floatToIntBits(v));
-        }
-
-        public void writeDouble(double v)
-        {
-            writeLong(Double.doubleToLongBits(v));
-        }
-
-        public void writeBytes(String s)
-        {
-            throw new UnsupportedOperationException("writeBytes(String s) not supported");
-        }
-
-        public void writeChars(String s)
-        {
-            int len = s.length();
-            for (int i = 0 ; i < len ; i++)
-            {
-                int v = s.charAt(i);
-                _buffer.put((byte) (v >>> 8));
-                _buffer.put((byte) v);
-            }
-        }
-
-        public void writeUTF(String s)
-        {
-            int strlen = s.length();
-
-            int pos = _buffer.position();
-            _buffer.position(pos + 2);
-
-
-            for (int i = 0; i < strlen; i++)
-            {
-                int c = s.charAt(i);
-                if ((c >= 0x0001) && (c <= 0x007F))
-                {
-                    c = s.charAt(i);
-                    _buffer.put((byte) c);
-
-                }
-                else if (c > 0x07FF)
-                {
-                    _buffer.put((byte) (0xE0 | ((c >> 12) & 0x0F)));
-                    _buffer.put((byte) (0x80 | ((c >> 6) & 0x3F)));
-                    _buffer.put((byte) (0x80 | (c & 0x3F)));
-                }
-                else
-                {
-                    _buffer.put((byte) (0xC0 | ((c >> 6) & 0x1F)));
-                    _buffer.put((byte) (0x80 | (c & 0x3F)));
-                }
-            }
-
-            int len = _buffer.position() - (pos + 2);
-
-            _buffer.put(pos++, (byte) (len >>> 8));
-            _buffer.put(pos, (byte) len);
-        }
-
-    }
-
 }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org