You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/06 14:23:00 UTC

svn commit: r1772901 [2/5] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpi...

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Dec  6 14:22:59 2016
@@ -21,15 +21,23 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Section;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 
@@ -41,7 +49,8 @@ public class Message_1_0 extends Abstrac
             .registerMessagingLayer()
             .registerTransactionLayer()
             .registerSecurityLayer();
-    public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
+    public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY),
+                                                                                               new ArrayList<AbstractSection<?>>());
 
     private long _arrivalTime;
 
@@ -110,4 +119,33 @@ public class Message_1_0 extends Abstrac
         return getContent(0, (int) getSize());
     }
 
+    public HeaderSection getHeaderSection()
+    {
+        return getMessageMetaData().getHeaderSection();
+    }
+
+    public PropertiesSection getPropertiesSection()
+    {
+        return getMessageMetaData().getPropertiesSection();
+    }
+
+    public DeliveryAnnotationsSection getDeliveryAnnotationsSection()
+    {
+        return getMessageMetaData().getDeliveryAnnotationsSection();
+    }
+
+    public MessageAnnotationsSection getMessageAnnotationsSection()
+    {
+        return getMessageMetaData().getMessageAnnotationsSection();
+    }
+
+    public ApplicationPropertiesSection getApplicationPropertiesSection()
+    {
+        return getMessageMetaData().getApplicationPropertiesSection();
+    }
+
+    public FooterSection getFooterSection()
+    {
+        return getMessageMetaData().getFooterSection();
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Tue Dec  6 14:22:59 2016
@@ -492,6 +492,7 @@ public class SendingLink_1_0 implements
             xfr.setState(accepted);
             xfr.setResume(Boolean.TRUE);
             getEndpoint().transfer(xfr, true);
+            xfr.dispose();
         }
         if(_resumeAcceptedTransfers.isEmpty())
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Dec  6 14:22:59 2016
@@ -70,6 +70,7 @@ import org.apache.qpid.server.model.Sess
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -376,17 +377,17 @@ public class Session_1_0 implements AMQS
 
         }
 
+
         try
         {
-            QpidByteBuffer payload = xfr.getPayload();
+            List<QpidByteBuffer> payload = xfr.getPayload();
+            final long remaining = QpidByteBufferUtils.remaining(payload);
             int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
 
-            if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
+            if(payload != null && payloadSent < remaining && payloadSent >= 0)
             {
-                payload = payload.duplicate();
-                try
-                {
-                    payload.position(payload.position()+payloadSent);
+                // TODO - should make this iterative and not recursive
+
 
                     Transfer secondTransfer = new Transfer();
 
@@ -398,12 +399,17 @@ public class Session_1_0 implements AMQS
                     secondTransfer.setPayload(payload);
 
                     sendTransfer(secondTransfer, endpoint, false);
-                }
-                finally
+
+                    secondTransfer.dispose();
+
+            }
+
+            if(payload != null)
+            {
+                for(QpidByteBuffer buf : payload)
                 {
-                    payload.dispose();
+                    buf.dispose();
                 }
-
             }
         }
         catch(OversizeFrameException e)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Tue Dec  6 14:22:59 2016
@@ -27,12 +27,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -41,12 +44,11 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.store.MessageHandle;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
 {
@@ -75,7 +77,7 @@ public class StandardReceivingLink_1_0 i
         _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
         _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
 
-        _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
+        _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
 
 
     }
@@ -106,11 +108,11 @@ public class StandardReceivingLink_1_0 i
                 return;
             }
 
-            fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
+            fragments = new ArrayList<>(_incompleteMessage.size());
 
             for(Transfer t : _incompleteMessage)
             {
-                fragments.add(t.getPayload().duplicate());
+                fragments.addAll(t.getPayload());
                 t.dispose();
             }
             _incompleteMessage=null;
@@ -120,7 +122,7 @@ public class StandardReceivingLink_1_0 i
         {
             _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
             _messageDeliveryTag = deliveryTag;
-            fragments = Collections.singletonList(xfr.getPayload().duplicate());
+            fragments = xfr.getPayload();
             xfr.dispose();
         }
 
@@ -138,24 +140,26 @@ public class StandardReceivingLink_1_0 i
             }
             else
             {
-                System.err.println("UNEXPECTED!!");
-                System.err.println("Delivery Tag: " + _messageDeliveryTag);
-                System.err.println("_unsettledMap: " + _unsettledMap);
-
+                throw new ServerScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
             }
         }
         else
         {
             MessageMetaData_1_0 mmd = null;
-            List<QpidByteBuffer> immutableSections = new ArrayList<>(3);
+            List<AbstractSection<?>> dataSections = new ArrayList<>();
             mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
-                    _sectionDecoder,
-                    immutableSections);
+                                          _sectionDecoder,
+                                          dataSections);
             MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
 
-            for(QpidByteBuffer bareMessageBuf : immutableSections)
+            for(AbstractSection<?> dataSection : dataSections)
             {
-                handle.addContent(bareMessageBuf);
+                for (QpidByteBuffer buf : dataSection.getEncodedForm())
+                {
+                    handle.addContent(buf);
+                    buf.dispose();
+                }
+
             }
             final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
             Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Tue Dec  6 14:22:59 2016
@@ -28,16 +28,18 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Section;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
@@ -45,7 +47,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
@@ -68,14 +69,13 @@ public class TxnCoordinatorReceivingLink
         _namedAddressSpace = namedAddressSpace;
         _session = session_1_0;
         _endpoint  = endpoint;
-        _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
+        _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
         _openTransactions = openTransactions;
     }
 
     public void messageTransfer(Transfer xfr)
     {
-
-        QpidByteBuffer payload = null;
+        List<QpidByteBuffer> payload = new ArrayList<>();
 
         final Binary deliveryTag = xfr.getDeliveryTag();
 
@@ -96,34 +96,32 @@ public class TxnCoordinatorReceivingLink
             int size = 0;
             for(Transfer t : _incompleteMessage)
             {
-                size += t.getPayload().limit();
-            }
-            payload = QpidByteBuffer.allocateDirect(size);
-            for(Transfer t : _incompleteMessage)
-            {
-                payload.put(t.getPayload().duplicate());
+                final List<QpidByteBuffer> bufs = t.getPayload();
+                if(bufs != null)
+                {
+                    size += QpidByteBufferUtils.remaining(bufs);
+                    payload.addAll(bufs);
+                }
                 t.dispose();
             }
-            payload.flip();
             _incompleteMessage=null;
 
         }
         else
         {
-            payload = xfr.getPayload().duplicate();
+            payload.addAll(xfr.getPayload());
             xfr.dispose();
         }
 
-        // Only interested int the amqp-value section that holds the message to the coordinator
+        // Only interested in the amqp-value section that holds the message to the coordinator
         try
         {
-            List<Section> sections = _sectionDecoder.parseAll(payload);
-
-            for(Section section : sections)
+            List<AbstractSection<?>> sections = _sectionDecoder.parseAll(payload);
+            for(AbstractSection section : sections)
             {
-                if(section instanceof AmqpValue)
+                if(section instanceof AmqpValueSection)
                 {
-                    Object command = ((AmqpValue) section).getValue();
+                    Object command = section.getValue();
 
 
                     if(command instanceof Declare)
@@ -154,6 +152,12 @@ public class TxnCoordinatorReceivingLink
                         _endpoint.updateDisposition(deliveryTag, new Accepted(), true);
 
                     }
+                    else
+                    {
+                        // TODO error handling
+
+                        // also should panic if we receive more than one AmqpValue, or no AmqpValue section
+                    }
                 }
             }
 
@@ -165,9 +169,9 @@ public class TxnCoordinatorReceivingLink
         }
         finally
         {
-            if (payload != null)
+            for(QpidByteBuffer buf : payload)
             {
-                payload.dispose();
+                buf.dispose();
             }
         }
 

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java (from r1772899, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java&r1=1772899&r2=1772901&rev=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,21 +20,41 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public abstract class DescribedTypeConstructor<T extends Object>
+public abstract class AbstractDescribedTypeConstructor<T extends Object> implements DescribedTypeConstructor<T>
 {
-    public TypeConstructor<T> construct(final TypeConstructor describedConstructor) throws AmqpErrorException
+    @Override
+    public TypeConstructor<T> construct(final Object descriptor,
+                                        final List<QpidByteBuffer> in,
+                                        final int[] originalPositions, final ValueHandler valueHandler) throws AmqpErrorException
     {
-        return new TypeConstructor<T>()
-        {
-            public T construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
-            {
-                return DescribedTypeConstructor.this.construct(describedConstructor.construct(in, handler));
-            }
-        };
+
+        return new TypeConstructorFromUnderlying<>(this, valueHandler.readConstructor(in));
     }
 
-    public abstract T construct(Object underlying);
+    protected abstract T construct(Object underlying);
+
+    private static class TypeConstructorFromUnderlying<S extends Object> implements TypeConstructor<S>
+    {
+
+        private final TypeConstructor _describedConstructor;
+        private AbstractDescribedTypeConstructor<S> _describedTypeConstructor;
+
+        public TypeConstructorFromUnderlying(final AbstractDescribedTypeConstructor<S> describedTypeConstructor,
+                                             final TypeConstructor describedConstructor)
+        {
+            _describedConstructor = describedConstructor;
+            _describedTypeConstructor = describedTypeConstructor;
+        }
+
+        @Override
+        public S construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
+        {
+            return _describedTypeConstructor.construct(_describedConstructor.construct(in, handler));
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -18,52 +18,50 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
 public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]>
 {
 
 
-
-    public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+    public Object[] construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
         int size = read(in);
-        if(in.remaining() < size)
+        long remaining = QpidByteBufferUtils.remaining(in);
+        if(remaining < (long) size)
         {
             throw new AmqpErrorException(AmqpError.DECODE_ERROR,
                                          "Insufficient data to decode array - requires %d octects, only %d remaining.",
-                                         size, in.remaining());
+                                         size, remaining);
         }
-        QpidByteBuffer dup = in.slice();
 
         List rval;
-        try
+        int count = read(in);
+        TypeConstructor t = handler.readConstructor(in);
+        rval = new ArrayList(count);
+        for(int i = 0; i < count; i++)
         {
-            dup.limit(size);
-            in.position(in.position()+size);
-            int count = read(dup);
-            TypeConstructor t = handler.readConstructor(dup);
-            rval = new ArrayList(count);
-            for(int i = 0; i < count; i++)
-            {
-                rval.add(t.construct(dup, handler));
-            }
-            if(dup.hasRemaining())
-            {
-                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
-                                             "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
-                                             dup.remaining(), count);
-            }
+            rval.add(t.construct(in, handler));
         }
-        finally
+        long unconsumedBytes = remaining - (QpidByteBufferUtils.remaining(in) + (long) size);
+
+        if(unconsumedBytes > 0)
         {
-            dup.dispose();
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
+                                         unconsumedBytes, count);
+        }
+        else if (unconsumedBytes < 0)
+        {
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         "Array incorrectly encoded, %d bytes beyond provided size consumed after decoding %d elements",
+                                         -unconsumedBytes, count);
         }
         if(rval.size() == 0)
         {
@@ -71,14 +69,13 @@ public abstract class ArrayTypeConstruct
         }
         else
         {
-
-
             return rval.toArray((Object[])Array.newInstance(rval.get(0).getClass(), rval.size()));
         }
     }
 
 
     abstract int read(QpidByteBuffer in) throws AmqpErrorException;
+    abstract int read(List<QpidByteBuffer> in) throws AmqpErrorException;
 
 
     private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
@@ -93,6 +90,15 @@ public abstract class ArrayTypeConstruct
             return ((int)in.get()) & 0xff;
         }
 
+        @Override
+        int read(final List<QpidByteBuffer> in) throws AmqpErrorException
+        {
+            if(!QpidByteBufferUtils.hasRemaining(in))
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array");
+            }
+            return ((int)QpidByteBufferUtils.get(in)) & 0xff;
+        }
     };
 
     private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
@@ -106,7 +112,15 @@ public abstract class ArrayTypeConstruct
             }
             return in.getInt();
         }
-
+        @Override
+        int read(final List<QpidByteBuffer> in) throws AmqpErrorException
+        {
+            if(!QpidByteBufferUtils.hasRemaining(in,4))
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array");
+            }
+            return QpidByteBufferUtils.getInt(in);
+        }
     };
 
     public static ArrayTypeConstructor getOneByteSizeTypeConstructor()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java Tue Dec  6 14:22:59 2016
@@ -29,6 +29,12 @@ final class BinaryString
     private int _size;
     private int _hashCode;
 
+    BinaryString(final byte[] data)
+    {
+
+        this(data, 0, data.length);
+    }
+
     BinaryString(final byte[] data, final int offset, final int size)
     {
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,9 +20,11 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 
 public class BinaryTypeConstructor extends VariableWidthTypeConstructor
 {
@@ -41,22 +43,30 @@ public class BinaryTypeConstructor exten
     }
 
     @Override
-    public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+    public Object construct(final List in, final ValueHandler handler) throws AmqpErrorException
     {
+
         int size;
 
         if(getSize() == 1)
         {
-            size = in.get() & 0xFF;
+            size = QpidByteBufferUtils.get(in) & 0xFF;
         }
         else
         {
-            size = in.getInt();
+            size = QpidByteBufferUtils.getInt(in);
         }
 
-        byte[] buf = new byte[size];
-        in.get(buf);
-        return new Binary(buf);
-    }
+        if(!QpidByteBufferUtils.hasRemaining(in, size))
+        {
+            org.apache.qpid.server.protocol.v1_0.type.transport.Error error = new org.apache.qpid.server.protocol.v1_0.type.transport.Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("Cannot construct binary: insufficient input data");
+            throw new AmqpErrorException(error);
+        }
 
+        byte[] data = new byte[size];
+        QpidByteBufferUtils.get(in,data);
+        return new Binary(data);
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,6 +19,8 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -28,7 +30,9 @@ public class BooleanConstructor
 {
     private static final TypeConstructor<Boolean> TRUE_INSTANCE = new TypeConstructor<Boolean>()
     {
-        public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+        @Override
+        public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
         {
             return Boolean.TRUE;
         }
@@ -36,18 +40,23 @@ public class BooleanConstructor
 
     private static final TypeConstructor<Boolean> FALSE_INSTANCE = new TypeConstructor<Boolean>()
         {
-            public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+            @Override
+            public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler)
+                    throws AmqpErrorException
             {
                 return Boolean.FALSE;
             }
         };
     private static final TypeConstructor<Boolean> BYTE_INSTANCE = new TypeConstructor<Boolean>()
     {
-        public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+        @Override
+        public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
         {
-            if(in.hasRemaining())
+            if(QpidByteBufferUtils.hasRemaining(in))
             {
-                byte b = in.get();
+                byte b = QpidByteBufferUtils.get(in);
                 return b != (byte) 0;
             }
             else
@@ -58,7 +67,6 @@ public class BooleanConstructor
                 throw new AmqpErrorException(error);
             }
         }
-
     };
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class ByteTypeConstructor implements TypeConstructor
+public class ByteTypeConstructor implements TypeConstructor<Byte>
 {
     private static final ByteTypeConstructor INSTANCE = new ByteTypeConstructor();
 
@@ -38,11 +40,12 @@ public class ByteTypeConstructor impleme
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Byte construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.hasRemaining())
+        if(QpidByteBufferUtils.hasRemaining(in))
         {
-            return in.get();
+            return QpidByteBufferUtils.get(in);
         }
         else
         {
@@ -54,5 +57,4 @@ public class ByteTypeConstructor impleme
         }
 
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.*;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class CharTypeConstructor implements TypeConstructor
+public class CharTypeConstructor implements TypeConstructor<String>
 {
     private static final CharTypeConstructor INSTANCE = new CharTypeConstructor();
 
@@ -38,20 +40,14 @@ public class CharTypeConstructor impleme
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=4)
+        if(QpidByteBufferUtils.hasRemaining(in,4))
         {
-            int codePoint = in.getInt();
+            int codePoint = QpidByteBufferUtils.getInt(in);
             char[] chars = Character.toChars(codePoint);
-            if(chars.length == 1)
-            {
-                return chars[0];
-            }
-            else
-            {
-                return chars;
-            }
+            return new String(chars);
         }
         else
         {
@@ -62,5 +58,4 @@ public class CharTypeConstructor impleme
 
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java Tue Dec  6 14:22:59 2016
@@ -22,15 +22,15 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 
-public interface CompoundTypeAssembler
+public interface CompoundTypeAssembler<X>
 {
 
-    public static interface Factory
+    interface Factory<X>
     {
-        CompoundTypeAssembler newInstance();
+        CompoundTypeAssembler<X> newInstance();
     }
 
     void init(int count) throws AmqpErrorException;
     void addItem(Object obj) throws AmqpErrorException;
-    Object complete()  throws AmqpErrorException;
+    X complete()  throws AmqpErrorException;
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,20 +20,20 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.transport.*;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
 import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class CompoundTypeConstructor extends VariableWidthTypeConstructor
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+
+public class CompoundTypeConstructor<T> extends VariableWidthTypeConstructor<T>
 {
-    private final CompoundTypeAssembler.Factory _assemblerFactory;
+    private final CompoundTypeAssembler.Factory<T> _assemblerFactory;
 
     public static final CompoundTypeAssembler.Factory LIST_ASSEMBLER_FACTORY =
             new CompoundTypeAssembler.Factory()
@@ -47,7 +47,7 @@ public class CompoundTypeConstructor ext
 
 
 
-    private static class ListAssembler implements CompoundTypeAssembler
+    private static class ListAssembler implements CompoundTypeAssembler<List>
     {
         private List _list;
 
@@ -61,7 +61,7 @@ public class CompoundTypeConstructor ext
             _list.add(obj);
         }
 
-        public Object complete() throws AmqpErrorException
+        public List complete() throws AmqpErrorException
         {
             return _list;
         }
@@ -77,16 +77,16 @@ public class CompoundTypeConstructor ext
 
 
     public static final CompoundTypeAssembler.Factory MAP_ASSEMBLER_FACTORY =
-            new CompoundTypeAssembler.Factory()
+            new CompoundTypeAssembler.Factory<Map>()
             {
 
-                public CompoundTypeAssembler newInstance()
+                public CompoundTypeAssembler<Map> newInstance()
                 {
                     return new MapAssembler();
                 }
             };
 
-    private static class MapAssembler implements CompoundTypeAssembler
+    private static class MapAssembler implements CompoundTypeAssembler<Map>
     {
         private Map _map;
         private Object _lastKey;
@@ -132,56 +132,53 @@ public class CompoundTypeConstructor ext
 
         }
 
-        public Object complete() throws AmqpErrorException
+        public Map complete() throws AmqpErrorException
         {
             return _map;
         }
     }
 
 
-    public static CompoundTypeConstructor getInstance(int i,
-                                                      CompoundTypeAssembler.Factory assemblerFactory)
+    public static <X> CompoundTypeConstructor<X> getInstance(int i,
+                                                      CompoundTypeAssembler.Factory<X> assemblerFactory)
     {
-        return new CompoundTypeConstructor(i, assemblerFactory);
+        return new CompoundTypeConstructor<>(i, assemblerFactory);
     }
 
 
     private CompoundTypeConstructor(int size,
-                                    final CompoundTypeAssembler.Factory assemblerFactory)
+                                    final CompoundTypeAssembler.Factory<T> assemblerFactory)
     {
         super(size);
         _assemblerFactory = assemblerFactory;
     }
 
     @Override
-    public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
+    public T construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
         int size;
         int count;
 
         if(getSize() == 1)
         {
-            size = in.get() & 0xFF;
-            count = in.get() & 0xFF;
+            size = QpidByteBufferUtils.get(in) & 0xFF;
+            count = QpidByteBufferUtils.get(in) & 0xFF;
         }
         else
         {
-            size = in.getInt();
-            count = in.getInt();
+            size = QpidByteBufferUtils.getInt(in);
+            count = QpidByteBufferUtils.getInt(in);
         }
 
-        CompoundTypeAssembler assembler = _assemblerFactory.newInstance();
+        CompoundTypeAssembler<T> assembler = _assemblerFactory.newInstance();
 
         assembler.init(count);
 
         for(int i = 0; i < count; i++)
         {
-            assembler.addItem(delegate.parse(in));
+            assembler.addItem(handler.parse(in));
         }
 
         return assembler.complete();
-
     }
-
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,11 +19,12 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
-import java.math.BigDecimal;
 
 public abstract class DecimalConstructor implements TypeConstructor<BigDecimal>
 {
@@ -31,36 +32,37 @@ public abstract class DecimalConstructor
     private static final DecimalConstructor DECIMAL_32 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        @Override
+        public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
         {
-
-
             int val;
 
-            if(in.remaining()>=4)
+            if(QpidByteBufferUtils.hasRemaining(in, 4))
             {
-                val = in.getInt();
+                val = QpidByteBufferUtils.getInt(in);
             }
             else
             {
                 throw new AmqpErrorException(ConnectionError.FRAMING_ERROR, "Cannot construct decimal32: insufficient input data");
             }
 
-            return constructFrom32(val);}
+            return constructFrom32(val);
 
+        }
     };
 
 
     private static final DecimalConstructor DECIMAL_64 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        @Override
+        public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
         {
             long val;
 
-            if(in.remaining()>=8)
+            if(QpidByteBufferUtils.hasRemaining(in, 8))
             {
-                val = in.getLong();
+                val = QpidByteBufferUtils.getLong(in);
             }
             else
             {
@@ -70,22 +72,23 @@ public abstract class DecimalConstructor
             return constructFrom64(val);
 
         }
-
     };
 
 
     private static final DecimalConstructor DECIMAL_128 = new DecimalConstructor()
     {
 
-        public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+        @Override
+        public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
         {
+
             long high;
             long low;
 
-            if(in.remaining()>=16)
+            if(QpidByteBufferUtils.hasRemaining(in, 16))
             {
-                high = in.getLong();
-                low = in.getLong();
+                high = QpidByteBufferUtils.getLong(in);
+                low = QpidByteBufferUtils.getLong(in);
             }
             else
             {
@@ -93,9 +96,7 @@ public abstract class DecimalConstructor
             }
 
             return constructFrom128(high, low);
-
         }
-
     };
 
     private static final BigDecimal TWO_TO_THE_SIXTY_FOUR = new BigDecimal(2).pow(64);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-public class DefaultDescribedTypeConstructor extends DescribedTypeConstructor
+public class DefaultDescribedTypeConstructor extends AbstractDescribedTypeConstructor
 {
     private Object _descriptor;
 

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+
+public interface DescribedTypeConstructor<T extends Object>
+{
+    TypeConstructor<T> construct(Object descriptor,
+                                 List<QpidByteBuffer> in,
+                                 final int[] originalPositions,
+                                 ValueHandler valueHandler) throws AmqpErrorException;
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java Tue Dec  6 14:22:59 2016
@@ -23,9 +23,9 @@ package org.apache.qpid.server.protocol.
 
 public interface DescribedTypeConstructorRegistry
 {
-    public static interface Source
+    interface Source
     {
-        public DescribedTypeConstructorRegistry getDescribedTypeRegistry();
+        DescribedTypeConstructorRegistry getDescribedTypeRegistry();
     }
 
     void register(Object descriptor, DescribedTypeConstructor constructor);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class DoubleTypeConstructor implements TypeConstructor
+public class DoubleTypeConstructor implements TypeConstructor<Double>
 {
     private static final DoubleTypeConstructor INSTANCE = new DoubleTypeConstructor();
 
@@ -39,11 +41,12 @@ public class DoubleTypeConstructor imple
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Double construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=8)
+        if(QpidByteBufferUtils.hasRemaining(in, 8))
         {
-            return in.getDouble();
+            return QpidByteBufferUtils.getDouble(in);
         }
         else
         {
@@ -53,5 +56,4 @@ public class DoubleTypeConstructor imple
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class FloatTypeConstructor implements TypeConstructor
+public class FloatTypeConstructor implements TypeConstructor<Float>
 {
     private static final FloatTypeConstructor INSTANCE = new FloatTypeConstructor();
 
@@ -39,11 +41,12 @@ public class FloatTypeConstructor implem
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Float construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=4)
+        if(QpidByteBufferUtils.hasRemaining(in, 4))
         {
-            return in.getFloat();
+            return QpidByteBufferUtils.getFloat(in);
         }
         else
         {
@@ -53,5 +56,4 @@ public class FloatTypeConstructor implem
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java Tue Dec  6 14:22:59 2016
@@ -21,8 +21,10 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import java.util.List;
+
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.transport.ByteBufferSender;
 
 public class FrameWriter
@@ -39,9 +41,9 @@ public class FrameWriter
 
     public <T> int send(AMQFrame<T> frame)
     {
-        final QpidByteBuffer payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
+        final List<QpidByteBuffer> payload = frame.getPayload();
 
-        final int payloadLength = payload == null ? 0 : payload.remaining();
+        final int payloadLength = payload == null ? 0 : (int) QpidByteBufferUtils.remaining(payload);
         final T frameBody = frame.getFrameBody();
 
         final ValueWriter<T> typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
@@ -73,8 +75,10 @@ public class FrameWriter
         body.dispose();
         if(payload != null)
         {
-            _sender.send(payload);
-            payload.dispose();
+            for(QpidByteBuffer buf : payload)
+            {
+                _sender.send(buf);
+            }
         }
         return totalSize;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class IntTypeConstructor implements TypeConstructor
+public class IntTypeConstructor implements TypeConstructor<Integer>
 {
     private static final IntTypeConstructor INSTANCE = new IntTypeConstructor();
 
@@ -38,11 +40,12 @@ public class IntTypeConstructor implemen
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Integer construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=4)
+        if(QpidByteBufferUtils.hasRemaining(in, 4))
         {
-            return in.getInt();
+            return QpidByteBufferUtils.getInt(in);
         }
         else
         {
@@ -53,5 +56,4 @@ public class IntTypeConstructor implemen
 
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class LongTypeConstructor implements TypeConstructor
+public class LongTypeConstructor implements TypeConstructor<Long>
 {
     private static final LongTypeConstructor INSTANCE = new LongTypeConstructor();
 
@@ -38,11 +40,12 @@ public class LongTypeConstructor impleme
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Long construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=8)
+        if(QpidByteBufferUtils.hasRemaining(in, 8))
         {
-            return in.getLong();
+            return QpidByteBufferUtils.getLong(in);
         }
         else
         {
@@ -53,5 +56,4 @@ public class LongTypeConstructor impleme
 
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,6 +19,8 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
@@ -30,7 +32,8 @@ class NullTypeConstructor implements Typ
     {
     }
 
-    public Void construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+    @Override // reads nothing from 'in' and does not use 'handler'; the result is always null
+    public Void construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
         return null;
     }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java Tue Dec  6 14:22:59 2016
@@ -0,0 +1,274 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.nio.BufferUnderflowException;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+/**
+ * Static helpers that read from a list of {@link QpidByteBuffer}s as if it were one
+ * continuous input. Each read starts at the first buffer, in list order, that still has
+ * bytes remaining and advances the position of every buffer it consumes from.
+ */
+public class QpidByteBufferUtils
+{
+    /**
+     * Tells whether at least one byte is left to read.
+     *
+     * @param in the buffers to inspect, in read order
+     * @return {@code true} if any buffer has bytes remaining (never for an empty list)
+     */
+    public static boolean hasRemaining(List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            if (in.get(i).hasRemaining())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sums the bytes remaining in all buffers.
+     *
+     * @param in the buffers to inspect
+     * @return the total number of readable bytes
+     */
+    public static long remaining(List<QpidByteBuffer> in)
+    {
+        long remaining = 0L;
+        for (int i = 0; i < in.size(); i++)
+        {
+            remaining += in.get(i).remaining();
+        }
+        return remaining;
+    }
+
+    /**
+     * Reads a single byte from the first buffer that has one.
+     *
+     * @param in the buffers to read from
+     * @return the byte read
+     * @throws BufferUnderflowException if no buffer has a byte remaining
+     */
+    public static byte get(List<QpidByteBuffer> in)
+    {
+        final QpidByteBuffer buffer = firstNonEmpty(in);
+        if (buffer == null)
+        {
+            throw new BufferUnderflowException();
+        }
+        return buffer.get();
+    }
+
+    /**
+     * Tells whether at least {@code len} bytes are left to read across all buffers.
+     *
+     * @param in  the buffers to inspect
+     * @param len the number of bytes required
+     * @return {@code true} if the buffers together hold at least {@code len} bytes
+     */
+    public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final int remaining = in.get(i).remaining();
+            if (remaining >= len)
+            {
+                return true;
+            }
+            len -= remaining;
+        }
+        return false;
+    }
+
+    /**
+     * Reads an 8-byte value. When the first non-empty buffer holds all 8 bytes the read
+     * is delegated to {@link QpidByteBuffer#getLong()}; otherwise the bytes are gathered
+     * across buffers, most significant byte first.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than 8 bytes remain; bytes consumed
+     *                                  before the shortfall was detected stay consumed
+     */
+    public static long getLong(final List<QpidByteBuffer> in)
+    {
+        final QpidByteBuffer first = firstNonEmpty(in);
+        if (first != null && first.remaining() >= 8)
+        {
+            return first.getLong();
+        }
+        return readBigEndian(in, 8);
+    }
+
+    /**
+     * Reads a 4-byte value. When the first non-empty buffer holds all 4 bytes the read
+     * is delegated to {@link QpidByteBuffer#getInt()}; otherwise the bytes are gathered
+     * across buffers, most significant byte first.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than 4 bytes remain; bytes consumed
+     *                                  before the shortfall was detected stay consumed
+     */
+    public static int getInt(final List<QpidByteBuffer> in)
+    {
+        final QpidByteBuffer first = firstNonEmpty(in);
+        if (first != null && first.remaining() >= 4)
+        {
+            return first.getInt();
+        }
+        return (int) readBigEndian(in, 4);
+    }
+
+    /**
+     * Reads a float by reinterpreting the 4 bytes returned by {@link #getInt(List)}.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than 4 bytes remain
+     */
+    public static float getFloat(final List<QpidByteBuffer> in)
+    {
+        return Float.intBitsToFloat(getInt(in));
+    }
+
+    /**
+     * Reads a double by reinterpreting the 8 bytes returned by {@link #getLong(List)}.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than 8 bytes remain
+     */
+    public static double getDouble(final List<QpidByteBuffer> in)
+    {
+        return Double.longBitsToDouble(getLong(in));
+    }
+
+    /**
+     * Reads a 2-byte value. When the first non-empty buffer holds both bytes the read
+     * is delegated to {@link QpidByteBuffer#getShort()}; otherwise the bytes are gathered
+     * across buffers, most significant byte first.
+     *
+     * @param in the buffers to read from
+     * @return the value read, boxed as a {@link Short}
+     * @throws BufferUnderflowException if fewer than 2 bytes remain; bytes consumed
+     *                                  before the shortfall was detected stay consumed
+     */
+    public static Short getShort(final List<QpidByteBuffer> in)
+    {
+        final QpidByteBuffer first = firstNonEmpty(in);
+        if (first != null && first.remaining() >= 2)
+        {
+            return first.getShort();
+        }
+        return (short) readBigEndian(in, 2);
+    }
+
+    /**
+     * Fills {@code data} with as many bytes as are available, draining buffers in order.
+     *
+     * @param in   the buffers to read from
+     * @param data the destination array
+     * @return the number of bytes copied, which is less than {@code data.length} only
+     *         when the buffers ran out first
+     */
+    public static int get(final List<QpidByteBuffer> in, final byte[] data)
+    {
+        int copied = 0;
+        for (int i = 0; copied < data.length && i < in.size(); i++)
+        {
+            final QpidByteBuffer buf = in.get(i);
+            final int chunk = Math.min(buf.remaining(), data.length - copied);
+            if (chunk > 0)
+            {
+                buf.get(data, copied, chunk);
+                copied += chunk;
+            }
+        }
+        return copied;
+    }
+
+    /**
+     * Advances past up to {@code length} bytes. If fewer bytes are available, all of them
+     * are skipped and the method returns without signalling the shortfall.
+     *
+     * @param in     the buffers to advance
+     * @param length the number of bytes to skip
+     */
+    public static void skip(final List<QpidByteBuffer> in, int length)
+    {
+        int skipped = 0;
+        for (int i = 0; skipped < length && i < in.size(); i++)
+        {
+            final QpidByteBuffer buf = in.get(i);
+            final int step = Math.min(buf.remaining(), length - skipped);
+            if (step > 0)
+            {
+                buf.position(buf.position() + step);
+                skipped += step;
+            }
+        }
+    }
+
+    /** Returns the first buffer, in list order, with bytes remaining, or null if none has. */
+    private static QpidByteBuffer firstNonEmpty(final List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            if (buffer.hasRemaining())
+            {
+                return buffer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Reads {@code width} bytes one at a time, crossing buffer boundaries as needed, and
+     * combines them most significant byte first (each step shifts by a whole byte, 8 bits).
+     */
+    private static long readBigEndian(final List<QpidByteBuffer> in, final int width)
+    {
+        long result = 0L;
+        int consumed = 0;
+        for (int i = 0; consumed < width && i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            while (consumed < width && buffer.hasRemaining())
+            {
+                result = (result << 8) | (0xFF & buffer.get());
+                consumed++;
+            }
+        }
+        if (consumed < width)
+        {
+            throw new BufferUnderflowException();
+        }
+        return result;
+    }
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java Tue Dec  6 14:22:59 2016
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+public interface SectionDecoderRegistry extends DescribedTypeConstructorRegistry // a described-type constructor registry that also exposes a second, underlying registry
+{
+    DescribedTypeConstructorRegistry getUnderlyingRegistry(); // NOTE(review): presumably the registry this one is layered over or delegates to; confirm against implementations
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.*;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class ShortTypeConstructor implements TypeConstructor
+public class ShortTypeConstructor implements TypeConstructor<Short>
 {
     private static final ShortTypeConstructor INSTANCE = new ShortTypeConstructor();
 
@@ -38,11 +40,12 @@ public class ShortTypeConstructor implem
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Short construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.remaining()>=2)
+        if(QpidByteBufferUtils.hasRemaining(in, 2))
         {
-            return in.getShort();
+            return QpidByteBufferUtils.getShort(in);
         }
         else
         {
@@ -52,6 +55,6 @@ public class ShortTypeConstructor implem
             throw new AmqpErrorException(error);
 
         }
-    }
 
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,12 +19,14 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class SmallIntConstructor implements TypeConstructor
+public class SmallIntConstructor implements TypeConstructor<Integer>
 {
     private static final SmallIntConstructor INSTANCE = new SmallIntConstructor();
 
@@ -38,11 +40,12 @@ public class SmallIntConstructor impleme
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Integer construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.hasRemaining())
+        if(QpidByteBufferUtils.hasRemaining(in))
         {
-            byte b = in.get();
+            byte b = QpidByteBufferUtils.get(in);
             return (int) b;
         }
         else
@@ -53,5 +56,4 @@ public class SmallIntConstructor impleme
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,12 +19,14 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class SmallLongConstructor implements TypeConstructor
+public class SmallLongConstructor implements TypeConstructor<Long>
 {
     private static final SmallLongConstructor INSTANCE = new SmallLongConstructor();
 
@@ -38,11 +40,12 @@ public class SmallLongConstructor implem
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public Long construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.hasRemaining())
+        if(QpidByteBufferUtils.hasRemaining(in))
         {
-            byte b = in.get();
+            byte b = QpidByteBufferUtils.get(in);
             return (long) b;
         }
         else
@@ -53,5 +56,4 @@ public class SmallLongConstructor implem
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java Tue Dec  6 14:22:59 2016
@@ -18,13 +18,15 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class SmallUIntConstructor implements TypeConstructor
+public class SmallUIntConstructor implements TypeConstructor<UnsignedInteger>
 {
     private static final SmallUIntConstructor INSTANCE = new SmallUIntConstructor();
 
@@ -38,11 +40,13 @@ public class SmallUIntConstructor implem
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public UnsignedInteger construct(final List<QpidByteBuffer> in, final ValueHandler handler)
+            throws AmqpErrorException
     {
-        if(in.hasRemaining())
+        if(QpidByteBufferUtils.hasRemaining(in))
         {
-            byte b = in.get();
+            byte b = QpidByteBufferUtils.get(in);
             return UnsignedInteger.valueOf(((int) b) & 0xff);
         }
         else
@@ -53,5 +57,4 @@ public class SmallUIntConstructor implem
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java Tue Dec  6 14:22:59 2016
@@ -19,13 +19,15 @@
 
 package org.apache.qpid.server.protocol.v1_0.codec;
 
+import java.util.List;
+
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-public class SmallULongConstructor implements TypeConstructor
+public class SmallULongConstructor implements TypeConstructor<UnsignedLong>
 {
     private static final SmallULongConstructor INSTANCE = new SmallULongConstructor();
 
@@ -39,11 +41,12 @@ public class SmallULongConstructor imple
     {
     }
 
-    public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+    @Override
+    public UnsignedLong construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
     {
-        if(in.hasRemaining())
+        if(QpidByteBufferUtils.hasRemaining(in))
         {
-            byte b = in.get();
+            byte b = QpidByteBufferUtils.get(in);
             return UnsignedLong.valueOf(((long) b) & 0xffL);
         }
         else
@@ -54,5 +57,4 @@ public class SmallULongConstructor imple
             throw new AmqpErrorException(error);
         }
     }
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java Tue Dec  6 14:22:59 2016
@@ -20,13 +20,15 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
+import java.util.List;
 
-public class StringTypeConstructor extends VariableWidthTypeConstructor
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+
+public class StringTypeConstructor extends VariableWidthTypeConstructor<String>
 {
     private Charset _charSet;
 
@@ -43,20 +45,8 @@ public class StringTypeConstructor exten
         _charSet = c;
     }
 
-    @Override
-    public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+    private String constructFromSingleBuffer(final QpidByteBuffer in, final int size)
     {
-        int size;
-
-        if(getSize() == 1)
-        {
-            size = in.get() & 0xFF;
-        }
-        else
-        {
-            size = in.getInt();
-        }
-
         int origPosition = in.position();
 
         QpidByteBuffer dup = in.duplicate();
@@ -81,4 +71,44 @@ public class StringTypeConstructor exten
         }
     }
 
+    /**
+     * Decodes a length-prefixed string; the prefix is one byte when getSize() is 1, otherwise four bytes.
+     * @param in the buffers to read from; consumed bytes advance the buffers' positions
+     * @param handler not used by this implementation
+     * @return the decoded string
+     * @throws AmqpErrorException with a framing error if the declared length is negative or exceeds the data left
+     */
+    @Override
+    public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
+    {
+        final int size = getSize() == 1 ? (QpidByteBufferUtils.get(in) & 0xFF) : QpidByteBufferUtils.getInt(in);
+        // A four-byte length with the top bit set arrives here as a negative int. Reject it explicitly:
+        // hasRemaining() answers true for a negative length, and the steps below cannot handle one.
+        if(size < 0 || !QpidByteBufferUtils.hasRemaining(in, size))
+        {
+            org.apache.qpid.server.protocol.v1_0.type.transport.Error error = new org.apache.qpid.server.protocol.v1_0.type.transport.Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("Cannot construct string: insufficient input data");
+            throw new AmqpErrorException(error);
+        }
+
+        // Fast path: if the first buffer that has data holds the whole string, decode from that buffer alone.
+        for(int i = 0; i<in.size(); i++)
+        {
+            QpidByteBuffer buf = in.get(i);
+            if(buf.hasRemaining())
+            {
+                if(buf.remaining() >= size)
+                {
+                    return constructFromSingleBuffer(buf, size);
+                }
+                break;
+            }
+        }
+
+        // The string straddles buffers: gather its bytes into one array before decoding.
+        byte[] data = new byte[size];
+        QpidByteBufferUtils.get(in, data);
+        return new String(data, _charSet);
+    }
 }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org