You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/18 15:21:36 UTC

[02/10] qpid-broker-j git commit: QPID-7832: [Java Broker] Refactor store/protocol API using Collection

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 9a48b84..6d9c190 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -23,28 +23,26 @@ package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
-import org.apache.qpid.server.protocol.v0_8.AMQShortString;
-import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
-import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
 import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.url.AMQBindingURL;
 
 @PluggableService
@@ -93,7 +91,7 @@ public class MessageConverter_0_8_to_0_10  implements MessageConverter<AMQMessag
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
+            public QpidByteBuffer getContent(final int offset, final int length)
             {
                 return message_0_8.getContent(offset, length);
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
index e46d732..645d102 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_10_to_0_8Test.java
@@ -578,8 +578,12 @@ public class PropertyConverter_0_10_to_0_8Test extends QpidTestCase
         if (content != null)
         {
             when(storedMessage.getContentSize()).thenReturn(content.length);
-            when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
-                    content)));
+            when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+        }
+        else
+        {
+            when(storedMessage.getContentSize()).thenReturn(0);
+            when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         }
         return new MessageTransferMessage(storedMessage, null);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
index 8bb4a47..76bc200 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/PropertyConverter_0_8_to_0_10Test.java
@@ -24,7 +24,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -472,8 +471,12 @@ public class PropertyConverter_0_8_to_0_10Test extends QpidTestCase
         if (content != null)
         {
             when(storedMessage.getContentSize()).thenReturn(content.length);
-            when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
-                    content)));
+            when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+        }
+        else
+        {
+            when(storedMessage.getContentSize()).thenReturn(0);
+            when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         }
 
         return new AMQMessage(storedMessage);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index dcdd430..e8a02fd 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -32,8 +32,6 @@ import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.get
 import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getTtl;
 import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getUserId;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -117,9 +115,9 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
             }
 
             @Override
-            public Collection<QpidByteBuffer> getContent(final int offset, final int length)
+            public QpidByteBuffer getContent(final int offset, final int length)
             {
-                return Collections.singleton(QpidByteBuffer.wrap(convertedContent, offset, length));
+                return QpidByteBuffer.wrap(convertedContent, offset, length);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
index df1a786..4fa5734 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java
@@ -30,8 +30,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +37,6 @@ import java.util.UUID;
 
 import com.google.common.collect.Lists;
 import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
@@ -340,12 +336,12 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         return expected;
     }
 
-    private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final Collection<QpidByteBuffer> content,
+    private List<EncodingRetainingSection<?>> getEncodingRetainingSections(final QpidByteBuffer content,
                                                                            final int expectedNumberOfSections)
             throws Exception
     {
         SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
-        final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content));
+        final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(content);
         assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size());
         return sections;
     }
@@ -376,15 +372,8 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
         final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
 
         when(_handle.getContent(offsetCaptor.capture(),
-                                sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>()
-        {
-            @Override
-            public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable
-            {
-                final QpidByteBuffer view = combined.view(offsetCaptor.getValue(), sizeCaptor.getValue());
-                return Collections.singleton(view);
-            }
-        });
+                                sizeCaptor.capture())).then(invocation -> combined.view(offsetCaptor.getValue(),
+                                                                                        sizeCaptor.getValue()));
     }
 
     private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage)
@@ -466,7 +455,7 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase
     {
         final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType);
         final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1);
         EncodingRetainingSection<?> encodingRetainingSection = sections.get(0);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
index c9d17cc..cc2ada2 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java
@@ -27,9 +27,9 @@ import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -67,7 +67,6 @@ import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsMapMessageToM
 import org.apache.qpid.server.typedmessage.mimecontentconverter.JmsStreamMessageToList;
 import org.apache.qpid.server.typedmessage.mimecontentconverter.ListToJmsStreamMessage;
 import org.apache.qpid.server.typedmessage.mimecontentconverter.MapToJmsMapMessage;
-import org.apache.qpid.server.util.ByteBufferUtils;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
@@ -127,7 +126,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type",
                      "application/java-object-stream",
@@ -143,7 +142,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
         assertArrayEquals("Unexpected content size",
@@ -235,7 +234,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
 
@@ -265,7 +264,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
     }
 
@@ -279,7 +278,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
     }
 
@@ -295,7 +294,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         Map<String, Object> convertedMap = new JmsMapMessageToMap().toObject(getBytes(content));
         assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
@@ -316,7 +315,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         Map<String, Object> convertedMap = new AmqpMapToMapConverter().toObject(getBytes(content));
         assertEquals("Unexpected size", originalMap.size(), convertedMap.size());
@@ -352,7 +351,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         List<Object> convertedList = new JmsStreamMessageToList().toObject(getBytes(content));
         assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -371,7 +370,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
         assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -424,7 +423,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertEquals("Unexpected content", expected, new JmsStreamMessageToList().toObject(getBytes(content)));
     }
 
@@ -437,7 +436,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         List<Object> convertedList = new AmqpListToListConverter().toObject(getBytes(content));
         assertEquals("Unexpected size", originalList.size(), convertedList.size());
@@ -497,7 +496,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         assertEquals("Unexpected mime type",
                      expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", data, getBytes(content));
     }
 
@@ -517,7 +516,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         assertEquals("Unexpected mime type",
                      expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", data, getBytes(content));
     }
 
@@ -550,7 +549,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         assertEquals("Unexpected mime type",
                      expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", data, getBytes(content));
     }
 
@@ -570,7 +569,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         assertEquals("Unexpected mime type",
                      expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", data, getBytes(content));
     }
 
@@ -640,7 +639,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "amqp/map", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", bytes, getBytes(content));
     }
 
@@ -657,7 +656,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", bytes, getBytes(content));
     }
 
@@ -674,7 +673,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "amqp/list", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", bytes, getBytes(content));
     }
 
@@ -691,7 +690,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
         assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", bytes, getBytes(content));
     }
 
@@ -708,7 +707,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         assertEquals("Unexpected mime type",
                      "application/java-object-stream",
                      convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", expected, getBytes(content));
     }
 
@@ -726,7 +725,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         assertEquals("Unexpected mime type",
                      "application/java-object-stream",
                      convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", expected, getBytes(content));
     }
 
@@ -743,7 +742,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         assertEquals("Unexpected mime type",
                      "application/octet-stream",
                      convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", expected, getBytes(content));
     }
 
@@ -758,7 +757,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
         assertEquals("Unexpected mime type",
                      "application/octet-stream",
                      convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertArrayEquals("Unexpected content", expected, getBytes(content));
     }
 
@@ -778,7 +777,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type",
                      "application/java-object-stream",
@@ -792,7 +791,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
         assertArrayEquals("Unexpected content size",
@@ -891,7 +890,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
 
         assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType());
 
@@ -926,21 +925,21 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class));
 
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
         assertEquals("Unexpected content", expected, new String(getBytes(content), UTF_8));
 
         assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType());
     }
 
-    private byte[] getBytes(final Collection<QpidByteBuffer> content) throws Exception
+    private byte[] getBytes(final QpidByteBuffer content) throws Exception
     {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        for (QpidByteBuffer buf : content)
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             InputStream contentInputStream = content.asInputStream())
         {
-            ByteStreams.copy(buf.asInputStream(), bos);
-            buf.dispose();
+            ByteStreams.copy(contentInputStream, bos);
+            content.dispose();
+            return bos.toByteArray();
         }
-        return bos.toByteArray();
     }
 
     private Message_1_0 createTestMessage(final EncodingRetainingSection encodingRetainingSection)
@@ -1005,19 +1004,19 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         if (section != null)
         {
-            final QpidByteBuffer combined = QpidByteBuffer.wrap(ByteBufferUtils.combine(section.getEncodedForm()));
+            // TODO this seems to leak QBBs
+            final QpidByteBuffer combined = section.getEncodedForm();
             when(storedMessage.getContentSize()).thenReturn((int) section.getEncodedSize());
             final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class);
             final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class);
 
             when(storedMessage.getContent(offsetCaptor.capture(),
-                                          sizeCaptor.capture())).then(invocation ->
-                                                                      {
-                                                                          final QpidByteBuffer view = combined.view(
-                                                                                  offsetCaptor.getValue(),
-                                                                                  sizeCaptor.getValue());
-                                                                          return Collections.singleton(view);
-                                                                      });
+                                          sizeCaptor.capture())).then(invocation -> combined.view(offsetCaptor.getValue(),
+                                                                                                  sizeCaptor.getValue()));
+        }
+        else
+        {
+            when(storedMessage.getContent(0,0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         }
         return new Message_1_0(storedMessage);
     }
@@ -1045,7 +1044,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase
 
         assertEquals("Unexpected mime type",
                      expectedMimeType, convertedMessage.getMessageHeader().getMimeType());
-        final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize());
+        final QpidByteBuffer content = convertedMessage.getContent();
         assertArrayEquals("Unexpected content", data, getBytes(content));
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
index 0520baf..f9443e0 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_0_8_to_1_0Test.java
@@ -472,8 +472,12 @@ public class PropertyConverter_0_8_to_1_0Test extends QpidTestCase
         if (content != null)
         {
             when(storedMessage.getContentSize()).thenReturn(content.length);
-            when(storedMessage.getContent(0, content.length)).thenReturn(Collections.singleton(QpidByteBuffer.wrap(
-                    content)));
+            when(storedMessage.getContent(0, content.length)).thenReturn(QpidByteBuffer.wrap(content));
+        }
+        else
+        {
+            when(storedMessage.getContentSize()).thenReturn(0);
+            when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         }
 
         return new AMQMessage(storedMessage);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
index 059e268..6f7e75c 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java
@@ -33,13 +33,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.converter.MessageConversionException;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
-import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo;
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
@@ -922,6 +921,8 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase
                                           final long arrivalTime)
     {
         final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getContentSize()).thenReturn(0);
+        when(storedMessage.getContent(0, 0)).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         MessageMetaData_1_0 metaData = new MessageMetaData_1_0(header.createEncodingRetainingSection(),
                                                                deliveryAnnotations.createEncodingRetainingSection(),
                                                                messageAnnotations.createEncodingRetainingSection(),

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
index e417f9a..d5a4de6 100644
--- a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
+++ b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store.derby;
 
 
+import java.io.InputStream;
 import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -137,9 +138,9 @@ public abstract class AbstractDerbyMessageStore extends AbstractJDBCMessageStore
     }
 
     @Override
-    protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+    protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
     {
-        return DerbyUtils.getBlobAsBytes(rs, col);
+        return DerbyUtils.getBlobAsInputStream(rs, col);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
----------------------------------------------------------------------
diff --git a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
index 9af478d..62c9889 100644
--- a/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
+++ b/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.derby;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.sql.Blob;
@@ -151,10 +152,10 @@ public class DerbyUtils
         return new String(bytes, StandardCharsets.UTF_8);
     }
 
-    protected static byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+    protected static InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
     {
         Blob dataAsBlob = rs.getBlob(col);
-        return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
+        return dataAsBlob.getBinaryStream();
     }
 
     public static boolean tableExists(final String tableName, final Connection conn) throws SQLException

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 099029c..9c6cfa9 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.jdbc;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -31,7 +30,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -831,11 +829,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             final int bodySize = 1 + metaData.getStorableSize();
             byte[] underlying = new byte[bodySize];
             underlying[0] = (byte) metaData.getType().ordinal();
-            QpidByteBuffer buf = QpidByteBuffer.wrap(underlying);
-            buf.position(1);
-            buf = buf.slice();
-
-            metaData.writeToBuffer(buf);
+            try (QpidByteBuffer buf = QpidByteBuffer.wrap(underlying))
+            {
+                buf.position(1);
+                try (QpidByteBuffer bufSlice = buf.slice())
+                {
+                    metaData.writeToBuffer(buf);
+                }
+            }
             try(ByteArrayInputStream bis = new ByteArrayInputStream(underlying))
             {
                 stmt.setBinaryStream(2, bis, underlying.length);
@@ -938,7 +939,14 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
                     if (rs.next())
                     {
-                        return getStorableMessageMetaData(messageId, getBlobAsBytes(rs, 1));
+                        try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 1))
+                        {
+                            return getStorableMessageMetaData(messageId, blobAsInputStream);
+                        }
+                        catch (IOException e)
+                        {
+                            throw new StoreException("Error reading meta data from the store for message with id " + messageId, e);
+                        }
                     }
                     else
                     {
@@ -949,20 +957,18 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
     }
 
-    private StorableMessageMetaData getStorableMessageMetaData(final long messageId, final byte[] blobAsBytes)
+    private StorableMessageMetaData getStorableMessageMetaData(final long messageId, final InputStream stream)
             throws SQLException
     {
-        try(InputStream stream = new ByteArrayInputStream(blobAsBytes))
+        try
         {
             int typeOrdinal = stream.read() & 0xff;
             MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(typeOrdinal);
-            List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream);
-            StorableMessageMetaData metaData = type.createMetaData(bufs);
-            for (final QpidByteBuffer buf : bufs)
+
+            try (QpidByteBuffer buf = QpidByteBuffer.asQpidByteBuffer(stream))
             {
-                buf.dispose();
+                return type.createMetaData(buf);
             }
-            return metaData;
         }
         catch (IOException | RuntimeException e)
         {
@@ -970,41 +976,30 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
     }
 
-    protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
+    protected abstract InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException;
 
     private void addContent(final Connection conn, long messageId,
-                            Collection<QpidByteBuffer> contentBody)
+                            QpidByteBuffer contentBody)
     {
         getLogger().debug("Adding content for message {}", messageId);
 
-        int size = 0;
-
-        for(QpidByteBuffer buf : contentBody)
-        {
-            size += buf.remaining();
-        }
-        byte[] data = new byte[size];
-        ByteBuffer dst = ByteBuffer.wrap(data);
-        for(QpidByteBuffer buf : contentBody)
-        {
-            buf.copyTo(dst);
-        }
-
-        try(PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getMessageContentTableName()
-                                                           + "( message_id, content ) values (?, ?)"))
+        try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + getMessageContentTableName()
+                                                            + "( message_id, content ) values (?, ?)");
+             QpidByteBuffer bodyDuplicate = contentBody.duplicate();
+             InputStream inputStream = bodyDuplicate.asInputStream())
         {
             stmt.setLong(1, messageId);
-            stmt.setBinaryStream(2, new ByteArrayInputStream(data), data.length);
+            stmt.setBinaryStream(2, inputStream, contentBody.remaining());
             stmt.executeUpdate();
         }
-        catch (SQLException e)
+        catch (SQLException | IOException e)
         {
             JdbcUtils.closeConnection(conn, getLogger());
             throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
         }
     }
 
-    Collection<QpidByteBuffer> getAllContent(long messageId) throws StoreException
+    QpidByteBuffer getAllContent(long messageId) throws StoreException
     {
         getLogger().debug("Message Id: {} Getting content body", messageId);
 
@@ -1017,18 +1012,10 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
 
             if (rs.next())
             {
-                byte[] data = getBlobAsBytes(rs, 1);
-                int offset = 0;
-                int length = data.length;
-                Collection<QpidByteBuffer> buffers = QpidByteBuffer.allocateDirectCollection(length);
-                for(QpidByteBuffer buf : buffers)
+                try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 1))
                 {
-                    int bufSize = buf.remaining();
-                    buf.put(data, offset, bufSize);
-                    buf.flip();
-                    offset+=bufSize;
+                    return QpidByteBuffer.asQpidByteBuffer(blobAsInputStream);
                 }
-                return buffers;
             }
             else
             {
@@ -1036,7 +1023,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
 
         }
-        catch (SQLException e)
+        catch (SQLException | IOException e)
         {
             throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e);
         }
@@ -1262,7 +1249,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
     private static class MessageDataRef<T extends StorableMessageMetaData>
     {
         private volatile T _metaData;
-        private volatile Collection<QpidByteBuffer> _data;
+        private volatile QpidByteBuffer _data;
         private volatile boolean _isHardRef;
 
         private MessageDataRef(final T metaData, boolean isHardRef)
@@ -1270,7 +1257,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             this(metaData, null, isHardRef);
         }
 
-        private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef)
+        private MessageDataRef(final T metaData, QpidByteBuffer data, boolean isHardRef)
         {
             _metaData = metaData;
             _data = data;
@@ -1282,12 +1269,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             return _metaData;
         }
 
-        public Collection<QpidByteBuffer> getData()
+        public QpidByteBuffer getData()
         {
             return _data;
         }
 
-        public void setData(final Collection<QpidByteBuffer> data)
+        public void setData(final QpidByteBuffer data)
         {
             _data = data;
         }
@@ -1322,11 +1309,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
             }
             if(_data != null)
             {
-                for(QpidByteBuffer buf : _data)
-                {
-                    bytesCleared += buf.remaining();
-                    buf.dispose();
-                }
+                bytesCleared += _data.remaining();
+                _data.dispose();
                 _data = null;
             }
             return bytesCleared;
@@ -1405,20 +1389,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         @Override
         public synchronized void addContent(QpidByteBuffer src)
         {
-            src = src.slice();
-            Collection<QpidByteBuffer> data = _messageDataRef.getData();
-            if(data == null)
+            try(QpidByteBuffer data = _messageDataRef.getData())
             {
-                _messageDataRef.setData(Collections.singleton(src));
-            }
-            else
-            {
-                List<QpidByteBuffer> newCollection = new ArrayList<>(data.size()+1);
-                newCollection.addAll(data);
-                newCollection.add(src);
-                _messageDataRef.setData(Collections.unmodifiableCollection(newCollection));
+                if(data == null)
+                {
+                    _messageDataRef.setData(src.slice());
+                }
+                else
+                {
+                    _messageDataRef.setData(QpidByteBuffer.concatenate(Arrays.asList(data, src)));
+                }
             }
-
         }
 
         @Override
@@ -1429,11 +1410,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         }
 
         /**
-         * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef.
+         * Returns the QBB containing the content. The caller must not dispose of it because we keep a reference to it in _messageDataRef.
          */
-        private Collection<QpidByteBuffer> getContentAsByteBuffer()
+        private QpidByteBuffer getContentAsByteBuffer()
         {
-            Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData();
+            QpidByteBuffer data = _messageDataRef == null ? QpidByteBuffer.emptyQpidByteBuffer() : _messageDataRef.getData();
             if(data == null)
             {
                 if(stored())
@@ -1445,57 +1426,16 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 }
                 else
                 {
-                    data = Collections.emptyList();
+                    data = QpidByteBuffer.emptyQpidByteBuffer();
                 }
             }
             return data;
         }
 
         @Override
-        public synchronized Collection<QpidByteBuffer> getContent(int offset, int length)
+        public synchronized QpidByteBuffer getContent(int offset, int length)
         {
-            Collection<QpidByteBuffer> bufs = getContentAsByteBuffer();
-            Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size());
-
-            int pos = 0;
-            for (QpidByteBuffer buf : bufs)
-            {
-                if (length > 0)
-                {
-                    int bufRemaining = buf.remaining();
-                    if (pos + bufRemaining <= offset)
-                    {
-                        pos += bufRemaining;
-                    }
-                    else if (pos >= offset)
-                    {
-                        buf = buf.duplicate();
-                        if (bufRemaining <= length)
-                        {
-                            length -= bufRemaining;
-                        }
-                        else
-                        {
-                            buf.limit(length);
-                            length = 0;
-                        }
-                        content.add(buf);
-                        pos += buf.remaining();
-
-                    }
-                    else
-                    {
-                        int offsetInBuf = offset - pos;
-                        int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf;
-                        final QpidByteBuffer bufView = buf.view(offsetInBuf, limit);
-                        content.add(bufView);
-                        length -= limit;
-                        pos+=limit+offsetInBuf;
-                    }
-                }
-
-            }
-            return content;
+            return getContentAsByteBuffer().view(offset, length);
         }
 
         @Override
@@ -1517,7 +1457,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 AbstractJDBCMessageStore.this.storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
                 AbstractJDBCMessageStore.this.addContent(conn, _messageId,
                                                          _messageDataRef.getData() == null
-                                                                ? Collections.<QpidByteBuffer>emptySet()
+                                                                ? QpidByteBuffer.emptyQpidByteBuffer()
                                                                 : _messageDataRef.getData());
 
                 getLogger().debug("Storing message {} to store", _messageId);
@@ -1569,14 +1509,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 metaData.dispose();
             }
 
-            Collection<QpidByteBuffer> data = _messageDataRef.getData();
-            if(data != null)
+            try (QpidByteBuffer data = _messageDataRef.getData())
             {
-                bytesCleared += getContentSize();
-                _messageDataRef.setData(null);
-                for(QpidByteBuffer buf : data)
+                if (data != null)
                 {
-                    buf.dispose();
+                    bytesCleared += getContentSize();
+                    _messageDataRef.setData(null);
                 }
             }
             _messageDataRef = null;
@@ -1670,9 +1608,11 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                     {
                         if (rs.next())
                         {
-                            byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-                            StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
-                            message = createStoredJDBCMessage(messageId, metaData, true);
+                            try (InputStream blobAsInputStream = getBlobAsInputStream(rs, 2))
+                            {
+                                final StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, blobAsInputStream);
+                                message = createStoredJDBCMessage(messageId, metaData, true);
+                            }
                         }
                         else
                         {
@@ -1682,7 +1622,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                 }
                 return message;
             }
-            catch (SQLException e)
+            catch (SQLException | IOException e)
             {
                 throw new StoreException("Error encountered when visiting messages", e);
             }
@@ -1710,18 +1650,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
                         while (rs.next())
                         {
                             long messageId = rs.getLong(1);
-                            byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-                            StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsBytes);
-                            StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
-                            if (!handler.handle(message))
+                            try (InputStream dataAsInputStream = getBlobAsInputStream(rs, 2))
                             {
-                                break;
+                                StorableMessageMetaData metaData = getStorableMessageMetaData(messageId, dataAsInputStream);
+                                StoredJDBCMessage message = createStoredJDBCMessage(messageId, metaData, true);
+                                if (!handler.handle(message))
+                                {
+                                    break;
+                                }
                             }
                         }
                     }
                 }
             }
-            catch (SQLException e)
+            catch (SQLException | IOException e)
             {
                 throw new StoreException("Error encountered when visiting messages", e);
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
index 3b77b10..efbedb3 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
@@ -22,7 +22,9 @@ package org.apache.qpid.server.store.jdbc;
 import static org.apache.qpid.server.store.jdbc.AbstractJDBCConfigurationStore.State.CLOSED;
 import static org.apache.qpid.server.store.jdbc.AbstractJDBCConfigurationStore.State.CONFIGURED;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.sql.Blob;
 import java.sql.Connection;
@@ -204,17 +206,16 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor
 
     }
 
-    protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+    protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
     {
         if(_useBytesMethodsForBlob)
         {
-            return rs.getBytes(col);
+            return new ByteArrayInputStream(rs.getBytes(col));
         }
         else
         {
             Blob dataAsBlob = rs.getBlob(col);
-            return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
+            return dataAsBlob.getBinaryStream();
         }
     }
 
@@ -298,9 +299,9 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor
         }
 
         @Override
-        protected byte[] getBlobAsBytes(final ResultSet rs, final int col) throws SQLException
+        protected InputStream getBlobAsInputStream(final ResultSet rs, final int col) throws SQLException
         {
-            return GenericJDBCConfigurationStore.this.getBlobAsBytes(rs, col);
+            return GenericJDBCConfigurationStore.this.getBlobAsInputStream(rs, col);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
index 1b3c827..551362d 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.store.jdbc;
 
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.InputStream;
 import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -156,17 +158,16 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore
     }
 
     @Override
-    protected byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException
+    protected InputStream getBlobAsInputStream(ResultSet rs, int col) throws SQLException
     {
         if(_useBytesMethodsForBlob)
         {
-            return rs.getBytes(col);
+            return new ByteArrayInputStream(rs.getBytes(col));
         }
         else
         {
             Blob dataAsBlob = rs.getBlob(col);
-            return dataAsBlob.getBytes(1,(int) dataAsBlob.length());
-
+            return dataAsBlob.getBinaryStream();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index f81648b..5dbbe67 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -34,9 +34,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -856,69 +856,18 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
 
     private static class QBBTrackingThreadPool extends QueuedThreadPool
     {
-        private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+        private final ThreadFactory _threadFactory;
 
         public QBBTrackingThreadPool(@Name("maxThreads") final int maxThreads, @Name("minThreads") final int minThreads)
         {
             super(maxThreads, minThreads);
-        }
-
-        @Override
-        protected void doStop() throws Exception
-        {
-            try
-            {
-                super.doStop();
-            }
-            finally
-            {
-                for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
-                {
-                    qpidByteBuffer.dispose();
-                }
-                _cachedBufferMap.clear();
-            }
+            _threadFactory = QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(r -> QBBTrackingThreadPool.super.newThread(r));
         }
 
         @Override
         protected Thread newThread(final Runnable runnable)
         {
-            return super.newThread(() ->
-                {
-                    try
-                    {
-                        runnable.run();
-                    }
-                    finally
-                    {
-                        QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread());
-                        if (qbb != null)
-                        {
-                            qbb.dispose();
-                        }
-                    }
-            });
-        }
-
-        @Override
-        protected void runJob(final Runnable job)
-        {
-            try
-            {
-                super.runJob(job);
-            }
-            finally
-            {
-                final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
-                if (cachedThreadLocalBuffer != null)
-                {
-                    _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
-                }
-                else
-                {
-                    _cachedBufferMap.remove(Thread.currentThread());
-                }
-            }
+            return _threadFactory.newThread(runnable);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
index 3242f01..a33a3d6 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/report/ReportRunner.java
@@ -45,7 +45,6 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
-import org.apache.qpid.server.util.ByteBufferUtils;
 
 public class ReportRunner<T>
 {
@@ -216,11 +215,11 @@ public class ReportRunner<T>
     {
         final MessageInfoImpl messageInfo = new MessageInfoImpl(entry, true);
         ServerMessage message = entry.getMessage();
-        final Collection<QpidByteBuffer> contentBuffers = message.getContent(0, (int) message.getSize());
-        final ByteBuffer content = ByteBufferUtils.combine(contentBuffers);
-        for(QpidByteBuffer buf : contentBuffers)
+        byte[] content;
+        try (QpidByteBuffer contentBuffer = message.getContent())
         {
-            buf.dispose();
+            content = new byte[contentBuffer.remaining()];
+            contentBuffer.get(content);
         }
 
         return new ReportableMessage()
@@ -240,7 +239,7 @@ public class ReportRunner<T>
             @Override
             public ByteBuffer getContent()
             {
-                return content.asReadOnlyBuffer();
+                return ByteBuffer.wrap(content).asReadOnlyBuffer();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
index a1c3f34..7c85f39 100644
--- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
+++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/report/ReportRunnerTest.java
@@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
@@ -67,6 +68,7 @@ public class ReportRunnerTest extends QpidTestCase
         final ServerMessage message = mock(ServerMessage.class);
         final AMQMessageHeader header = mock(AMQMessageHeader.class);
         when(message.getMessageHeader()).thenReturn(header);
+        when(message.getContent()).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
 
         return message;
     }
@@ -158,6 +160,7 @@ public class ReportRunnerTest extends QpidTestCase
             }
         });
         when(header.getHeaderNames()).thenReturn(props.keySet());
+        when(message.getContent()).thenReturn(QpidByteBuffer.emptyQpidByteBuffer());
         return message;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 76b3fbd..8394ca8 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -30,11 +30,10 @@ import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.ssl.SSLContext;
@@ -130,68 +129,7 @@ class WebSocketProvider implements AcceptingTransport
     {
         _idleTimeoutChecker.start();
 
-        _server = new Server(new QueuedThreadPool()
-        {
-            private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
-
-            @Override
-            protected void doStop() throws Exception
-            {
-                try
-                {
-                    super.doStop();
-                }
-                finally
-                {
-                    for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
-                    {
-                        qpidByteBuffer.dispose();
-                    }
-                    _cachedBufferMap.clear();
-                }
-            }
-
-            @Override
-            protected Thread newThread(final Runnable runnable)
-            {
-                return super.newThread(() ->
-                                       {
-                                           try
-                                           {
-                                               runnable.run();
-                                           }
-                                           finally
-                                           {
-                                               QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread());
-                                               if (qbb != null)
-                                               {
-                                                   qbb.dispose();
-                                               }
-                                           }
-                                       });
-            }
-
-            @Override
-            protected void runJob(final Runnable job)
-            {
-                try
-                {
-                    super.runJob(job);
-                }
-                finally
-                {
-                    final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
-                    if (cachedThreadLocalBuffer != null)
-                    {
-                        _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
-                    }
-                    else
-                    {
-                        _cachedBufferMap.remove(Thread.currentThread());
-                    }
-                }
-            }
-        });
+        _server = new Server(new QBBTrackingThreadPool());
 
         final ServerConnector connector;
         HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory();
@@ -357,6 +295,17 @@ class WebSocketProvider implements AcceptingTransport
                 ((ServerConnector) server.getConnectors()[0]).getLocalPort();
     }
 
+    private static class QBBTrackingThreadPool extends QueuedThreadPool
+    {
+        private final ThreadFactory _threadFactory = QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(r -> QBBTrackingThreadPool.super.newThread(r));
+
+        @Override
+        protected Thread newThread(final Runnable runnable)
+        {
+            return _threadFactory.newThread(runnable);
+        }
+    }
+
     @WebSocket
     public class AmqpWebSocket
     {
@@ -442,13 +391,14 @@ class WebSocketProvider implements AcceptingTransport
 
         private void restoreApplicationBufferForWrite()
         {
-            QpidByteBuffer oldNetInputBuffer = _netInputBuffer;
-            int unprocessedDataLength = _netInputBuffer.remaining();
+            try (QpidByteBuffer oldNetInputBuffer = _netInputBuffer)
+            {
+                int unprocessedDataLength = _netInputBuffer.remaining();
 
-            _netInputBuffer.limit(_netInputBuffer.capacity());
-            _netInputBuffer = oldNetInputBuffer.slice();
-            _netInputBuffer.limit(unprocessedDataLength);
-            oldNetInputBuffer.dispose();
+                _netInputBuffer.limit(_netInputBuffer.capacity());
+                _netInputBuffer = oldNetInputBuffer.slice();
+                _netInputBuffer.limit(unprocessedDataLength);
+            }
             if (_netInputBuffer.limit() != _netInputBuffer.capacity())
             {
                 _netInputBuffer.position(_netInputBuffer.limit());
@@ -456,22 +406,23 @@ class WebSocketProvider implements AcceptingTransport
             }
             else
             {
-                QpidByteBuffer currentBuffer = _netInputBuffer;
-                int newBufSize;
-
-                if (currentBuffer.capacity() < _broker.getNetworkBufferSize())
+                try (QpidByteBuffer currentBuffer = _netInputBuffer)
                 {
-                    newBufSize = _broker.getNetworkBufferSize();
-                }
-                else
-                {
-                    newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize();
-                    reportUnexpectedByteBufferSizeUsage();
-                }
+                    int newBufSize;
 
-                _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
-                _netInputBuffer.put(currentBuffer);
-                currentBuffer.dispose();
+                    if (currentBuffer.capacity() < _broker.getNetworkBufferSize())
+                    {
+                        newBufSize = _broker.getNetworkBufferSize();
+                    }
+                    else
+                    {
+                        newBufSize = currentBuffer.capacity() + _broker.getNetworkBufferSize();
+                        reportUnexpectedByteBufferSizeUsage();
+                    }
+
+                    _netInputBuffer = QpidByteBuffer.allocateDirect(newBufSize);
+                    _netInputBuffer.put(currentBuffer);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4cb0ccb..4d53751 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -26,7 +26,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
 import java.net.InetSocketAddress;
-import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -158,11 +158,32 @@ public class FrameTransport implements AutoCloseable
     {
         Preconditions.checkState(_channel != null, "Not connected");
         ChannelPromise promise = _channel.newPromise();
-        final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
-        TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
-        _channel.write(transportFrame, promise);
-        _channel.flush();
-        return JdkFutureAdapters.listenInPoolThread(promise);
+        final TransportFrame transportFrame;
+        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
+        {
+            final QpidByteBuffer duplicate;
+            if (payload == null)
+            {
+                duplicate = null;
+            }
+            else
+            {
+                duplicate = payload.duplicate();
+            }
+            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+            _channel.write(transportFrame, promise);
+            _channel.flush();
+            final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
+            if (frameBody instanceof Transfer)
+            {
+                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+            }
+            if (duplicate != null)
+            {
+                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
+            }
+            return listenableFuture;
+        }
     }
 
     public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 0b91273..52518ab 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -761,7 +761,7 @@ public class Interaction
         return this;
     }
 
-    public Interaction transferPayload(final List<QpidByteBuffer> payload)
+    public Interaction transferPayload(final QpidByteBuffer payload)
     {
         _transfer.setPayload(payload);
         return this;
@@ -777,12 +777,10 @@ public class Interaction
     {
         AmqpValue amqpValue = new AmqpValue(payload);
         final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
-        final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
-        transfer.setPayload(encodedForm);
-        section.dispose();
-        for (QpidByteBuffer qbb : encodedForm)
+        try (QpidByteBuffer encodedForm = section.getEncodedForm())
         {
-            qbb.dispose();
+            transfer.setPayload(encodedForm);
+            section.dispose();
         }
     }
 
@@ -806,15 +804,12 @@ public class Interaction
         transferCopy.setResume(transfer.getResume());
         transferCopy.setAborted(transfer.getAborted());
         transferCopy.setBatchable(transfer.getBatchable());
-        final List<QpidByteBuffer> payload = transfer.getPayload();
-        if (payload != null)
+        try (QpidByteBuffer payload = transfer.getPayload())
         {
-            final List<QpidByteBuffer> payloadCopy = new ArrayList<>(payload.size());
-            for (QpidByteBuffer qpidByteBuffer : payload)
+            if (payload != null)
             {
-                payloadCopy.add(qpidByteBuffer.duplicate());
+                transferCopy.setPayload(payload);
             }
-            transferCopy.setPayload(payloadCopy);
         }
         return transferCopy;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
index 8656b9b..1a513f7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
@@ -32,7 +32,6 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
@@ -72,14 +71,20 @@ public class MessageDecoder
         {
             throw new IllegalStateException("The section fragments have already been parsed");
         }
-        _fragments.addAll(transfer.getPayload());
+        _fragments.add(transfer.getPayload());
     }
 
     public void parse() throws AmqpErrorException
     {
         if (!_parsed)
         {
-            List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(_fragments);
+            List<EncodingRetainingSection<?>> sections;
+            try (QpidByteBuffer combined = QpidByteBuffer.concatenate(_fragments))
+            {
+                sections = _sectionDecoder.parseAll(combined);
+            }
+            _fragments.forEach(QpidByteBuffer::dispose);
+
             Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
             EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
             if (s instanceof HeaderSection)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 0c77ab1..37e327e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -44,12 +44,12 @@ public class MessageEncoder
         _header = header;
     }
 
-    public List<QpidByteBuffer> getPayload()
+    public QpidByteBuffer getPayload()
     {
         List<QpidByteBuffer> payload = new ArrayList<>();
         if (_header != null)
         {
-            payload.addAll(_header.createEncodingRetainingSection().getEncodedForm());
+            payload.add(_header.createEncodingRetainingSection().getEncodedForm());
         }
 
         if (_data.isEmpty())
@@ -70,10 +70,12 @@ public class MessageEncoder
 
         for (EncodingRetainingSection<?> section: dataSections)
         {
-            payload.addAll(section.getEncodedForm());
+            payload.add(section.getEncodedForm());
             section.dispose();
         }
 
-        return payload;
+        QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+        payload.forEach(QpidByteBuffer::dispose);
+        return combined;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
index dbd4cd6..68f4322 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
@@ -59,8 +59,10 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
                 @Override
                 public void send(final QpidByteBuffer msg)
                 {
+                    byte[] data = new byte[msg.remaining()];
+                    msg.get(data);
                     ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-                    buffer.writeBytes(msg.asByteBuffer());
+                    buffer.writeBytes(data);
                     try
                     {
                         OutputHandler.super.write(ctx, buffer, promise);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org