You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/08/07 02:28:20 UTC

svn commit: r1694594 [2/5] - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ bdbs...

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Aug  7 00:28:17 2015
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.VirtualHost;
@@ -115,7 +116,7 @@ public abstract class MessageStoreQuotaE
     {
         StorableMessageMetaData metaData = createMetaData(id, MESSAGE_DATA.length);
         MessageHandle<?> handle = _store.addMessage(metaData);
-        handle.addContent(ByteBuffer.wrap(MESSAGE_DATA));
+        handle.addContent(QpidByteBuffer.wrap(MESSAGE_DATA));
         StoredMessage<? extends StorableMessageMetaData> storedMessage = handle.allContentAdded();
         TestMessage message = new TestMessage(id, storedMessage);
         return message;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -105,7 +106,7 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
-        public Collection<ByteBuffer> getContent(int offset, int size)
+        public Collection<QpidByteBuffer> getContent(int offset, int size)
         {
             return null;
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/NetworkConnectionSchedulerTest.java Fri Aug  7 00:28:17 2015
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -112,7 +113,7 @@ public class NetworkConnectionSchedulerT
         Thread.sleep(500l);
         timidSender.start();
         Thread.sleep(1000l);
-        verify(timidEngine, atLeast(6)).received(any(ByteBuffer.class));
+        verify(timidEngine, atLeast(6)).received(any(QpidByteBuffer.class));
         _keepRunningThreads = false;
         transport.close();
         scheduler.close();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -107,7 +108,7 @@ class MockServerMessage implements Serve
     }
 
 
-    public Collection<ByteBuffer> getContent(int offset, int size)
+    public Collection<QpidByteBuffer> getContent(int offset, int size)
     {
         throw new UnsupportedOperationException();
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Aug  7 00:28:17 2015
@@ -34,6 +34,7 @@ import javax.security.auth.Subject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
@@ -64,7 +65,7 @@ import org.apache.qpid.transport.network
 public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
-    private final InputHandler _inputHandler;
+    private final ServerInputHandler _inputHandler;
 
 
     private final NetworkConnection _network;
@@ -103,7 +104,8 @@ public class AMQPConnection_0_10 extends
         _connection.setRemoteAddress(network.getRemoteAddress());
         _connection.setLocalAddress(network.getLocalAddress());
 
-        _inputHandler = new InputHandler(new ServerAssembler(_connection), true);
+        _inputHandler = new ServerInputHandler(new ServerAssembler(_connection));
+        _connection.addFrameSizeObserver(_inputHandler);
         _network = network;
 
         Subject.doAs(getSubject(), new PrivilegedAction<Object>()
@@ -163,7 +165,7 @@ public class AMQPConnection_0_10 extends
         return new ByteBufferSender()
         {
             @Override
-            public void send(ByteBuffer msg)
+            public void send(final QpidByteBuffer msg)
             {
                 updateLastWriteTime();
                 sender.send(msg);
@@ -184,7 +186,7 @@ public class AMQPConnection_0_10 extends
         };
     }
 
-    public void received(final ByteBuffer buf)
+    public void received(final QpidByteBuffer buf)
     {
         Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
         {
@@ -212,10 +214,6 @@ public class AMQPConnection_0_10 extends
                         throw new ConnectionScopedRuntimeException(e);
                     }
                 }
-                finally
-                {
-                    buf.position(buf.limit());
-                }
                 return null;
             }
         });

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Fri Aug  7 00:28:17 2015
@@ -21,13 +21,18 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.FlowCreditManager;
@@ -266,26 +271,26 @@ public class ConsumerTarget_0_10 extends
         boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
 
 
-        ByteBuffer body = ByteBufferUtils.combine(msg.getBody());
+        Collection<QpidByteBuffer> body = msg.getBody();
 
         boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
 
         if(msgCompressed && !compressionSupported)
         {
-            byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body);
+            byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBufferUtils.combine(body));
             if(uncompressed != null)
             {
                 messageProps.setContentEncoding(null);
-                body = ByteBuffer.wrap(uncompressed);
+                body = Collections.singleton(QpidByteBuffer.wrap(uncompressed));
             }
         }
         else if(!msgCompressed
                 && compressionSupported
                 && (messageProps == null || messageProps.getContentEncoding()==null)
                 && body != null
-                && body.remaining() > _session.getConnection().getMessageCompressionThreshold())
+                && ByteBufferUtils.remaining(body) > _session.getConnection().getMessageCompressionThreshold())
         {
-            byte[] compressed = GZIPUtils.compressBufferToArray(body);
+            byte[] compressed = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(body));
             if(compressed != null)
             {
                 if(messageProps == null)
@@ -293,7 +298,7 @@ public class ConsumerTarget_0_10 extends
                     messageProps = new MessageProperties();
                 }
                 messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
-                body = ByteBuffer.wrap(compressed);
+                body = Collections.singleton(QpidByteBuffer.wrap(compressed));
             }
         }
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
@@ -91,9 +92,9 @@ public class MessageConverter_Internal_t
                     }
 
                     @Override
-                    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
                     {
-                        return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+                        return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
                     }
 
                     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java Fri Aug  7 00:28:17 2015
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
@@ -91,7 +92,7 @@ public class MessageConverter_v0_10 impl
                     }
 
                     @Override
-                    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+                    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
                     {
                         return serverMsg.getContent(offsetInMessage, size);
                     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;
@@ -77,7 +78,7 @@ public class MessageTransferMessage exte
         return getMetaData().getHeader();
     }
 
-    public Collection<ByteBuffer> getBody()
+    public Collection<QpidByteBuffer> getBody()
     {
         return  getContent(0, (int)getSize());
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java Fri Aug  7 00:28:17 2015
@@ -22,32 +22,61 @@ package org.apache.qpid.server.protocol.
 
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.*;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.NetworkEventReceiver;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolEvent;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.network.NetworkDelegate;
 import org.apache.qpid.transport.network.NetworkEvent;
 
-public class ServerAssembler extends Assembler
+public class ServerAssembler
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerAssembler.class);
 
 
     private final ServerConnection _connection;
 
-    public ServerAssembler(final ServerConnection connection)
+
+
+    // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
+    // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
+    private static final int ARRAY_SIZE = 0xFF;
+    private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+    private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+    private final Map<Integer,List<ServerFrame>> _segments;
+    private final ServerDecoder _decoder = new ServerDecoder();
+
+    public ServerAssembler(ServerConnection connection)
     {
-        super(connection);
         _connection = connection;
+        _segments = new HashMap<>();
     }
 
-    @Override
-    public void received(final NetworkEvent event)
+    public void received(final ServerFrame event)
     {
         if (!_connection.isIgnoreFutureInput())
         {
-            super.received(event);
+            frame(event);
         }
         else
         {
@@ -55,11 +84,209 @@ public class ServerAssembler extends Ass
         }
     }
 
-    @Override
     protected ByteBuffer allocateByteBuffer(int size)
     {
         return ByteBuffer.allocateDirect(size);
     }
 
 
+    private int segmentKey(ServerFrame frame)
+    {
+        return (frame.getTrack() + 1) * frame.getChannel();
+    }
+
+    private List<ServerFrame> getSegment(ServerFrame frame)
+    {
+        return _segments.get(segmentKey(frame));
+    }
+
+    private void setSegment(ServerFrame frame, List<ServerFrame> segment)
+    {
+        int key = segmentKey(frame);
+        if (_segments.containsKey(key))
+        {
+            error(new ProtocolError(Frame.L2, "segment in progress: %s",
+                                    frame));
+        }
+        _segments.put(segmentKey(frame), segment);
+    }
+
+    private void clearSegment(ServerFrame frame)
+    {
+        _segments.remove(segmentKey(frame));
+    }
+
+    private void emit(int channel, ProtocolEvent event)
+    {
+        event.setChannel(channel);
+        _connection.received(event);
+    }
+
+    public void exception(Throwable t)
+    {
+        _connection.exception(t);
+    }
+
+    public void closed()
+    {
+        _connection.closed();
+    }
+
+    public void init(ProtocolHeader header)
+    {
+        emit(0, header);
+    }
+
+    public void error(ProtocolError error)
+    {
+        emit(0, error);
+    }
+
+    public void frame(ServerFrame frame)
+    {
+        List<QpidByteBuffer> segment;
+        if (frame.isFirstFrame() && frame.isLastFrame())
+        {
+            segment = Collections.singletonList(frame.getBody());
+            assemble(frame, segment);
+        }
+        else
+        {
+            List<ServerFrame> frames;
+            if (frame.isFirstFrame())
+            {
+                frames = new ArrayList<>();
+                setSegment(frame, frames);
+            }
+            else
+            {
+                frames = getSegment(frame);
+            }
+
+            frames.add(frame);
+
+            if (frame.isLastFrame())
+            {
+                clearSegment(frame);
+                segment = new ArrayList<>(frames.size());
+                for (ServerFrame f : frames)
+                {
+                    segment.add(f.getBody());
+                }
+                assemble(frame, segment);
+            }
+        }
+
+    }
+
+    private void assemble(ServerFrame frame, List<QpidByteBuffer> segment)
+    {
+        ServerDecoder dec = _decoder;
+        dec.init(segment);
+
+        int channel = frame.getChannel();
+        Method command;
+
+        switch (frame.getType())
+        {
+            case CONTROL:
+                int controlType = dec.readUint16();
+                Method control = Method.create(controlType);
+                control.read(dec);
+                emit(channel, control);
+                break;
+            case COMMAND:
+                int commandType = dec.readUint16();
+                // read in the session header, right now we don't use it
+                int hdr = dec.readUint16();
+                command = Method.create(commandType);
+                command.setSync((0x0001 & hdr) != 0);
+                command.read(dec);
+                if (command.hasPayload() && !frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, command);
+                }
+                else
+                {
+                    emit(channel, command);
+                }
+                break;
+            case HEADER:
+                command = getIncompleteCommand(channel);
+                List<Struct> structs = null;
+                DeliveryProperties deliveryProps = null;
+                MessageProperties messageProps = null;
+
+                while (dec.hasRemaining())
+                {
+                    Struct struct = dec.readStruct32();
+                    if(struct instanceof  DeliveryProperties && deliveryProps == null)
+                    {
+                        deliveryProps = (DeliveryProperties) struct;
+                    }
+                    else if(struct instanceof MessageProperties && messageProps == null)
+                    {
+                        messageProps = (MessageProperties) struct;
+                    }
+                    else
+                    {
+                        if(structs == null)
+                        {
+                            structs = new ArrayList<>(2);
+                        }
+                        structs.add(struct);
+                    }
+
+                }
+                command.setHeader(new Header(deliveryProps,messageProps,structs));
+
+                if (frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, null);
+                    emit(channel, command);
+                }
+                break;
+            case BODY:
+                command = getIncompleteCommand(channel);
+                command.setBody(segment);
+                setIncompleteCommand(channel, null);
+                emit(channel, command);
+                break;
+            default:
+                throw new IllegalStateException("unknown frame type: " + frame.getType());
+        }
+
+        dec.releaseBuffer();
+    }
+
+    private void setIncompleteCommand(int channelId, Method incomplete)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            _incompleteMethodArray[channelId] = incomplete;
+        }
+        else
+        {
+            if(incomplete != null)
+            {
+                _incompleteMethodMap.put(channelId, incomplete);
+            }
+            else
+            {
+                _incompleteMethodMap.remove(channelId);
+            }
+        }
+    }
+
+    private Method getIncompleteCommand(int channelId)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            return _incompleteMethodArray[channelId];
+        }
+        else
+        {
+            return _incompleteMethodMap.get(channelId);
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Aug  7 00:28:17 2015
@@ -294,7 +294,6 @@ public class ServerConnectionDelegate ex
         }
 
         setConnectionTuneOkChannelMax(sconn, okChannelMax);
-
         conn.setMaxFrameSize(okMaxFrameSize);
     }
 

Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java&r1=1693306&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Fri Aug  7 00:28:17 2015
@@ -18,82 +18,138 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.codec;
-
-import org.apache.qpid.transport.Binary;
+package org.apache.qpid.server.protocol.v0_10;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import java.util.List;
 
-/**
- * Byte Buffer Decoder.
- * Decoder concrete implementor using a backing byte buffer for decoding data.
- *
- * @author Rafael H. Schloming
- */
-public final class BBDecoder extends AbstractDecoder
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.transport.codec.AbstractDecoder;
+
+public final class ServerDecoder extends AbstractDecoder
 {
-    private ByteBuffer in;
+    private List<QpidByteBuffer> _underlying;
+    private int _bufferIndex;
 
-    public void init(ByteBuffer in)
-    {
-        this.in = in;
-        this.in.order(ByteOrder.BIG_ENDIAN);
-    }
 
-    public void releaseBuffer()
+    public void init(List<QpidByteBuffer> in)
     {
-        in = null;
+        _underlying = in;
     }
 
-    protected byte doGet()
+    private void advanceIfNecessary()
     {
-        return in.get();
+        while(!getCurrentBuffer().hasRemaining() && _bufferIndex != _underlying.size()-1)
+        {
+            _bufferIndex++;
+        }
     }
 
-    protected void doGet(byte[] bytes)
+    private QpidByteBuffer getBuffer(int size)
     {
-        in.get(bytes);
+        advanceIfNecessary();
+        final QpidByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>= size)
+        {
+            return currentBuffer;
+        }
+        else
+        {
+            return readAsNativeByteBuffer(size);
+        }
     }
 
-    protected Binary get(int size)
+    private QpidByteBuffer readAsNativeByteBuffer(int len)
     {
-        if (in.hasArray())
+        QpidByteBuffer currentBuffer = getCurrentBuffer();
+        if(currentBuffer.remaining()>=len)
         {
-            byte[] bytes = in.array();
-            Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size);
-            in.position(in.position() + size);
-            return bin;
+            QpidByteBuffer buf = currentBuffer.slice();
+            buf.limit(len);
+            currentBuffer.position(currentBuffer.position()+len);
+            return buf;
         }
         else
         {
-            return super.get(size);
+            QpidByteBuffer dest = currentBuffer.isDirect() ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
+            while(dest.hasRemaining() && available()>0)
+            {
+                advanceIfNecessary();
+                currentBuffer = getCurrentBuffer();
+                final int remaining = dest.remaining();
+                if(currentBuffer.remaining()>= remaining)
+                {
+                    QpidByteBuffer buf = currentBuffer.slice();
+                    buf.limit(remaining);
+                    currentBuffer.position(currentBuffer.position()+remaining);
+                    dest.put(buf);
+                }
+                else
+                {
+                    dest.put(currentBuffer);
+                }
+            }
+
+            dest.flip();
+            return dest;
         }
     }
 
+    private int available()
+    {
+        int remaining = 0;
+        for(int i = _bufferIndex; i < _underlying.size(); i++)
+        {
+            remaining += _underlying.get(i).remaining();
+        }
+        return remaining;
+    }
+
+
+    private QpidByteBuffer getCurrentBuffer()
+    {
+        return _underlying.get(_bufferIndex);
+    }
+
+
+    public void releaseBuffer()
+    {
+        _underlying = null;
+    }
+
+    protected byte doGet()
+    {
+        return getBuffer(1).get();
+    }
+
+    protected void doGet(byte[] bytes)
+    {
+        getBuffer(bytes.length).get(bytes);
+    }
+
     public boolean hasRemaining()
     {
-        return in.hasRemaining();
+        return available() != 0;
     }
 
     public short readUint8()
     {
-        return (short) (0xFF & in.get());
+        return (short) (0xFF & getBuffer(1).get());
     }
 
     public int readUint16()
     {
-        return 0xFFFF & in.getShort();
+        return 0xFFFF & getBuffer(2).getShort();
     }
 
     public long readUint32()
     {
-        return 0xFFFFFFFFL & in.getInt();
+        return 0xFFFFFFFFL & getBuffer(4).getInt();
     }
 
     public long readUint64()
     {
-        return in.getLong();
+        return getBuffer(8).getLong();
     }
 
 	public byte[] readBin128()
@@ -112,38 +168,38 @@ public final class BBDecoder extends Abs
 	
 	public double readDouble()
 	{
-		return in.getDouble();
+		return getBuffer(8).getDouble();
 	}
 
 	public float readFloat()
 	{
-		return in.getFloat();
+		return getBuffer(4).getFloat();
 	}
 
 	public short readInt16()
 	{
-		return in.getShort();
+		return getBuffer(2).getShort();
 	}
 
 	public int readInt32()
 	{
-		return in.getInt();
+		return getBuffer(4).getInt();
 	}
 
 	public byte readInt8()
 	{
-		return in.get();
+		return getBuffer(1).get();
 	}
 
 	public byte[] readReaminingBytes()
 	{
-      byte[] result = new byte[in.limit() - in.position()];
+      byte[] result = new byte[available()];
       get(result);
       return result;		
 	}
 
 	public long readInt64()
 	{
-		return in.getLong();
+		return getBuffer(8).getLong();
 	}
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java Fri Aug  7 00:28:17 2015
@@ -28,10 +28,14 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.network.Frame.LAST_SEG;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
@@ -91,9 +95,9 @@ public final class ServerDisassembler im
         }
     }
 
-    private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+    private void frame(byte flags, byte type, byte track, int channel, int size, Collection<QpidByteBuffer> buf)
     {
-        ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+        QpidByteBuffer data = QpidByteBuffer.allocate(HEADER_SIZE);
 
         data.put(0, flags);
         data.put(1, type);
@@ -102,21 +106,43 @@ public final class ServerDisassembler im
         data.putShort(6, (short) channel);
 
 
-        ByteBuffer dup = buf.duplicate();
-        dup.limit(dup.position() + size);
-        buf.position(buf.position() + size);
         _sender.send(data);
-        _sender.send(dup);
-
+        if(size > 0)
+        {
+            int residual = size;
+            for(QpidByteBuffer b : buf)
+            {
+                final int remaining = b.remaining();
+                if(remaining > 0 )
+                {
+                    if(remaining >= residual)
+                    {
+                        _sender.send(b.view(0,residual));
+                        b.position(b.position()+residual);
+                        break;
+                    }
+                    else
+                    {
+                        _sender.send(b.duplicate());
+                        b.position(b.limit());
+                        residual-=remaining;
+                    }
+                }
+            }
+        }
 
     }
 
-    private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
+    private void fragment(byte flags, SegmentType type, ProtocolEvent event, Collection<QpidByteBuffer> buf)
     {
         byte typeb = (byte) type.getValue();
         byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
 
-        int remaining = buf.remaining();
+        int remaining = 0;
+        for(QpidByteBuffer b : buf)
+        {
+            remaining += b.remaining();
+        }
         boolean first = true;
         while (true)
         {
@@ -145,7 +171,7 @@ public final class ServerDisassembler im
 
     public void init(Void v, ProtocolHeader header)
     {
-        _sender.send(header.toByteBuffer(true));
+        _sender.send(header.toByteBuffer());
         _sender.flush();
 }
 
@@ -217,17 +243,22 @@ public final class ServerDisassembler im
             buf.position(0);
             buf.limit(methodLimit);
 
-            fragment(flags, type, method, buf.duplicate());
+            fragment(flags, type, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate())));
             if (payload)
             {
-                ByteBuffer body = method.getBody();
+                Collection<QpidByteBuffer> body = method.getBody();
                 buf.limit(headerLimit);
                 buf.position(methodLimit);
 
-                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, buf.duplicate());
+                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, Collections.singletonList(QpidByteBuffer.wrap(buf.duplicate())));
                 if (body != null)
                 {
-                    fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+                    Collection<QpidByteBuffer> dup = new ArrayList<>();
+                    for(QpidByteBuffer b : body)
+                    {
+                        dup.add(b.duplicate());
+                    }
+                    fragment(LAST_SEG, SegmentType.BODY, method, dup);
                 }
 
             }

Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java (from r1693306, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java&r1=1693306&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Frame.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerFrame.java Fri Aug  7 00:28:17 2015
@@ -18,23 +18,25 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
 
 import static org.apache.qpid.transport.util.Functions.str;
 
 import java.nio.ByteBuffer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.SegmentType;
+import org.apache.qpid.transport.network.NetworkDelegate;
+import org.apache.qpid.transport.network.NetworkEvent;
 
 
-/**
- * Frame
- *
- * @author Rafael H. Schloming
- */
-
-public final class Frame implements NetworkEvent
+public final class ServerFrame
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerFrame.class);
+
     public static final int HEADER_SIZE = 12;
 
     // XXX: enums?
@@ -43,10 +45,6 @@ public final class Frame implements Netw
     public static final byte L3 = 2;
     public static final byte L4 = 3;
 
-    public static final byte RESERVED = 0x0;
-
-    public static final byte VERSION = 0x0;
-
     public static final byte FIRST_SEG = 0x8;
     public static final byte LAST_SEG = 0x4;
     public static final byte FIRST_FRAME = 0x2;
@@ -56,10 +54,10 @@ public final class Frame implements Netw
     final private SegmentType type;
     final private byte track;
     final private int channel;
-    final private ByteBuffer body;
+    final private QpidByteBuffer body;
 
-    public Frame(byte flags, SegmentType type, byte track, int channel,
-                 ByteBuffer body)
+    public ServerFrame(byte flags, SegmentType type, byte track, int channel,
+                       QpidByteBuffer body)
     {
         this.flags = flags;
         this.type = type;
@@ -68,7 +66,7 @@ public final class Frame implements Netw
         this.body = body;
     }
 
-    public ByteBuffer getBody()
+    public QpidByteBuffer getBody()
     {
         return body.slice();
     }
@@ -123,11 +121,6 @@ public final class Frame implements Netw
         return flag(LAST_FRAME);
     }
 
-    public void delegate(NetworkDelegate delegate)
-    {
-        delegate.frame(this);
-    }
-
     public String toString()
     {
         StringBuilder str = new StringBuilder();

Copied: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java (from r1693325, qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java&p1=qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java&r1=1693325&r2=1694594&rev=1694594&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerInputHandler.java Fri Aug  7 00:28:17 2015
@@ -18,40 +18,26 @@
  * under the License.
  *
  */
-package org.apache.qpid.transport.network;
+package org.apache.qpid.server.protocol.v0_10;
 
-import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
-import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
-import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
-import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR;
 import static org.apache.qpid.transport.util.Functions.str;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.Constant;
-import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
 import org.apache.qpid.transport.FrameSizeObserver;
-import org.apache.qpid.transport.NetworkEventReceiver;
 import org.apache.qpid.transport.ProtocolError;
 import org.apache.qpid.transport.ProtocolHeader;
 import org.apache.qpid.transport.SegmentType;
 
 
-/**
- * InputHandler
- *
- * @author Rafael H. Schloming
- */
-
-public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
+public class ServerInputHandler implements FrameSizeObserver
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
-    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer
-            .allocate(0);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class);
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocate(0);
 
     private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
 
@@ -60,15 +46,11 @@ public class InputHandler implements Exc
     {
         PROTO_HDR,
         FRAME_HDR,
-        FRAME_BODY,
         ERROR;
     }
-    private final NetworkEventReceiver receiver;
+    private final ServerAssembler _serverAssembler;
 
-    private final boolean _useDirect;
-    private State state;
-    private ByteBuffer input = null;
-    private int needed;
+    private State _state = PROTO_HDR;
 
     private byte flags;
     private SegmentType type;
@@ -76,21 +58,10 @@ public class InputHandler implements Exc
     private int channel;
 
 
-    public InputHandler(NetworkEventReceiver receiver, final boolean useDirect)
+    public ServerInputHandler(ServerAssembler serverAssembler)
     {
-        this.receiver = receiver;
-        this.state = PROTO_HDR;
-        _useDirect = useDirect;
-
-        switch (state)
-        {
-            case PROTO_HDR:
-                needed = 8;
-                break;
-            case FRAME_HDR:
-                needed = Frame.HEADER_SIZE;
-                break;
-        }
+        _serverAssembler = serverAssembler;
+        _state = PROTO_HDR;
     }
 
     public void setMaxFrameSize(final int maxFrameSize)
@@ -100,125 +71,127 @@ public class InputHandler implements Exc
 
     private void error(String fmt, Object ... args)
     {
-        receiver.received(new ProtocolError(Frame.L1, fmt, args));
+        _serverAssembler.error(new ProtocolError(ServerFrame.L1, fmt, args));
     }
 
-    @Override
-    public void received(ByteBuffer buf)
+    public void received(QpidByteBuffer buf)
     {
-        int limit = buf.limit();
-        int remaining = buf.remaining();
-        while (remaining > 0)
+        int position = buf.position();
+        while(buf.hasRemaining() && _state != ERROR)
         {
-            if (remaining >= needed)
-            {
-                int consumed = needed;
-                int pos = buf.position();
-                if (input == null)
-                {
-                    buf.limit(pos + needed);
-                    input = buf;
-                    state = next(pos);
-                    buf.limit(limit);
-                    buf.position(pos + consumed);
-                }
-                else
-                {
-                    buf.limit(pos + needed);
-                    input.put(buf);
-                    buf.limit(limit);
-                    input.flip();
-                    state = next(0);
-                }
+            parse(buf);
 
-                remaining -= consumed;
-                input = null;
+            int newPosition = buf.position();
+            if(position == newPosition)
+            {
+                break;
             }
             else
             {
-                if (input == null)
-                {
-                    input = _useDirect ? ByteBuffer.allocateDirect(needed) : ByteBuffer.allocate(needed);
-                }
-                input.put(buf);
-                needed -= remaining;
-                remaining = 0;
+                position = newPosition;
             }
         }
     }
 
-    private State next(int pos)
+    private void parse(QpidByteBuffer buffer)
     {
-        input.order(ByteOrder.BIG_ENDIAN);
-
-        switch (state) {
-        case PROTO_HDR:
-            if (input.get(pos) != 'A' &&
-                input.get(pos + 1) != 'M' &&
-                input.get(pos + 2) != 'Q' &&
-                input.get(pos + 3) != 'P')
-            {
-                error("bad protocol header: %s", str(input));
-                return ERROR;
-            }
+        buffer.mark();
+        switch (_state) {
+            case PROTO_HDR:
+                if(buffer.remaining() < 8)
+                {
+                    break;
+                }
+                if (buffer.get() != 'A' ||
+                    buffer.get() != 'M' ||
+                    buffer.get() != 'Q' ||
+                    buffer.get() != 'P')
+                {
+                    buffer.reset();
+                    error("bad protocol header: %s", str(buffer));
+                    _state = ERROR;
+                }
+                else
+                {
+                    byte protoClass = buffer.get();
+                    byte instance = buffer.get();
+                    byte major = buffer.get();
+                    byte minor = buffer.get();
 
-            byte protoClass = input.get(pos + 4);
-            byte instance = input.get(pos + 5);
-            byte major = input.get(pos + 6);
-            byte minor = input.get(pos + 7);
-            receiver.received(new ProtocolHeader(protoClass, instance, major, minor));
-            needed = Frame.HEADER_SIZE;
-            return FRAME_HDR;
-        case FRAME_HDR:
-            flags = input.get(pos);
-            type = SegmentType.get(input.get(pos + 1));
-            int size = (0xFFFF & input.getShort(pos + 2));
-            size -= Frame.HEADER_SIZE;
-            _maxFrameSize = 64 * 1024;
-            if (size < 0 || size > (_maxFrameSize - 12))
-            {
-                error("bad frame size: %d", size);
-                return ERROR;
-            }
-            byte b = input.get(pos + 5);
-            if ((b & 0xF0) != 0) {
-                error("non-zero reserved bits in upper nibble of " +
-                      "frame header byte 5: '%x'", b);
-                return ERROR;
-            } else {
-                track = (byte) (b & 0xF);
-            }
-            channel = (0xFFFF & input.getShort(pos + 6));
-            if (size == 0)
-            {
-                Frame frame = new Frame(flags, type, track, channel, EMPTY_BYTE_BUFFER);
-                receiver.received(frame);
-                needed = Frame.HEADER_SIZE;
-                return FRAME_HDR;
-            }
-            else
-            {
-                needed = size;
-                return FRAME_BODY;
-            }
-        case FRAME_BODY:
-            Frame frame = new Frame(flags, type, track, channel, input.slice());
-            receiver.received(frame);
-            needed = Frame.HEADER_SIZE;
-            return FRAME_HDR;
-        default:
-            throw new IllegalStateException();
+                    _serverAssembler.init(new ProtocolHeader(protoClass, instance, major, minor));
+                    _state = FRAME_HDR;
+                }
+                break;
+            case FRAME_HDR:
+                if(buffer.remaining() < ServerFrame.HEADER_SIZE)
+                {
+                    buffer.reset();
+                }
+                else
+                {
+                    flags = buffer.get();
+                    type = SegmentType.get(buffer.get());
+                    int size = (0xFFFF & buffer.getShort());
+
+                    size -= ServerFrame.HEADER_SIZE;
+                    if (size < 0 || size > (_maxFrameSize - ServerFrame.HEADER_SIZE))
+                    {
+                        error("bad frame size: %d", size);
+                        _state = ERROR;
+                    }
+                    else
+                    {
+                        buffer.get(); // skip unused byte
+                        byte b = buffer.get();
+                        if ((b & 0xF0) != 0)
+                        {
+                            error("non-zero reserved bits in upper nibble of " +
+                                  "frame header byte 5: '%x'", b);
+                            _state = ERROR;
+                        }
+                        else
+                        {
+                            track = (byte) (b & 0xF);
+
+                            channel = (0xFFFF & buffer.getShort());
+                            buffer.position(buffer.position()+4);
+                            if (size == 0)
+                            {
+                                ServerFrame frame = new ServerFrame(flags, type, track, channel, EMPTY_BYTE_BUFFER);
+                                _serverAssembler.received(frame);
+
+                            }
+                            else if (buffer.remaining() < size)
+                            {
+                                buffer.reset();
+                            }
+                            else
+                            {
+                                final QpidByteBuffer body = buffer.slice();
+                                body.limit(size);
+                                ServerFrame frame = new ServerFrame(flags, type, track, channel, body);
+                                buffer.position(buffer.position() + size);
+
+                                _serverAssembler.received(frame);
+                            }
+                        }
+                    }
+                }
+                break;
+            default:
+                throw new IllegalStateException();
         }
+
     }
 
     public void exception(Throwable t)
     {
-        receiver.exception(t);
+        _serverAssembler.exception(t);
     }
 
     public void closed()
     {
-        receiver.closed();
+        _serverAssembler.closed();
     }
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Aug  7 00:28:17 2015
@@ -34,6 +34,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
@@ -500,10 +501,13 @@ public class ServerSessionDelegate exten
                                                                    final MessageMetaData_0_10 messageMetaData, final MessageStore store)
     {
         final MessageHandle<MessageMetaData_0_10> addedMessage = store.addMessage(messageMetaData);
-        ByteBuffer body = xfr.getBody();
+        Collection<QpidByteBuffer> body = xfr.getBody();
         if(body != null)
         {
-            addedMessage.addContent(body);
+            for(QpidByteBuffer b : body)
+            {
+                addedMessage.addContent(b);
+            }
         }
         final StoredMessage<MessageMetaData_0_10> storedMessage = addedMessage.allContentAdded();
         return storedMessage;

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Aug  7 00:28:17 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import static org.apache.qpid.transport.util.Functions.hex;
 
-import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -49,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -2511,7 +2511,7 @@ public class AMQChannel
     }
 
     @Override
-    public void receiveMessageContent(final ByteBuffer data)
+    public void receiveMessageContent(final QpidByteBuffer data)
     {
         if(_logger.isDebugEnabled())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Aug  7 00:28:17 2015
@@ -26,7 +26,6 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -53,7 +52,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.AMQDecoder;
+import org.apache.qpid.codec.ServerDecoder;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.*;
@@ -126,13 +127,13 @@ public class AMQPConnection_0_8
     private ConnectionState _state = ConnectionState.INIT;
 
     /**
-     * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
+     * The channels that the latest call to {@link ProtocolEngine#received(QpidByteBuffer)} applied to.
      * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()}
      * on after handling the frames.
      */
     private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
 
-    private final AMQDecoder _decoder;
+    private final ServerDecoder _decoder;
 
     private SaslServer _saslServer;
 
@@ -277,7 +278,7 @@ public class AMQPConnection_0_8
         return new WriteDeliverMethod(channelId);
     }
 
-    public void received(final ByteBuffer msg)
+    public void received(final QpidByteBuffer msg)
     {
         Subject.doAs(getSubject(), new PrivilegedAction<Void>()
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Aug  7 00:28:17 2015
@@ -31,6 +31,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -103,9 +104,9 @@ public class MessageConverter_Internal_t
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+            public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
             {
-                return Collections.singleton(ByteBuffer.wrap(messageContent, offsetInMessage, size));
+                return Collections.singleton(QpidByteBuffer.wrap(messageContent, offsetInMessage, size));
             }
 
             @Override

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Fri Aug  7 00:28:17 2015
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
@@ -125,7 +126,8 @@ public class ProtocolOutputConverterImpl
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0, bodySize)))) != null)
+                && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(message.getContent(0,
+                                                                                                                 bodySize)))) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
@@ -163,9 +165,9 @@ public class ProtocolOutputConverterImpl
             }
 
             @Override
-            public Collection<ByteBuffer> getContent(final int offset, final int size)
+            public Collection<QpidByteBuffer> getContent(final int offset, final int size)
             {
-                return Collections.singleton(ByteBuffer.wrap(content, offset, size));
+                return Collections.singleton(QpidByteBuffer.wrap(content, offset, size));
             }
 
             @Override
@@ -247,9 +249,9 @@ public class ProtocolOutputConverterImpl
 
         public void writePayload(DataOutput buffer) throws IOException
         {
-            Collection<ByteBuffer> bufs = _message.getContent(_offset, _length);
+            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
 
-            for(ByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : bufs)
             {
                 if (buf.hasArray())
                 {
@@ -271,9 +273,9 @@ public class ProtocolOutputConverterImpl
         public long writePayload(final ByteBufferSender sender) throws IOException
         {
 
-            Collection<ByteBuffer> bufs = _message.getContent(_offset, _length);
+            Collection<QpidByteBuffer> bufs = _message.getContent(_offset, _length);
             long size = 0l;
-            for(ByteBuffer buf : bufs)
+            for(QpidByteBuffer buf : bufs)
             {
                 size += buf.remaining();
                 sender.send(buf.duplicate());

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Aug  7 00:28:17 2015
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.QpidException;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.MessagePublishInfo;
@@ -48,7 +49,6 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -288,10 +288,16 @@ public class InternalTestProtocolSession
         {
             _sender = new ByteBufferSender()
             {
-                public void send(ByteBuffer msg)
+                private void send(ByteBuffer msg)
                 {
                 }
 
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+
+                }
+
                 public void flush()
                 {
                 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java Fri Aug  7 00:28:17 2015
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
@@ -35,7 +36,7 @@ public class MockStoredMessage implement
 {
     private long _messageId;
     private MessageMetaData _metaData;
-    private final ByteBuffer _content;
+    private final QpidByteBuffer _content;
 
     public MockStoredMessage(long messageId)
     {
@@ -62,7 +63,7 @@ public class MockStoredMessage implement
             ( chb.getProperties()).setHeaders(headers);
         }
         _metaData = new MessageMetaData(info, chb);
-        _content = ByteBuffer.allocate(_metaData.getContentSize());
+        _content = QpidByteBuffer.allocate(_metaData.getContentSize());
     }
 
     public MessageMetaData getMetaData()
@@ -75,7 +76,7 @@ public class MockStoredMessage implement
         return _messageId;
     }
 
-    public void addContent(ByteBuffer src)
+    public void addContent(QpidByteBuffer src)
     {
         src = src.duplicate();
         _content.put(src);
@@ -90,24 +91,17 @@ public class MockStoredMessage implement
 
     public int getContent(int offset, ByteBuffer dst)
     {
-        ByteBuffer src = _content.duplicate();
-        src.position(offset);
-        src = src.slice();
-        if(dst.remaining() < src.limit())
-        {
-            src.limit(dst.remaining());
-        }
-        dst.put(src);
-        return src.limit();
+        final int length = Math.min(dst.remaining(), _content.remaining() - offset);
+        QpidByteBuffer src = _content.view(offset, length);
+        src.get(dst);
+        return length;
     }
 
 
 
-    public Collection<ByteBuffer> getContent(int offsetInMessage, int size)
+    public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
     {
-        ByteBuffer buf = ByteBuffer.allocate(size);
-        getContent(offsetInMessage, buf);
-        buf.position(0);
+        QpidByteBuffer buf = _content.view(offsetInMessage,size);
         return Collections.singleton(buf);
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/AbstractDescribedTypeWriter.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public abstract class AbstractDescribedTypeWriter<V> implements ValueWriter<V>
 {
     private int _length;
@@ -45,7 +47,7 @@ public abstract class AbstractDescribedT
 
     private State _state = State.FORMAT_CODE;
 
-    public int writeToBuffer(ByteBuffer buffer)
+    public int writeToBuffer(QpidByteBuffer buffer)
     {
         final int length = _length;
 
@@ -110,7 +112,7 @@ public abstract class AbstractDescribedT
         return _length;
     }
 
-    private void writeFirstPass(ByteBuffer buffer)
+    private void writeFirstPass(QpidByteBuffer buffer)
     {
 
         int length = 1;
@@ -185,4 +187,4 @@ public abstract class AbstractDescribedT
     {
         return false;
     }
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayTypeConstructor.java Fri Aug  7 00:28:17 2015
@@ -20,9 +20,9 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -31,7 +31,7 @@ public abstract class ArrayTypeConstruct
 
 
 
-    public Object[] construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+    public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
     {
         int size = read(in);
         if(in.remaining() < size)
@@ -40,7 +40,7 @@ public abstract class ArrayTypeConstruct
                                          "Insufficient data to decode array - requires %d octects, only %d remaining.",
                                          size, in.remaining());
         }
-        ByteBuffer dup = in.slice();
+        QpidByteBuffer dup = in.slice();
         dup.limit(size);
         in.position(in.position()+size);
         int count = read(dup);
@@ -69,13 +69,13 @@ public abstract class ArrayTypeConstruct
     }
 
 
-    abstract int read(ByteBuffer in) throws AmqpErrorException;
+    abstract int read(QpidByteBuffer in) throws AmqpErrorException;
 
 
     private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
     {
 
-        @Override int read(final ByteBuffer in) throws AmqpErrorException
+        @Override int read(final QpidByteBuffer in) throws AmqpErrorException
         {
             if(!in.hasRemaining())
             {
@@ -89,7 +89,7 @@ public abstract class ArrayTypeConstruct
     private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
     {
 
-        @Override int read(final ByteBuffer in) throws AmqpErrorException
+        @Override int read(final QpidByteBuffer in) throws AmqpErrorException
         {
             if(in.remaining()<4)
             {
@@ -110,4 +110,4 @@ public abstract class ArrayTypeConstruct
         return FOUR_BYTE_SIZE_ARRAY;
     }
 
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ArrayWriter.java Fri Aug  7 00:28:17 2015
@@ -19,7 +19,7 @@
 
 package org.apache.qpid.amqp_1_0.codec;
 
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class ArrayWriter  implements ValueWriter<Object[]>
 {
@@ -60,7 +60,7 @@ public class ArrayWriter  implements Val
         //registry.register(List.class, FACTORY);
     }
 
-    public int writeToBuffer(final ByteBuffer buffer)
+    public int writeToBuffer(final QpidByteBuffer buffer)
     {
         return 0;  //TODO change body of implemented methods use File | Settings | File Templates.
     }
@@ -79,4 +79,4 @@ public class ArrayWriter  implements Val
     {
         return false;  //TODO change body of implemented methods use File | Settings | File Templates.
     }
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BinaryTypeConstructor.java Fri Aug  7 00:28:17 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 import java.nio.ByteBuffer;
 
@@ -42,7 +43,7 @@ public class BinaryTypeConstructor exten
     }
 
     @Override
-    public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+    public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
     {
         int size;
 
@@ -55,7 +56,7 @@ public class BinaryTypeConstructor exten
             size = in.getInt();
         }
 
-        ByteBuffer inDup = in.slice();
+        QpidByteBuffer inDup = in.slice();
         inDup.limit(inDup.position()+size);
 
         Binary binary;
@@ -77,4 +78,4 @@ public class BinaryTypeConstructor exten
 
     }
 
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanConstructor.java Fri Aug  7 00:28:17 2015
@@ -22,14 +22,13 @@ package org.apache.qpid.amqp_1_0.codec;
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.transport.*;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class BooleanConstructor
 {
     private static final TypeConstructor<Boolean> TRUE_INSTANCE = new TypeConstructor<Boolean>()
     {
-        public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
         {
             return Boolean.TRUE;
         }
@@ -37,14 +36,14 @@ public class BooleanConstructor
 
     private static final TypeConstructor<Boolean> FALSE_INSTANCE = new TypeConstructor<Boolean>()
         {
-            public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+            public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
             {
                 return Boolean.FALSE;
             }
         };
     private static final TypeConstructor<Boolean> BYTE_INSTANCE = new TypeConstructor<Boolean>()
     {
-        public Boolean construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
         {
             if(in.hasRemaining())
             {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/BooleanWriter.java Fri Aug  7 00:28:17 2015
@@ -21,14 +21,14 @@
 
 package org.apache.qpid.amqp_1_0.codec;
 
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class BooleanWriter implements ValueWriter<Boolean>
 {
     private boolean _complete = true;
     private boolean _value;
 
-    public int writeToBuffer(ByteBuffer buffer)
+    public int writeToBuffer(QpidByteBuffer buffer)
     {
         if(!_complete & buffer.hasRemaining())
         {
@@ -67,4 +67,4 @@ public class BooleanWriter implements Va
     {
         registry.register(Boolean.class, FACTORY);
     }
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteTypeConstructor.java Fri Aug  7 00:28:17 2015
@@ -23,8 +23,7 @@ package org.apache.qpid.amqp_1_0.codec;
 import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class ByteTypeConstructor implements TypeConstructor
 {
@@ -39,7 +38,7 @@ public class ByteTypeConstructor impleme
     {
     }
 
-    public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
     {
         if(in.hasRemaining())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteWriter.java Fri Aug  7 00:28:17 2015
@@ -21,14 +21,14 @@
 
 package org.apache.qpid.amqp_1_0.codec;
 
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class ByteWriter implements ValueWriter<Byte>
 {
     private int _written = 2;
     private byte _value;
 
-    public int writeToBuffer(ByteBuffer buffer)
+    public int writeToBuffer(QpidByteBuffer buffer)
     {
 
         switch(_written)
@@ -87,4 +87,4 @@ public class ByteWriter implements Value
     {
         registry.register(Byte.class, FACTORY);
     }
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CharTypeConstructor.java Fri Aug  7 00:28:17 2015
@@ -22,8 +22,7 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 public class CharTypeConstructor implements TypeConstructor
 {
@@ -39,7 +38,7 @@ public class CharTypeConstructor impleme
     {
     }
 
-    public Object construct(final ByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
     {
         if(in.remaining()>=4)
         {
@@ -64,4 +63,4 @@ public class CharTypeConstructor impleme
         }
     }
 
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundTypeConstructor.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.codec;
 import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.transport.*;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -154,7 +155,7 @@ public class CompoundTypeConstructor ext
     }
 
     @Override
-    public Object construct(final ByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
+    public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
     {
         int size;
         int count;
@@ -170,8 +171,7 @@ public class CompoundTypeConstructor ext
             count = in.getInt();
         }
 
-        ByteBuffer data;
-        ByteBuffer inDup = in.slice();
+        QpidByteBuffer inDup = in.slice();
 
         inDup.limit(size-getSize());
 
@@ -189,4 +189,4 @@ public class CompoundTypeConstructor ext
     }
 
 
-}
\ No newline at end of file
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/CompoundWriter.java Fri Aug  7 00:28:17 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
 public abstract class CompoundWriter<V> implements ValueWriter<V>
 {
     private int _length;
@@ -51,12 +53,12 @@ public abstract class CompoundWriter<V>
 
     private State _state = State.FORMAT_CODE;
 
-    public int writeToBuffer(ByteBuffer buffer)
+    public int writeToBuffer(QpidByteBuffer buffer)
     {
         return writeToBuffer(buffer, false);
     }
 
-    public int writeToBuffer(ByteBuffer buffer, boolean large)
+    public int writeToBuffer(QpidByteBuffer buffer, boolean large)
     {
         final int length = _length;
 
@@ -209,7 +211,7 @@ public abstract class CompoundWriter<V>
         return _length;
     }
 
-    private void writeFirstPass(ByteBuffer buffer, int size)
+    private void writeFirstPass(QpidByteBuffer buffer, int size)
     {
 
         State state = State.FORMAT_CODE;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java?rev=1694594&r1=1694593&r2=1694594&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/DecimalConstructor.java Fri Aug  7 00:28:17 2015
@@ -21,9 +21,9 @@ package org.apache.qpid.amqp_1_0.codec;
 
 import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 
 public abstract class DecimalConstructor implements TypeConstructor<BigDecimal>
 {
@@ -31,7 +31,7 @@ public abstract class DecimalConstructor
     private static final DecimalConstructor DECIMAL_32 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
         {
 
 
@@ -54,7 +54,7 @@ public abstract class DecimalConstructor
     private static final DecimalConstructor DECIMAL_64 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
         {
             long val;
 
@@ -77,7 +77,7 @@ public abstract class DecimalConstructor
     private static final DecimalConstructor DECIMAL_128 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final ByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
         {
             long high;
             long low;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org