You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/07 02:28:20 UTC

svn commit: r1694594 [4/5] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ bdbs...

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Aug  7 00:28:17 2015
@@ -50,6 +50,7 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
 import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.protocol.AMQConstant;
@@ -166,7 +167,7 @@ public class AMQPConnection_1_0 extends
 
         _frameWriter =  new FrameWriter(_endpoint.getDescribedTypeRegistry());
 
-        getSender().send(headerResponse.duplicate());
+        getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate()));
         getSender().flush();
 
         if(useSASL)
@@ -254,7 +255,7 @@ public class AMQPConnection_1_0 extends
             {
                 if (_endpoint.isAuthenticated())
                 {
-                    getSender().send(AMQP_LAYER_HEADER.duplicate());
+                    getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate()));
                     getSender().flush();
                 }
                 else
@@ -343,14 +344,14 @@ public class AMQPConnection_1_0 extends
     private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
 
 
-    public synchronized void received(final ByteBuffer msg)
+    public synchronized void received(final QpidByteBuffer msg)
     {
         try
         {
             updateLastReadTime();
             if(RAW_LOGGER.isDebugEnabled())
             {
-                ByteBuffer dup = msg.duplicate();
+                QpidByteBuffer dup = msg.duplicate();
                 byte[] data = new byte[dup.remaining()];
                 dup.get(data);
                 Binary bin = new Binary(data);
@@ -531,7 +532,7 @@ public class AMQPConnection_1_0 extends
 
             _frameWriter.setValue(amqFrame);
 
-            ByteBuffer dup = ByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
+            QpidByteBuffer dup = QpidByteBuffer.allocateDirect(_endpoint.getMaxFrameSize());
 
             int size = _frameWriter.writeToBuffer(dup);
             if (size > _endpoint.getMaxFrameSize())
@@ -543,7 +544,7 @@ public class AMQPConnection_1_0 extends
 
             if (RAW_LOGGER.isDebugEnabled())
             {
-                ByteBuffer dup2 = dup.duplicate();
+                QpidByteBuffer dup2 = dup.duplicate();
                 byte[] data = new byte[dup2.remaining()];
                 dup2.get(data);
                 Binary bin = new Binary(data);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Aug  7 00:28:17 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -43,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
 import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -138,24 +140,23 @@ class ConsumerTarget_1_0 extends Abstrac
         Transfer transfer = new Transfer();
         //TODO
 
-
-        List<ByteBuffer> fragments = message.getFragments();
-        ByteBuffer payload;
+        Collection<QpidByteBuffer> fragments = message.getFragments();
+        QpidByteBuffer payload;
         if(fragments.size() == 1)
         {
-            payload = fragments.get(0);
+            payload = fragments.iterator().next();
         }
         else
         {
             int size = 0;
-            for(ByteBuffer fragment : fragments)
+            for(QpidByteBuffer fragment : fragments)
             {
                 size += fragment.remaining();
             }
 
-            payload = ByteBuffer.allocateDirect(size);
+            payload = QpidByteBuffer.allocateDirect(size);
 
-            for(ByteBuffer fragment : fragments)
+            for(QpidByteBuffer fragment : fragments)
             {
                 payload.put(fragment.duplicate());
             }
@@ -171,7 +172,6 @@ class ConsumerTarget_1_0 extends Abstrac
             Header oldHeader = null;
             try
             {
-                ByteBuffer encodedBuf = payload.duplicate();
                 Object value = valueHandler.parse(payload);
                 if(value instanceof Header)
                 {
@@ -200,8 +200,8 @@ class ConsumerTarget_1_0 extends Abstrac
             _sectionEncoder.encodeObject(header);
             Binary encodedHeader = _sectionEncoder.getEncoding();
 
-            ByteBuffer oldPayload = payload;
-            payload = ByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
+            QpidByteBuffer oldPayload = payload;
+            payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
             payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
             payload.put(oldPayload);
             payload.flip();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Fri Aug  7 00:28:17 2015
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.Uns
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.codec.BBEncoder;
 import org.apache.qpid.typedmessage.TypedBytesContentWriter;
@@ -68,7 +69,7 @@ public class MessageConverter_from_1_0
         Object bodyObject;
         try
         {
-            List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+            List<Section> sections = sectionDecoder.parseAll(QpidByteBuffer.wrap(data));
             ListIterator<Section> iterator = sections.listIterator();
             Section previousSection = null;
             while(iterator.hasNext())

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Aug  7 00:28:17 2015
@@ -38,6 +38,7 @@ import org.apache.qpid.amqp_1_0.type.Sym
 import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.store.StoredMessage;
@@ -257,7 +258,7 @@ public abstract class MessageConverter_t
                         }
 
                         @Override
-                        public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+                        public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
                         {
                             ByteBuffer buf = allData.duplicate();
                             buf.position(offsetInMessage);
@@ -266,7 +267,7 @@ public abstract class MessageConverter_t
                             {
                                 buf.limit(size);
                             }
-                            return Collections.singleton(buf);
+                            return Collections.singleton(QpidByteBuffer.wrap(buf));
                         }
 
                         @Override

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Aug  7 00:28:17 2015
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.Header;
 import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -70,9 +71,9 @@ public class MessageMetaData_1_0 impleme
     private Map _appProperties;
     private Map _footer;
 
-    private List<ByteBuffer> _encodedSections = new ArrayList<ByteBuffer>(3);
+    private List<QpidByteBuffer> _encodedSections = new ArrayList<>(3);
 
-    private volatile ByteBuffer _encoded;
+    private volatile QpidByteBuffer _encoded;
     private MessageHeader_1_0 _messageHeader;
 
 
@@ -92,29 +93,29 @@ public class MessageMetaData_1_0 impleme
         return _header;
     }
 
-    private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
+    private static ArrayList<QpidByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder)
     {
-        ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size());
+        ArrayList<QpidByteBuffer> encodedSections = new ArrayList<QpidByteBuffer>(sections.size());
         for(Section section : sections)
         {
             encoder.encodeObject(section);
-            encodedSections.add(encoder.getEncoding().asByteBuffer());
+            encodedSections.add(QpidByteBuffer.wrap(encoder.getEncoding().asByteBuffer()));
             encoder.reset();
         }
         return encodedSections;
     }
 
-    public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder)
+    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder)
     {
-        this(fragments, decoder, new ArrayList<ByteBuffer>(3));
+        this(fragments, decoder, new ArrayList<QpidByteBuffer>(3));
     }
 
-    public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List<ByteBuffer> immutableSections)
+    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<QpidByteBuffer> immutableSections)
     {
         this(constructSections(fragments, decoder,immutableSections), immutableSections);
     }
 
-    private MessageMetaData_1_0(List<Section> sections, List<ByteBuffer> encodedSections)
+    private MessageMetaData_1_0(List<Section> sections, List<QpidByteBuffer> encodedSections)
     {
         _encodedSections = encodedSections;
 
@@ -161,11 +162,11 @@ public class MessageMetaData_1_0 impleme
 
     }
 
-    private static List<Section> constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List<ByteBuffer> encodedSections)
+    private static List<Section> constructSections(final QpidByteBuffer[] fragments, final SectionDecoder decoder, List<QpidByteBuffer> encodedSections)
     {
         List<Section> sections = new ArrayList<Section>(3);
 
-        ByteBuffer src;
+        QpidByteBuffer src;
         if(fragments.length == 1)
         {
             src = fragments[0].duplicate();
@@ -173,12 +174,12 @@ public class MessageMetaData_1_0 impleme
         else
         {
             int size = 0;
-            for(ByteBuffer buf : fragments)
+            for(QpidByteBuffer buf : fragments)
             {
                 size += buf.remaining();
             }
-            src = ByteBuffer.allocateDirect(size);
-            for(ByteBuffer buf : fragments)
+            src = QpidByteBuffer.allocateDirect(size);
+            for(QpidByteBuffer buf : fragments)
             {
                 src.put(buf.duplicate());
             }
@@ -274,7 +275,7 @@ public class MessageMetaData_1_0 impleme
 
 
             int pos = 0;
-            for(ByteBuffer buf : fragments)
+            for(QpidByteBuffer buf : fragments)
             {
 /*
                 if(pos < startBarePos)
@@ -315,7 +316,7 @@ public class MessageMetaData_1_0 impleme
     {
         int size = 0;
 
-        for(ByteBuffer bin : _encodedSections)
+        for(QpidByteBuffer bin : _encodedSections)
         {
             size += bin.limit();
         }
@@ -323,11 +324,11 @@ public class MessageMetaData_1_0 impleme
         return size;
     }
 
-    private ByteBuffer encodeAsBuffer()
+    private QpidByteBuffer encodeAsBuffer()
     {
-        ByteBuffer buf = ByteBuffer.allocateDirect(getStorableSize());
+        QpidByteBuffer buf = QpidByteBuffer.allocateDirect(getStorableSize());
 
-        for(ByteBuffer bin : _encodedSections)
+        for(QpidByteBuffer bin : _encodedSections)
         {
             buf.put(bin.duplicate());
         }
@@ -337,7 +338,7 @@ public class MessageMetaData_1_0 impleme
 
     public int writeToBuffer(ByteBuffer dest)
     {
-        ByteBuffer buf = _encoded;
+        QpidByteBuffer buf = _encoded;
 
         if(buf == null)
         {
@@ -353,13 +354,13 @@ public class MessageMetaData_1_0 impleme
         {
             buf.limit(dest.remaining());
         }
-        dest.put(buf);
+        buf.get(dest);
         return buf.limit();
     }
 
     public int getContentSize()
     {
-        ByteBuffer buf = _encoded;
+        QpidByteBuffer buf = _encoded;
 
         if(buf == null)
         {
@@ -399,17 +400,18 @@ public class MessageMetaData_1_0 impleme
             ValueHandler valueHandler = new ValueHandler(_typeRegistry);
 
             ArrayList<Section> sections = new ArrayList<Section>(3);
-            ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(3);
+            ArrayList<QpidByteBuffer> encodedSections = new ArrayList<>(3);
 
             while(buf.hasRemaining())
             {
                 try
                 {
                     ByteBuffer encodedBuf = buf.duplicate();
-                    Object parse = valueHandler.parse(buf);
+                    final QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(buf);
+                    Object parse = valueHandler.parse(qpidByteBuffer);
                     sections.add((Section) parse);
                     encodedBuf.limit(buf.position());
-                    encodedSections.add(encodedBuf);
+                    encodedSections.add(QpidByteBuffer.wrap(encodedBuf));
 
                 }
                 catch (AmqpErrorException e)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Aug  7 00:28:17 2015
@@ -24,8 +24,10 @@ package org.apache.qpid.server.protocol.
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.util.ByteBufferUtils;
@@ -33,7 +35,7 @@ import org.apache.qpid.util.ByteBufferUt
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
 
-    private volatile SoftReference<List<ByteBuffer>> _fragmentsRef;
+    private volatile SoftReference<Collection<QpidByteBuffer>> _fragmentsRef;
     private long _arrivalTime;
     private final long _size;
 
@@ -41,18 +43,18 @@ public class Message_1_0 extends Abstrac
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
         super(storedMessage, null);
-        final List<ByteBuffer> fragments = restoreFragments(getStoredMessage());
+        final Collection<QpidByteBuffer> fragments = restoreFragments(getStoredMessage());
         _fragmentsRef = new SoftReference<>(fragments);
         _size = calculateSize(fragments);
     }
 
-    private long calculateSize(final List<ByteBuffer> fragments)
+    private long calculateSize(final Collection<QpidByteBuffer> fragments)
     {
 
         long size = 0l;
         if(fragments != null)
         {
-            for(ByteBuffer buf : fragments)
+            for(QpidByteBuffer buf : fragments)
             {
                 size += buf.remaining();
             }
@@ -60,28 +62,14 @@ public class Message_1_0 extends Abstrac
         return size;
     }
 
-    private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+    private static Collection<QpidByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
-        final int FRAGMENT_SIZE = 2048;
-        int offset = 0;
-        ByteBuffer b;
-        do
-        {
+        return storedMessage.getContent(0, Integer.MAX_VALUE);
 
-            b = ByteBufferUtils.combine(storedMessage.getContent(offset,FRAGMENT_SIZE));
-            if(b.hasRemaining())
-            {
-                fragments.add(b);
-                offset+= b.remaining();
-            }
-        }
-        while(b.hasRemaining());
-        return fragments;
     }
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
-                       final List<ByteBuffer> fragments,
+                       final Collection<QpidByteBuffer> fragments,
                        final Object connectionReference)
     {
         super(storedMessage, connectionReference);
@@ -128,10 +116,10 @@ public class Message_1_0 extends Abstrac
         return _arrivalTime;
     }
 
-    public List<ByteBuffer> getFragments()
+    public Collection<QpidByteBuffer> getFragments()
     {
 
-        List<ByteBuffer> fragments = _fragmentsRef.get();
+        Collection<QpidByteBuffer> fragments = _fragmentsRef.get();
         if(fragments == null)
         {
             fragments = restoreFragments(getStoredMessage());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Aug  7 00:28:17 2015
@@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.StoredMessage;
@@ -87,7 +88,7 @@ public class ReceivingLink_1_0 implement
     {
         // TODO - cope with fragmented messages
 
-        List<ByteBuffer> fragments = null;
+        List<QpidByteBuffer> fragments = null;
 
 
 
@@ -108,7 +109,7 @@ public class ReceivingLink_1_0 implement
                 return;
             }
 
-            fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
+            fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
             for(Transfer t : _incompleteMessage)
             {
                 fragments.add(t.getPayload());
@@ -146,20 +147,16 @@ public class ReceivingLink_1_0 implement
         else
         {
             MessageMetaData_1_0 mmd = null;
-            List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
-            mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
+            List<QpidByteBuffer> immutableSections = new ArrayList<>(3);
+            mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
                     _sectionDecoder,
                     immutableSections);
 
             MessageHandle<MessageMetaData_1_0> handle = _vhost.getMessageStore().addMessage(mmd);
 
-            boolean skipping = true;
-            int offset = 0;
-
-            for(ByteBuffer bareMessageBuf : immutableSections)
+            for(QpidByteBuffer bareMessageBuf : immutableSections)
             {
                 handle.addContent(bareMessageBuf.duplicate());
-                offset += bareMessageBuf.remaining();
             }
             final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
             Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Fri Aug  7 00:28:17 2015
@@ -47,6 +47,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -78,7 +79,7 @@ public class TxnCoordinatorLink_1_0 impl
     {
         // TODO - cope with fragmented messages
 
-        ByteBuffer payload = null;
+        QpidByteBuffer payload = null;
 
 
         if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
@@ -100,7 +101,7 @@ public class TxnCoordinatorLink_1_0 impl
             {
                 size += t.getPayload().limit();
             }
-            payload = ByteBuffer.allocateDirect(size);
+            payload = QpidByteBuffer.allocateDirect(size);
             for(Transfer t : _incompleteMessage)
             {
                 payload.put(t.getPayload().duplicate());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Fri Aug  7 00:28:17 2015
@@ -40,6 +40,7 @@ import java.util.UUID;
 
 import javax.security.auth.Subject;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.*;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -135,14 +136,14 @@ public class ProtocolEngine_1_0_0Test ex
         final ByteBufferSender sender = mock(ByteBufferSender.class);
         when(_networkConnection.getSender()).thenReturn(sender);
 
-        final ArgumentCaptor<ByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(ByteBuffer.class);
+        final ArgumentCaptor<QpidByteBuffer> byteBufferCaptor = ArgumentCaptor.forClass(QpidByteBuffer.class);
 
         doAnswer(new Answer()
         {
             @Override
             public Object answer(final InvocationOnMock invocation) throws Throwable
             {
-                _sentBuffers.add(byteBufferCaptor.getValue());
+                _sentBuffers.add(byteBufferCaptor.getValue().getNativeBuffer());
                 return null;
             }
         }).when(sender).send(byteBufferCaptor.capture());
@@ -163,11 +164,12 @@ public class ProtocolEngine_1_0_0Test ex
 
         createEngine(useSASL, Transport.TCP);
 
-        _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+        _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance()
+                                                                   .getHeaderIdentifier()));
 
         Open open = new Open();
         _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        ByteBuffer buf = ByteBuffer.allocate(64*1024);
+        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
         _frameWriter.writeToBuffer(buf);
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
@@ -186,11 +188,11 @@ public class ProtocolEngine_1_0_0Test ex
 
         createEngine(useSASL, Transport.TCP);
 
-        _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+        _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
 
         Open open = new Open();
         _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        ByteBuffer buf = ByteBuffer.allocate(64*1024);
+        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
         _frameWriter.writeToBuffer(buf);
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
@@ -216,11 +218,11 @@ public class ProtocolEngine_1_0_0Test ex
         final boolean useSASL = false;
 
         createEngine(useSASL, Transport.SSL);
-        _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+        _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
 
         Open open = new Open();
         _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        ByteBuffer buf = ByteBuffer.allocate(64*1024);
+        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
         _frameWriter.writeToBuffer(buf);
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
@@ -247,22 +249,22 @@ public class ProtocolEngine_1_0_0Test ex
 
         createEngine(useSASL, Transport.TCP);
 
-        _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier()));
+        _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier()));
 
         SaslInit init = new SaslInit();
         init.setMechanism(Symbol.valueOf("ANONYMOUS"));
         _frameWriter.setValue(new SASLFrame(init));
-        ByteBuffer buf = ByteBuffer.allocate(64*1024);
+        QpidByteBuffer buf = QpidByteBuffer.allocate(64*1024);
         _frameWriter.writeToBuffer(buf);
 
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        _protocolEngine_1_0_0.received(ByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+        _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
 
         Open open = new Open();
         _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));
-        buf = ByteBuffer.allocate(64*1024);
+        buf = QpidByteBuffer.allocate(64*1024);
         _frameWriter.writeToBuffer(buf);
         buf.flip();
         _protocolEngine_1_0_0.received(buf);

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java Fri Aug  7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
@@ -102,9 +103,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
             {
-                return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Fri Aug  7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -199,7 +200,7 @@ public class MessageConverter_0_10_to_0_
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
             {
                 return message.getContent(offsetInMessage, size);
             }

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java Fri Aug  7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
@@ -88,7 +89,7 @@ public class MessageConverter_0_8_to_0_1
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
             {
                 return message_0_8.getContent(offsetInMessage, size);
             }

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java Fri Aug  7 00:28:17 2015
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -104,9 +105,9 @@ public class MessageConverter_1_0_to_v0_
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
             {
-                return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Aug  7 00:28:17 2015
@@ -45,6 +45,7 @@ import org.eclipse.jetty.util.ssl.SslCon
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketHandler;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.MultiVersionProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
@@ -223,7 +224,7 @@ class WebSocketProvider implements Accep
         @Override
         public void onMessage(final byte[] data, final int offset, final int length)
         {
-            _engine.received(ByteBuffer.wrap(data, offset, length).slice());
+            _engine.received(QpidByteBuffer.wrap(data, offset, length).slice());
         }
 
         @Override
@@ -277,8 +278,29 @@ class WebSocketProvider implements Accep
 
         }
 
+        private void send(final ByteBuffer msg)
+        {
+            try
+            {
+                if (msg.hasArray())
+                {
+                    _connection.sendMessage(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
+                }
+                else
+                {
+                    byte[] copy = new byte[msg.remaining()];
+                    msg.duplicate().get(copy);
+                    _connection.sendMessage(copy, 0, copy.length);
+                }
+            }
+            catch (IOException e)
+            {
+                close();
+            }
+        }
+
         @Override
-        public void send(final ByteBuffer msg)
+        public void send(final QpidByteBuffer msg)
         {
             try
             {
@@ -299,6 +321,7 @@ class WebSocketProvider implements Accep
             }
         }
 
+
         @Override
         public void flush()
         {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java Fri Aug  7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
 import org.slf4j.Logger;
@@ -121,7 +122,7 @@ public class AMQProtocolHandler implemen
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
-    private AMQDecoder _decoder;
+    private ClientDecoder _decoder;
 
     private ProtocolVersion _suggestedProtocolVersion;
 
@@ -559,7 +560,7 @@ public class AMQProtocolHandler implemen
         final ByteBuffer buf = asByteBuffer(frame);
         _lastWriteTime = System.currentTimeMillis();
         _writtenBytes += buf.remaining();
-        _sender.send(buf);
+        _sender.send(QpidByteBuffer.wrap(buf));
         if(flush)
         {
             _sender.flush();

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession_0_8;
 import org.apache.qpid.client.AMQTopic;
@@ -76,7 +78,7 @@ public abstract class AbstractJMSMessage
                     _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
                 }
 
-                data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
+                data = ((ContentBody) bodies.get(0)).getPayload().getNativeBuffer().duplicate();
             }
             else if (bodies != null)
             {
@@ -91,7 +93,7 @@ public abstract class AbstractJMSMessage
                 while (it.hasNext())
                 {
                     ContentBody cb = (ContentBody) it.next();
-                    final ByteBuffer payload = cb.getPayload().duplicate();
+                    final ByteBuffer payload = cb.getPayload().getNativeBuffer().duplicate();
                     if (payload.isDirect() || payload.isReadOnly())
                     {
                         data.put(payload);
@@ -131,15 +133,25 @@ public abstract class AbstractJMSMessage
 
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
                                                           DeliveryProperties deliveryProps,
-                                                          java.nio.ByteBuffer body) throws QpidException
+                                                          Collection<QpidByteBuffer> body) throws QpidException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
 
 
-        if (body != null)
+        if (body != null && body.size() != 0)
         {
-            data = body;
+            int size = 0;
+            for(QpidByteBuffer b : body)
+            {
+                size += b.remaining();
+            }
+            data = ByteBuffer.allocate(size);
+            for(QpidByteBuffer b : body)
+            {
+                b.get(data);
+            }
+            data.flip();
         }
         else // body == null
         {
@@ -180,7 +192,7 @@ public abstract class AbstractJMSMessage
     }
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps,
-                                            DeliveryProperties deliveryProps, java.nio.ByteBuffer body)
+                                            DeliveryProperties deliveryProps, Collection<QpidByteBuffer> body)
             throws JMSException, QpidException
     {
         final AbstractJMSMessage msg =
@@ -193,7 +205,7 @@ public abstract class AbstractJMSMessage
     private class BodyInputStream extends InputStream
     {
         private final Iterator<ContentBody> _bodiesIter;
-        private ByteBuffer _currentBuffer;
+        private QpidByteBuffer _currentBuffer;
         public BodyInputStream(final List<ContentBody> bodies)
         {
             _bodiesIter = bodies.iterator();

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/MockSender.java Fri Aug  7 00:28:17 2015
@@ -22,12 +22,19 @@ package org.apache.qpid.client.transport
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.ByteBufferSender;
 
 public class MockSender implements ByteBufferSender
 {
 
-    public void send(ByteBuffer msg)
+    private void send(ByteBuffer msg)
+    {
+
+    }
+
+    @Override
+    public void send(final QpidByteBuffer msg)
     {
 
     }

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java Fri Aug  7 00:28:17 2015
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferRef // handle pairing a ByteBuffer with acquire/release hooks
+{
+    void incrementRef(); // invoked by the QpidByteBuffer constructor for every wrapper created over this ref
+
+    void decrementRef(); // invoked by QpidByteBuffer.dispose(), at most once per wrapper
+
+    ByteBuffer getBuffer(); // the underlying buffer or a view of it, depending on the implementation
+}

Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java Fri Aug  7 00:28:17 2015
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+
+class NonPooledByteBufferRef implements ByteBufferRef // ByteBufferRef that keeps no reference count
+{
+    private final ByteBuffer _buffer;
+
+    NonPooledByteBufferRef(final ByteBuffer buffer)
+    {
+        _buffer = buffer;
+    }
+
+    @Override
+    public void incrementRef()
+    {
+        // intentionally empty: this implementation does not count references
+    }
+
+    @Override
+    public void decrementRef()
+    {
+        // intentionally empty: there is nothing to release
+    }
+
+    @Override
+    public ByteBuffer getBuffer()
+    {
+        return _buffer; // the very instance passed to the constructor, not a duplicate
+    }
+}

Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Fri Aug  7 00:28:17 2015
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+class PooledByteBufferRef implements ByteBufferRef // ByteBufferRef that counts its holders atomically
+{
+    private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> UPDATER = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
+
+    private final ByteBuffer _buffer;
+    private volatile int _refCount; // only modified through UPDATER; starts at 0
+
+    PooledByteBufferRef(final ByteBuffer buffer)
+    {
+        _buffer = buffer;
+    }
+
+    @Override
+    public void incrementRef()
+    {
+        UPDATER.incrementAndGet(this);
+    }
+
+    @Override
+    public void decrementRef()
+    {
+        if(UPDATER.decrementAndGet(this) == 0) // the last holder has just released the buffer
+        {
+            returnToPool(this);
+        }
+    }
+
+    @Override
+    public ByteBuffer getBuffer()
+    {
+        return _buffer.duplicate(); // each caller gets independent position/limit/mark over the same content
+    }
+
+    private void returnToPool(final PooledByteBufferRef byteBufferRef)
+    {
+        // NOTE(review): empty in this revision, so nothing is actually handed back to a pool yet
+    }
+
+}

Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1694594&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Fri Aug  7 00:28:17 2015
@@ -0,0 +1,769 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.bytebuffer;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.framing.AMQShortString;
+
+public final class QpidByteBuffer
+{
+    private static final AtomicIntegerFieldUpdater<QpidByteBuffer> DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(QpidByteBuffer.class, "_disposed");
+
+    private final ByteBuffer _buffer;
+    private final ByteBufferRef _ref;
+    private volatile int _disposed;
+
+    QpidByteBuffer(ByteBufferRef ref)
+    {
+        this(ref.getBuffer(), ref);
+    }
+
+    private QpidByteBuffer(ByteBuffer buf, ByteBufferRef ref)
+    {
+        _buffer = buf;
+        _ref = ref;
+        ref.incrementRef();
+    }
+
+
+    public boolean hasRemaining()
+    {
+        return _buffer.hasRemaining();
+    }
+
+    public QpidByteBuffer putInt(final int index, final int value)
+    {
+        _buffer.putInt(index, value);
+        return this;
+    }
+
+    public boolean isDirect()
+    {
+        return _buffer.isDirect();
+    }
+
+    public QpidByteBuffer putShort(final int index, final short value)
+    {
+        _buffer.putShort(index, value);
+        return this;
+    }
+
+    public QpidByteBuffer putChar(final int index, final char value)
+    {
+        _buffer.putChar(index, value);
+        return this;
+
+    }
+
+    public QpidByteBuffer put(final byte b)
+    {
+        _buffer.put(b);
+        return this;
+    }
+
+    public QpidByteBuffer put(final int index, final byte b)
+    {
+        _buffer.put(index, b);
+        return this;
+    }
+
+    public short getShort(final int index)
+    {
+        return _buffer.getShort(index);
+    }
+
+
+    public QpidByteBuffer mark()
+    {
+        _buffer.mark();
+        return this;
+    }
+
+    public long getLong()
+    {
+        return _buffer.getLong();
+    }
+
+    public QpidByteBuffer putFloat(final int index, final float value)
+    {
+        _buffer.putFloat(index, value);
+        return this;
+    }
+
+    public double getDouble(final int index)
+    {
+        return _buffer.getDouble(index);
+    }
+
+    public boolean hasArray()
+    {
+        return _buffer.hasArray();
+    }
+
+    public QpidByteBuffer asReadOnlyBuffer()
+    {
+        return new QpidByteBuffer(_buffer.asReadOnlyBuffer(), _ref);
+    }
+
+    public double getDouble()
+    {
+        return _buffer.getDouble();
+    }
+
+    public QpidByteBuffer putFloat(final float value)
+    {
+        _buffer.putFloat(value);
+        return this;
+    }
+
+    public QpidByteBuffer putInt(final int value)
+    {
+        _buffer.putInt(value);
+        return this;
+    }
+
+    public byte[] array()
+    {
+        return _buffer.array();
+    }
+
+    public QpidByteBuffer putShort(final short value)
+    {
+        _buffer.putShort(value);
+        return this;
+    }
+
+    public int getInt(final int index)
+    {
+        return _buffer.getInt(index);
+    }
+
+    public int remaining()
+    {
+        return _buffer.remaining();
+    }
+
+    public QpidByteBuffer put(final byte[] src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    public QpidByteBuffer put(final ByteBuffer src)
+    {
+        _buffer.put(src);
+        return this;
+    }
+
+    public QpidByteBuffer put(final QpidByteBuffer src)
+    {
+        _buffer.put(src._buffer);
+        return this;
+    }
+
+
+
+    public QpidByteBuffer get(final byte[] dst, final int offset, final int length)
+    {
+        _buffer.get(dst, offset, length);
+        return this;
+    }
+
+    public QpidByteBuffer get(final ByteBuffer src)
+    {
+        src.put(_buffer);
+        return this;
+    }
+
+
+    public QpidByteBuffer rewind()
+    {
+        _buffer.rewind();
+        return this;
+    }
+
+    public QpidByteBuffer clear()
+    {
+        _buffer.clear();
+        return this;
+    }
+
+    public QpidByteBuffer putLong(final int index, final long value)
+    {
+        _buffer.putLong(index, value);
+        return this;
+    }
+    public QpidByteBuffer compact()
+    {
+        _buffer.compact();
+        return this;
+    }
+
+    public QpidByteBuffer putDouble(final double value)
+    {
+        _buffer.putDouble(value);
+        return this;
+    }
+
+    public int limit()
+    {
+        return _buffer.limit();
+    }
+
+    public QpidByteBuffer reset()
+    {
+        _buffer.reset();
+        return this;
+    }
+
+    public QpidByteBuffer flip()
+    {
+        _buffer.flip();
+        return this;
+    }
+
+    public short getShort()
+    {
+        return _buffer.getShort();
+    }
+
+    public float getFloat()
+    {
+        return _buffer.getFloat();
+    }
+
+    public QpidByteBuffer limit(final int newLimit)
+    {
+        _buffer.limit(newLimit);
+        return this;
+    }
+
+    public QpidByteBuffer duplicate()
+    {
+        return new QpidByteBuffer(_buffer.duplicate(), _ref);
+    }
+
+    public QpidByteBuffer put(final byte[] src, final int offset, final int length)
+    {
+        _buffer.put(src, offset, length);
+        return this;
+    }
+
+    public long getLong(final int index)
+    {
+        return _buffer.getLong(index);
+    }
+
+    public int capacity()
+    {
+        return _buffer.capacity();
+    }
+
+    public boolean isReadOnly()
+    {
+        return _buffer.isReadOnly();
+    }
+
+    public char getChar(final int index)
+    {
+        return _buffer.getChar(index);
+    }
+
+    public byte get()
+    {
+        return _buffer.get();
+    }
+
+    public byte get(final int index)
+    {
+        return _buffer.get(index);
+    }
+
+    public QpidByteBuffer get(final byte[] dst)
+    {
+        _buffer.get(dst);
+        return this;
+    }
+
+    public QpidByteBuffer putChar(final char value)
+    {
+        _buffer.putChar(value);
+        return this;
+    }
+
+    public QpidByteBuffer position(final int newPosition)
+    {
+        _buffer.position(newPosition);
+        return this;
+    }
+
+    public int arrayOffset()
+    {
+        return _buffer.arrayOffset();
+    }
+
+    public char getChar()
+    {
+        return _buffer.getChar();
+    }
+
+    public int getInt()
+    {
+        return _buffer.getInt();
+    }
+
+    public QpidByteBuffer putLong(final long value)
+    {
+        _buffer.putLong(value);
+        return this;
+    }
+
+    public float getFloat(final int index)
+    {
+        return _buffer.getFloat(index);
+    }
+
+    public QpidByteBuffer slice()
+    {
+        return new QpidByteBuffer(_buffer.slice(), _ref);
+    }
+
+    public QpidByteBuffer view(int offset, int length)
+    {
+        ByteBuffer buf = _buffer.slice();
+        buf.position(offset);
+        buf = buf.slice();
+        buf.limit(length);
+        return new QpidByteBuffer(buf, _ref);
+    }
+
+    public int position()
+    {
+        return _buffer.position();
+    }
+
+    public QpidByteBuffer putDouble(final int index, final double value)
+    {
+        _buffer.putDouble(index, value);
+        return this;
+    }
+
+    @Override
+    protected void finalize() throws Throwable
+    {
+        dispose();
+        super.finalize();
+    }
+
+    public void dispose()
+    {
+        if(DISPOSED_UPDATER.compareAndSet(this,0,1))
+        {
+            _ref.decrementRef();
+        }
+    }
+
+    public InputStream asInputStream()
+    {
+        return new BufferInputStream();
+    }
+
+    public MarkableDataInput asDataInput()
+    {
+        return new BufferDataInput();
+    }
+
+
+    public DataOutput asDataOutput()
+    {
+        return new BufferDataOutput();
+    }
+
+    public static QpidByteBuffer allocate(int size)
+    {
+        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
+    }
+
+
+    public static QpidByteBuffer allocateDirect(int size) // off-heap counterpart of allocate(int)
+    {
+        return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size))); // was ByteBuffer.allocate: heap-backed, isDirect() false
+    }
+
+
+    public ByteBuffer getNativeBuffer()
+    {
+        return _buffer;
+    }
+
+    public CharBuffer decode(Charset charset)
+    {
+        return charset.decode(_buffer);
+    }
+
+    public static QpidByteBuffer wrap(final ByteBuffer wrap)
+    {
+        return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
+    }
+
+    public static QpidByteBuffer wrap(final byte[] data)
+    {
+        return wrap(ByteBuffer.wrap(data));
+    }
+
+    public static QpidByteBuffer wrap(final byte[] data, int offset, int length)
+    {
+        return wrap(ByteBuffer.wrap(data, offset, length));
+    }
+
+    /**
+     * {@link InputStream} view over the enclosing buffer's {@code _buffer}.
+     * Reads consume bytes from {@code _buffer} directly (its position advances);
+     * mark and reset delegate to the buffer's own mark.
+     */
+    private final class BufferInputStream extends InputStream
+    {
+
+        /**
+         * Reads a single byte.
+         *
+         * @return the next byte as a value in 0..255, or -1 when nothing remains
+         */
+        @Override
+        public int read() throws IOException
+        {
+            if (_buffer.hasRemaining())
+            {
+                return _buffer.get() & 0xFF;
+            }
+            return -1;
+        }
+
+
+        /**
+         * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+         *
+         * @return the number of bytes copied, 0 when {@code len} is 0, or -1 when
+         *         bytes were requested but nothing remains
+         */
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException
+        {
+            if (len == 0)
+            {
+                // InputStream contract: a zero-length read yields 0, not end-of-stream
+                return 0;
+            }
+            if (!_buffer.hasRemaining())
+            {
+                return -1;
+            }
+            final int count = Math.min(len, _buffer.remaining());
+            _buffer.get(b, off, count);
+
+            return count;
+        }
+
+        /** Sets the mark of {@code _buffer} at its current position; {@code readlimit} is ignored. */
+        @Override
+        public void mark(int readlimit)
+        {
+            _buffer.mark();
+        }
+
+        /** Returns {@code _buffer} to its previously set mark. */
+        @Override
+        public void reset() throws IOException
+        {
+            _buffer.reset();
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return true;
+        }
+
+        /**
+         * Skips over at most {@code n} bytes.
+         *
+         * @param n the number of bytes the caller would like to skip
+         * @return the number of bytes actually skipped: 0 for a non-positive
+         *         {@code n}, otherwise never more than what remains
+         */
+        @Override
+        public long skip(long n) throws IOException
+        {
+            if (n <= 0)
+            {
+                return 0;
+            }
+            // Clamp before narrowing so a large n can neither overflow the int cast
+            // nor push the position beyond the limit.
+            final int skipped = (int) Math.min(n, (long) _buffer.remaining());
+            _buffer.position(_buffer.position() + skipped);
+            return skipped;
+        }
+
+        /** @return the number of bytes left between position and limit of {@code _buffer} */
+        @Override
+        public int available() throws IOException
+        {
+            return _buffer.remaining();
+        }
+
+        /** Does nothing; this stream holds no resources of its own. */
+        @Override
+        public void close()
+        {
+        }
+    }
+
+    /**
+     * {@link MarkableDataInput} view over the enclosing buffer's {@code _buffer}.
+     * Reads consume bytes from {@code _buffer} directly. The {@code position}
+     * accessors and mark/reset work with positions relative to where
+     * {@code _buffer} stood when this view was created.
+     */
+    private final class BufferDataInput implements MarkableDataInput
+    {
+        /** Last marked position, relative to {@code _offset}. */
+        private int _mark;
+        /** Absolute position of {@code _buffer} at construction time. */
+        private final int _offset;
+
+        public BufferDataInput()
+        {
+            _offset = _buffer.position();
+        }
+
+        /** Fills the whole of {@code b} from the buffer. */
+        public void readFully(byte[] b)
+        {
+            _buffer.get(b);
+        }
+
+        /**
+         * Reads {@code len} bytes into {@code b}, starting at index {@code off}.
+         */
+        public void readFully(byte[] b, int off, int len)
+        {
+            // Honour the caller's offset; it was previously ignored and data was
+            // always written from index 0.
+            _buffer.get(b, off, len);
+        }
+
+        /**
+         * Returns {@code view(0, len)} of the enclosing buffer and then skips
+         * {@code len} bytes.
+         * NOTE(review): view() is declared outside this chunk; presumably it yields
+         * the next {@code len} bytes from the current position - confirm.
+         */
+        public QpidByteBuffer readAsByteBuffer(int len)
+        {
+            final QpidByteBuffer view = view(0, len);
+            skipBytes(len);
+            return view;
+        }
+
+        /**
+         * Advances the buffer position by {@code n} bytes.
+         *
+         * @return the number of bytes skipped, as the {@link java.io.DataInput}
+         *         contract requires (the relative position was returned before)
+         */
+        public int skipBytes(int n)
+        {
+            _buffer.position(_buffer.position()+n);
+            return n;
+        }
+
+        /** @return {@code true} when the next byte is non-zero */
+        public boolean readBoolean()
+        {
+            return _buffer.get() != 0;
+        }
+
+        public byte readByte()
+        {
+            return _buffer.get();
+        }
+
+        /** @return the next byte as a value in 0..255 */
+        public int readUnsignedByte()
+        {
+            return ((int) _buffer.get()) & 0xFF;
+        }
+
+        public short readShort()
+        {
+            return _buffer.getShort();
+        }
+
+        /** @return the next two bytes as a value in 0..65535 */
+        public int readUnsignedShort()
+        {
+            return ((int) _buffer.getShort()) & 0xffff;
+        }
+
+        public char readChar()
+        {
+            return (char) _buffer.getChar();
+        }
+
+        public int readInt()
+        {
+            return _buffer.getInt();
+        }
+
+        public long readLong()
+        {
+            return _buffer.getLong();
+        }
+
+        public float readFloat()
+        {
+            return _buffer.getFloat();
+        }
+
+        public double readDouble()
+        {
+            return _buffer.getDouble();
+        }
+
+        /** Delegates to {@code AMQShortString.readAMQShortString} on {@code _buffer}. */
+        public AMQShortString readAMQShortString()
+        {
+            return AMQShortString.readAMQShortString(_buffer);
+        }
+
+        /** Not supported; always throws {@link UnsupportedOperationException}. */
+        public String readLine()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /** Not supported; always throws {@link UnsupportedOperationException}. */
+        public String readUTF()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /** @return the number of bytes left between position and limit of {@code _buffer} */
+        public int available()
+        {
+            return _buffer.remaining();
+        }
+
+
+        /** Advances the buffer position by {@code i} (narrowed to int) and returns {@code i}. */
+        public long skip(long i)
+        {
+            _buffer.position(_buffer.position()+(int)i);
+            return i;
+        }
+
+        /** Fills the whole of {@code b} and returns {@code b.length}. */
+        public int read(byte[] b)
+        {
+            readFully(b);
+            return b.length;
+        }
+
+        /** @return the current position relative to where this view started */
+        public int position()
+        {
+            return _buffer.position()-_offset;
+        }
+
+        /** Moves to {@code position}, interpreted relative to where this view started. */
+        public void position(int position)
+        {
+            _buffer.position(position + _offset);
+        }
+
+        // NOTE(review): this is the absolute limit of _buffer, whereas position()
+        // is relative to _offset - confirm callers expect that.
+        public int length()
+        {
+            return _buffer.limit();
+        }
+
+
+        /** Remembers the current relative position; {@code readAhead} is ignored. */
+        public void mark(int readAhead)
+        {
+            _mark = position();
+        }
+
+        /** Returns to the position remembered by {@link #mark(int)}. */
+        public void reset()
+        {
+            // _mark is relative to _offset, so restore it through position(int),
+            // which re-applies the offset, rather than as an absolute position.
+            position(_mark);
+        }
+    }
+
+    /**
+     * {@link DataOutput} view that writes into the enclosing buffer's
+     * {@code _buffer} at its current position.
+     */
+    private final class BufferDataOutput implements DataOutput
+    {
+        /** Writes the low-order byte of {@code b}. */
+        public void write(int b)
+        {
+            final byte lowByte = (byte) b;
+            _buffer.put(lowByte);
+        }
+
+        public void write(byte[] b)
+        {
+            _buffer.put(b);
+        }
+
+
+        public void write(byte[] b, int off, int len)
+        {
+            _buffer.put(b, off, len);
+        }
+
+        /** Writes a single byte: 1 for {@code true}, 0 for {@code false}. */
+        public void writeBoolean(boolean v)
+        {
+            final byte encoded = (byte) (v ? 1 : 0);
+            _buffer.put(encoded);
+        }
+
+        /** Writes the low-order byte of {@code v}. */
+        public void writeByte(int v)
+        {
+            write(v);
+        }
+
+        public void writeShort(int v)
+        {
+            final short narrowed = (short) v;
+            _buffer.putShort(narrowed);
+        }
+
+        /** Writes the low 16 bits of {@code v}, high byte first. */
+        public void writeChar(int v)
+        {
+            final byte high = (byte) (v >>> 8);
+            final byte low = (byte) v;
+            _buffer.put(high);
+            _buffer.put(low);
+        }
+
+        public void writeInt(int v)
+        {
+            _buffer.putInt(v);
+        }
+
+        public void writeLong(long v)
+        {
+            _buffer.putLong(v);
+        }
+
+        /** Writes the {@link Float#floatToIntBits(float)} form of {@code v} as an int. */
+        public void writeFloat(float v)
+        {
+            _buffer.putInt(Float.floatToIntBits(v));
+        }
+
+        /** Writes the {@link Double#doubleToLongBits(double)} form of {@code v} as a long. */
+        public void writeDouble(double v)
+        {
+            _buffer.putLong(Double.doubleToLongBits(v));
+        }
+
+        /** Not supported; always throws {@link UnsupportedOperationException}. */
+        public void writeBytes(String s)
+        {
+            throw new UnsupportedOperationException("writeBytes(String s) not supported");
+        }
+
+        /** Writes every char of {@code s} as two bytes, high byte first. */
+        public void writeChars(String s)
+        {
+            for (final char ch : s.toCharArray())
+            {
+                writeChar(ch);
+            }
+        }
+
+        /**
+         * Writes {@code s} as a two-byte length prefix followed by one, two or
+         * three bytes per char. The prefix holds the low 16 bits of the encoded
+         * byte count and is filled in after the chars have been written.
+         */
+        public void writeUTF(String s)
+        {
+            final int charCount = s.length();
+
+            // Reserve two bytes for the length prefix; it is back-filled below.
+            final int prefixPos = _buffer.position();
+            _buffer.position(prefixPos + 2);
+
+            for (int idx = 0; idx < charCount; idx++)
+            {
+                final int ch = s.charAt(idx);
+                if (ch > 0x07FF)
+                {
+                    _buffer.put((byte) (0xE0 | ((ch >> 12) & 0x0F)));
+                    _buffer.put((byte) (0x80 | ((ch >> 6) & 0x3F)));
+                    _buffer.put((byte) (0x80 | (ch & 0x3F)));
+                }
+                else if (ch == 0 || ch > 0x007F)
+                {
+                    // NUL and 0x0080..0x07FF take the two-byte form
+                    _buffer.put((byte) (0xC0 | ((ch >> 6) & 0x1F)));
+                    _buffer.put((byte) (0x80 | (ch & 0x3F)));
+                }
+                else
+                {
+                    _buffer.put((byte) ch);
+                }
+            }
+
+            final int encodedLength = _buffer.position() - (prefixPos + 2);
+
+            _buffer.put(prefixPos, (byte) (encodedLength >>> 8));
+            _buffer.put(prefixPos + 1, (byte) encodedLength);
+        }
+
+    }
+
+}

Propchange: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Aug  7 00:28:17 2015
@@ -95,9 +95,6 @@ public abstract class AMQDecoder<T exten
         return _methodProcessor;
     }
 
-
-    public abstract void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException;
-
     protected void decode(final MarkableDataInput msg) throws IOException, AMQFrameDecodingException
     {
         // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ClientDecoder.java Fri Aug  7 00:28:17 2015
@@ -42,7 +42,6 @@ public class ClientDecoder extends AMQDe
         super(false, methodProcessor);
     }
 
-    @Override
     public void decodeBuffer(ByteBuffer buf)
             throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Fri Aug  7 00:28:17 2015
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.codec;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 
 import java.io.DataInput;
@@ -28,8 +29,8 @@ import java.nio.ByteBuffer;
 
 public interface MarkableDataInput extends DataInput
 {
-    public void mark(int readAhead);
-    public void reset() throws IOException;
+    void mark(int readAhead);
+    void reset() throws IOException;
 
     int available() throws IOException;
 
@@ -37,8 +38,8 @@ public interface MarkableDataInput exten
 
     int read(byte[] b) throws IOException;
 
-    ByteBuffer readAsByteBuffer(int len) throws IOException;
+    QpidByteBuffer readAsByteBuffer(int len) throws IOException;
 
-    public AMQShortString readAMQShortString() throws IOException;
+    AMQShortString readAMQShortString() throws IOException;
 
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/ServerDecoder.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.codec;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.*;
 
 public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
@@ -38,9 +39,9 @@ public class ServerDecoder extends AMQDe
         super(true, methodProcessor);
     }
 
-    public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
-        decode(new ByteBufferDataInput(buf));
+        decode(buf.asDataInput());
     }
 
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.util.BytesDataOutput;
 
@@ -62,7 +63,7 @@ public class AMQFrame extends AMQDataBlo
     }
 
     private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] { FRAME_END_BYTE };
-    private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocateDirect(1);
+    private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
     static
     {
         FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
@@ -72,7 +73,7 @@ public class AMQFrame extends AMQDataBlo
     @Override
     public long writePayload(final ByteBufferSender sender) throws IOException
     {
-        ByteBuffer frameHeader = ByteBuffer.allocate(7);
+        QpidByteBuffer frameHeader = QpidByteBuffer.allocate(7);
 
         frameHeader.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(frameHeader, _channel);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Aug  7 00:28:17 2015
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.ByteBufferSender;
@@ -118,8 +120,8 @@ public abstract class AMQMethodBodyImpl
     {
 
         final int size = getSize();
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        ByteBufferDataOutput dataOutput = new ByteBufferDataOutput(buf);
+        QpidByteBuffer buf = QpidByteBuffer.allocate(size);
+        DataOutput dataOutput = buf.asDataOutput();
         writePayload(dataOutput);
         buf.flip();
         sender.send(buf);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Fri Aug  7 00:28:17 2015
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.util.ByteBufferDataOutput;
@@ -86,7 +87,8 @@ public class BasicContentHeaderPropertie
     private static final int USER_ID_MASK = 1 << 4;
     private static final int APPLICATION_ID_MASK = 1 << 3;
     private static final int CLUSTER_ID_MASK = 1 << 2;
-    private ByteBuffer _encodedForm;
+
+    private QpidByteBuffer _encodedForm;
 
 
     public BasicContentHeaderProperties(BasicContentHeaderProperties other)
@@ -482,9 +484,8 @@ public class BasicContentHeaderPropertie
         else
         {
             int propertyListSize = getPropertyListSize();
-            ByteBuffer buf = ByteBuffer.allocateDirect(propertyListSize);
-            ByteBufferDataOutput out = new ByteBufferDataOutput(buf);
-            writePropertyListPayload(out);
+            QpidByteBuffer buf = QpidByteBuffer.allocateDirect(propertyListSize);
+            writePropertyListPayload(buf.asDataOutput());
             buf.flip();
             sender.send(buf);
             return propertyListSize;
@@ -503,9 +504,7 @@ public class BasicContentHeaderPropertie
 
         _encodedForm = buffer.readAsByteBuffer(size);
 
-        ByteBufferDataInput input = new ByteBufferDataInput(_encodedForm.slice());
-
-        decode(input);
+        decode(_encodedForm.slice().asDataInput());
 
     }
 
@@ -529,7 +528,7 @@ public class BasicContentHeaderPropertie
         {
             long length = EncodingUtils.readUnsignedInteger(buffer);
 
-            ByteBuffer buf = _encodedForm.slice();
+            QpidByteBuffer buf = _encodedForm.slice();
             buf.position(headersOffset+4);
             buf = buf.slice();
             buf.limit((int)length);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java Fri Aug  7 00:28:17 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.framing;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 
-public class ByteArrayDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteArrayDataInput implements MarkableDataInput
 {
     private byte[] _data;
     private int _offset;
@@ -167,11 +168,11 @@ public class ByteArrayDataInput implemen
     }
 
     @Override
-    public ByteBuffer readAsByteBuffer(final int len)
+    public QpidByteBuffer readAsByteBuffer(final int len)
     {
         byte[] data = new byte[len];
         readFully(data);
-        return ByteBuffer.wrap(data);
+        return QpidByteBuffer.wrap(ByteBuffer.wrap(data));
     }
 
     public int position()

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java Fri Aug  7 00:28:17 2015
@@ -22,9 +22,10 @@ package org.apache.qpid.framing;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 
-public class ByteBufferDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteBufferDataInput implements MarkableDataInput
 {
     private final ByteBuffer _underlying;
     private int _mark;
@@ -46,12 +47,12 @@ public class ByteBufferDataInput impleme
         _underlying.get(b,0, len);
     }
 
-    public ByteBuffer readAsByteBuffer(int len)
+    public QpidByteBuffer readAsByteBuffer(int len)
     {
         ByteBuffer buf = _underlying.slice();
         buf.limit(len);
         skipBytes(len);
-        return buf;
+        return QpidByteBuffer.wrap(buf);
     }
 
     public int skipBytes(int n)

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java Fri Aug  7 00:28:17 2015
@@ -23,9 +23,10 @@ package org.apache.qpid.framing;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 
-public class ByteBufferListDataInput implements ExtendedDataInput, MarkableDataInput
+public class ByteBufferListDataInput implements MarkableDataInput
 {
     private final List<ByteBuffer> _underlying;
     private int _bufferIndex;
@@ -45,7 +46,7 @@ public class ByteBufferListDataInput imp
         }
         else
         {
-            ByteBuffer buf = readAsByteBuffer(b.length);
+            ByteBuffer buf = readAsNativeByteBuffer(b.length);
             buf.get(b);
         }
     }
@@ -59,13 +60,18 @@ public class ByteBufferListDataInput imp
         }
         else
         {
-            ByteBuffer buf = readAsByteBuffer(len);
+            ByteBuffer buf = readAsNativeByteBuffer(len);
             buf.get(b, off, len);
         }
     }
 
     @Override
-    public ByteBuffer readAsByteBuffer(int len)
+    public QpidByteBuffer readAsByteBuffer(int len)
+    {
+        return QpidByteBuffer.wrap(readAsNativeByteBuffer(len));
+    }
+
+    private ByteBuffer readAsNativeByteBuffer(int len)
     {
         ByteBuffer currentBuffer = getCurrentBuffer();
         if(currentBuffer.remaining()>=len)
@@ -167,7 +173,7 @@ public class ByteBufferListDataInput imp
         }
         else
         {
-            return readAsByteBuffer(size);
+            return readAsNativeByteBuffer(size);
         }
     }
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java Fri Aug  7 00:28:17 2015
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public interface ChannelMethodProcessor
 {
@@ -32,7 +32,7 @@ public interface ChannelMethodProcessor
 
     void receiveChannelCloseOk();
 
-    void receiveMessageContent(ByteBuffer data);
+    void receiveMessageContent(QpidByteBuffer data);
 
     void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Aug  7 00:28:17 2015
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.ByteBufferSender;
@@ -33,7 +34,7 @@ public class ContentBody implements AMQB
 {
     public static final byte TYPE = 3;
 
-    private ByteBuffer _payload;
+    private QpidByteBuffer _payload;
 
     public ContentBody()
     {
@@ -42,9 +43,15 @@ public class ContentBody implements AMQB
 
     public ContentBody(ByteBuffer payload)
     {
+        _payload = QpidByteBuffer.wrap(payload);
+    }
+
+    public ContentBody(QpidByteBuffer payload)
+    {
         _payload = payload;
     }
 
+
     public byte getFrameType()
     {
         return TYPE;
@@ -82,7 +89,7 @@ public class ContentBody implements AMQB
         }
     }
 
-    public ByteBuffer getPayload()
+    public QpidByteBuffer getPayload()
     {
         return _payload;
     }
@@ -92,7 +99,7 @@ public class ContentBody implements AMQB
             throws IOException
     {
 
-        ByteBuffer payload = in.readAsByteBuffer((int)bodySize);
+        QpidByteBuffer payload = in.readAsByteBuffer((int) bodySize);
 
         if(!methodProcessor.ignoreAllButCloseOk())
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org