You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/06 14:23:00 UTC
svn commit: r1772901 [2/5] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpi...
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Tue Dec 6 14:22:59 2016
@@ -21,15 +21,23 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -41,7 +49,8 @@ public class Message_1_0 extends Abstrac
.registerMessagingLayer()
.registerTransactionLayer()
.registerSecurityLayer();
- public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
+ public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY),
+ new ArrayList<AbstractSection<?>>());
private long _arrivalTime;
@@ -110,4 +119,33 @@ public class Message_1_0 extends Abstrac
return getContent(0, (int) getSize());
}
+ public HeaderSection getHeaderSection()
+ {
+ return getMessageMetaData().getHeaderSection();
+ }
+
+ public PropertiesSection getPropertiesSection()
+ {
+ return getMessageMetaData().getPropertiesSection();
+ }
+
+ public DeliveryAnnotationsSection getDeliveryAnnotationsSection()
+ {
+ return getMessageMetaData().getDeliveryAnnotationsSection();
+ }
+
+ public MessageAnnotationsSection getMessageAnnotationsSection()
+ {
+ return getMessageMetaData().getMessageAnnotationsSection();
+ }
+
+ public ApplicationPropertiesSection getApplicationPropertiesSection()
+ {
+ return getMessageMetaData().getApplicationPropertiesSection();
+ }
+
+ public FooterSection getFooterSection()
+ {
+ return getMessageMetaData().getFooterSection();
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Tue Dec 6 14:22:59 2016
@@ -492,6 +492,7 @@ public class SendingLink_1_0 implements
xfr.setState(accepted);
xfr.setResume(Boolean.TRUE);
getEndpoint().transfer(xfr, true);
+ xfr.dispose();
}
if(_resumeAcceptedTransfers.isEmpty())
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Dec 6 14:22:59 2016
@@ -70,6 +70,7 @@ import org.apache.qpid.server.model.Sess
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -376,17 +377,17 @@ public class Session_1_0 implements AMQS
}
+
try
{
- QpidByteBuffer payload = xfr.getPayload();
+ List<QpidByteBuffer> payload = xfr.getPayload();
+ final long remaining = QpidByteBufferUtils.remaining(payload);
int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
- if(payload != null && payloadSent < payload.remaining() && payloadSent >= 0)
+ if(payload != null && payloadSent < remaining && payloadSent >= 0)
{
- payload = payload.duplicate();
- try
- {
- payload.position(payload.position()+payloadSent);
+ // TODO - should make this iterative and not recursive
+
Transfer secondTransfer = new Transfer();
@@ -398,12 +399,17 @@ public class Session_1_0 implements AMQS
secondTransfer.setPayload(payload);
sendTransfer(secondTransfer, endpoint, false);
- }
- finally
+
+ secondTransfer.dispose();
+
+ }
+
+ if(payload != null)
+ {
+ for(QpidByteBuffer buf : payload)
{
- payload.dispose();
+ buf.dispose();
}
-
}
}
catch(OversizeFrameException e)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Tue Dec 6 14:22:59 2016
@@ -27,12 +27,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -41,12 +44,11 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
{
@@ -75,7 +77,7 @@ public class StandardReceivingLink_1_0 i
_receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
_durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
- _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
+ _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
}
@@ -106,11 +108,11 @@ public class StandardReceivingLink_1_0 i
return;
}
- fragments = new ArrayList<QpidByteBuffer>(_incompleteMessage.size());
+ fragments = new ArrayList<>(_incompleteMessage.size());
for(Transfer t : _incompleteMessage)
{
- fragments.add(t.getPayload().duplicate());
+ fragments.addAll(t.getPayload());
t.dispose();
}
_incompleteMessage=null;
@@ -120,7 +122,7 @@ public class StandardReceivingLink_1_0 i
{
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
_messageDeliveryTag = deliveryTag;
- fragments = Collections.singletonList(xfr.getPayload().duplicate());
+ fragments = xfr.getPayload();
xfr.dispose();
}
@@ -138,24 +140,26 @@ public class StandardReceivingLink_1_0 i
}
else
{
- System.err.println("UNEXPECTED!!");
- System.err.println("Delivery Tag: " + _messageDeliveryTag);
- System.err.println("_unsettledMap: " + _unsettledMap);
-
+ throw new ServerScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
}
}
else
{
MessageMetaData_1_0 mmd = null;
- List<QpidByteBuffer> immutableSections = new ArrayList<>(3);
+ List<AbstractSection<?>> dataSections = new ArrayList<>();
mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
- _sectionDecoder,
- immutableSections);
+ _sectionDecoder,
+ dataSections);
MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
- for(QpidByteBuffer bareMessageBuf : immutableSections)
+ for(AbstractSection<?> dataSection : dataSections)
{
- handle.addContent(bareMessageBuf);
+ for (QpidByteBuffer buf : dataSection.getEncodedForm())
+ {
+ handle.addContent(buf);
+ buf.dispose();
+ }
+
}
final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Tue Dec 6 14:22:59 2016
@@ -28,16 +28,18 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
@@ -45,7 +47,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -68,14 +69,13 @@ public class TxnCoordinatorReceivingLink
_namedAddressSpace = namedAddressSpace;
_session = session_1_0;
_endpoint = endpoint;
- _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
+ _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
_openTransactions = openTransactions;
}
public void messageTransfer(Transfer xfr)
{
-
- QpidByteBuffer payload = null;
+ List<QpidByteBuffer> payload = new ArrayList<>();
final Binary deliveryTag = xfr.getDeliveryTag();
@@ -96,34 +96,32 @@ public class TxnCoordinatorReceivingLink
int size = 0;
for(Transfer t : _incompleteMessage)
{
- size += t.getPayload().limit();
- }
- payload = QpidByteBuffer.allocateDirect(size);
- for(Transfer t : _incompleteMessage)
- {
- payload.put(t.getPayload().duplicate());
+ final List<QpidByteBuffer> bufs = t.getPayload();
+ if(bufs != null)
+ {
+ size += QpidByteBufferUtils.remaining(bufs);
+ payload.addAll(bufs);
+ }
t.dispose();
}
- payload.flip();
_incompleteMessage=null;
}
else
{
- payload = xfr.getPayload().duplicate();
+ payload.addAll(xfr.getPayload());
xfr.dispose();
}
- // Only interested int the amqp-value section that holds the message to the coordinator
+ // Only interested in the amqp-value section that holds the message to the coordinator
try
{
- List<Section> sections = _sectionDecoder.parseAll(payload);
-
- for(Section section : sections)
+ List<AbstractSection<?>> sections = _sectionDecoder.parseAll(payload);
+ for(AbstractSection section : sections)
{
- if(section instanceof AmqpValue)
+ if(section instanceof AmqpValueSection)
{
- Object command = ((AmqpValue) section).getValue();
+ Object command = section.getValue();
if(command instanceof Declare)
@@ -154,6 +152,12 @@ public class TxnCoordinatorReceivingLink
_endpoint.updateDisposition(deliveryTag, new Accepted(), true);
}
+ else
+ {
+ // TODO error handling
+
+ // also should panic if we receive more than one AmqpValue, or no AmqpValue section
+ }
}
}
@@ -165,9 +169,9 @@ public class TxnCoordinatorReceivingLink
}
finally
{
- if (payload != null)
+ for(QpidByteBuffer buf : payload)
{
- payload.dispose();
+ buf.dispose();
}
}
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java (from r1772899, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java&r1=1772899&r2=1772901&rev=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,21 +20,41 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public abstract class DescribedTypeConstructor<T extends Object>
+public abstract class AbstractDescribedTypeConstructor<T extends Object> implements DescribedTypeConstructor<T>
{
- public TypeConstructor<T> construct(final TypeConstructor describedConstructor) throws AmqpErrorException
+ @Override
+ public TypeConstructor<T> construct(final Object descriptor,
+ final List<QpidByteBuffer> in,
+ final int[] originalPositions, final ValueHandler valueHandler) throws AmqpErrorException
{
- return new TypeConstructor<T>()
- {
- public T construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
- {
- return DescribedTypeConstructor.this.construct(describedConstructor.construct(in, handler));
- }
- };
+
+ return new TypeConstructorFromUnderlying<>(this, valueHandler.readConstructor(in));
}
- public abstract T construct(Object underlying);
+ protected abstract T construct(Object underlying);
+
+ private static class TypeConstructorFromUnderlying<S extends Object> implements TypeConstructor<S>
+ {
+
+ private final TypeConstructor _describedConstructor;
+ private AbstractDescribedTypeConstructor<S> _describedTypeConstructor;
+
+ public TypeConstructorFromUnderlying(final AbstractDescribedTypeConstructor<S> describedTypeConstructor,
+ final TypeConstructor describedConstructor)
+ {
+ _describedConstructor = describedConstructor;
+ _describedTypeConstructor = describedTypeConstructor;
+ }
+
+ @Override
+ public S construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
+ {
+ return _describedTypeConstructor.construct(_describedConstructor.construct(in, handler));
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ArrayTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -18,52 +18,50 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
public abstract class ArrayTypeConstructor implements TypeConstructor<Object[]>
{
-
- public Object[] construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ public Object[] construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
int size = read(in);
- if(in.remaining() < size)
+ long remaining = QpidByteBufferUtils.remaining(in);
+ if(remaining < (long) size)
{
throw new AmqpErrorException(AmqpError.DECODE_ERROR,
"Insufficient data to decode array - requires %d octects, only %d remaining.",
- size, in.remaining());
+ size, remaining);
}
- QpidByteBuffer dup = in.slice();
List rval;
- try
+ int count = read(in);
+ TypeConstructor t = handler.readConstructor(in);
+ rval = new ArrayList(count);
+ for(int i = 0; i < count; i++)
{
- dup.limit(size);
- in.position(in.position()+size);
- int count = read(dup);
- TypeConstructor t = handler.readConstructor(dup);
- rval = new ArrayList(count);
- for(int i = 0; i < count; i++)
- {
- rval.add(t.construct(dup, handler));
- }
- if(dup.hasRemaining())
- {
- throw new AmqpErrorException(AmqpError.DECODE_ERROR,
- "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
- dup.remaining(), count);
- }
+ rval.add(t.construct(in, handler));
}
- finally
+ long unconsumedBytes = remaining - (QpidByteBufferUtils.remaining(in) + (long) size);
+
+ if(unconsumedBytes > 0)
{
- dup.dispose();
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+ "Array incorrectly encoded, %d bytes remaining after decoding %d elements",
+ unconsumedBytes, count);
+ }
+ else if (unconsumedBytes < 0)
+ {
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+ "Array incorrectly encoded, %d bytes beyond provided size consumed after decoding %d elements",
+ -unconsumedBytes, count);
}
if(rval.size() == 0)
{
@@ -71,14 +69,13 @@ public abstract class ArrayTypeConstruct
}
else
{
-
-
return rval.toArray((Object[])Array.newInstance(rval.get(0).getClass(), rval.size()));
}
}
abstract int read(QpidByteBuffer in) throws AmqpErrorException;
+ abstract int read(List<QpidByteBuffer> in) throws AmqpErrorException;
private static final ArrayTypeConstructor ONE_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
@@ -93,6 +90,15 @@ public abstract class ArrayTypeConstruct
return ((int)in.get()) & 0xff;
}
+ @Override
+ int read(final List<QpidByteBuffer> in) throws AmqpErrorException
+ {
+ if(!QpidByteBufferUtils.hasRemaining(in))
+ {
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array");
+ }
+ return ((int)QpidByteBufferUtils.get(in)) & 0xff;
+ }
};
private static final ArrayTypeConstructor FOUR_BYTE_SIZE_ARRAY = new ArrayTypeConstructor()
@@ -106,7 +112,15 @@ public abstract class ArrayTypeConstruct
}
return in.getInt();
}
-
+ @Override
+ int read(final List<QpidByteBuffer> in) throws AmqpErrorException
+ {
+ if(!QpidByteBufferUtils.hasRemaining(in,4))
+ {
+ throw new AmqpErrorException(AmqpError.DECODE_ERROR, "Insufficient data to decode array");
+ }
+ return QpidByteBufferUtils.getInt(in);
+ }
};
public static ArrayTypeConstructor getOneByteSizeTypeConstructor()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryString.java Tue Dec 6 14:22:59 2016
@@ -29,6 +29,12 @@ final class BinaryString
private int _size;
private int _hashCode;
+ BinaryString(final byte[] data)
+ {
+
+ this(data, 0, data.length);
+ }
+
BinaryString(final byte[] data, final int offset, final int size)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BinaryTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
public class BinaryTypeConstructor extends VariableWidthTypeConstructor
{
@@ -41,22 +43,30 @@ public class BinaryTypeConstructor exten
}
@Override
- public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+ public Object construct(final List in, final ValueHandler handler) throws AmqpErrorException
{
+
int size;
if(getSize() == 1)
{
- size = in.get() & 0xFF;
+ size = QpidByteBufferUtils.get(in) & 0xFF;
}
else
{
- size = in.getInt();
+ size = QpidByteBufferUtils.getInt(in);
}
- byte[] buf = new byte[size];
- in.get(buf);
- return new Binary(buf);
- }
+ if(!QpidByteBufferUtils.hasRemaining(in, size))
+ {
+ org.apache.qpid.server.protocol.v1_0.type.transport.Error error = new org.apache.qpid.server.protocol.v1_0.type.transport.Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("Cannot construct binary: insufficient input data");
+ throw new AmqpErrorException(error);
+ }
+ byte[] data = new byte[size];
+ QpidByteBufferUtils.get(in,data);
+ return new Binary(data);
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/BooleanConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,6 +19,8 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -28,7 +30,9 @@ public class BooleanConstructor
{
private static final TypeConstructor<Boolean> TRUE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+ @Override
+ public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
return Boolean.TRUE;
}
@@ -36,18 +40,23 @@ public class BooleanConstructor
private static final TypeConstructor<Boolean> FALSE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+ @Override
+ public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler)
+ throws AmqpErrorException
{
return Boolean.FALSE;
}
};
private static final TypeConstructor<Boolean> BYTE_INSTANCE = new TypeConstructor<Boolean>()
{
- public Boolean construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+
+ @Override
+ public Boolean construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- byte b = in.get();
+ byte b = QpidByteBufferUtils.get(in);
return b != (byte) 0;
}
else
@@ -58,7 +67,6 @@ public class BooleanConstructor
throw new AmqpErrorException(error);
}
}
-
};
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ByteTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class ByteTypeConstructor implements TypeConstructor
+public class ByteTypeConstructor implements TypeConstructor<Byte>
{
private static final ByteTypeConstructor INSTANCE = new ByteTypeConstructor();
@@ -38,11 +40,12 @@ public class ByteTypeConstructor impleme
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Byte construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- return in.get();
+ return QpidByteBufferUtils.get(in);
}
else
{
@@ -54,5 +57,4 @@ public class ByteTypeConstructor impleme
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CharTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.*;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class CharTypeConstructor implements TypeConstructor
+public class CharTypeConstructor implements TypeConstructor<String>
{
private static final CharTypeConstructor INSTANCE = new CharTypeConstructor();
@@ -38,20 +40,14 @@ public class CharTypeConstructor impleme
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=4)
+ if(QpidByteBufferUtils.hasRemaining(in,4))
{
- int codePoint = in.getInt();
+ int codePoint = QpidByteBufferUtils.getInt(in);
char[] chars = Character.toChars(codePoint);
- if(chars.length == 1)
- {
- return chars[0];
- }
- else
- {
- return chars;
- }
+ return new String(chars);
}
else
{
@@ -62,5 +58,4 @@ public class CharTypeConstructor impleme
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeAssembler.java Tue Dec 6 14:22:59 2016
@@ -22,15 +22,15 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-public interface CompoundTypeAssembler
+public interface CompoundTypeAssembler<X>
{
- public static interface Factory
+ interface Factory<X>
{
- CompoundTypeAssembler newInstance();
+ CompoundTypeAssembler<X> newInstance();
}
void init(int count) throws AmqpErrorException;
void addItem(Object obj) throws AmqpErrorException;
- Object complete() throws AmqpErrorException;
+ X complete() throws AmqpErrorException;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.transport.*;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class CompoundTypeConstructor extends VariableWidthTypeConstructor
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+
+public class CompoundTypeConstructor<T> extends VariableWidthTypeConstructor<T>
{
- private final CompoundTypeAssembler.Factory _assemblerFactory;
+ private final CompoundTypeAssembler.Factory<T> _assemblerFactory;
public static final CompoundTypeAssembler.Factory LIST_ASSEMBLER_FACTORY =
new CompoundTypeAssembler.Factory()
@@ -47,7 +47,7 @@ public class CompoundTypeConstructor ext
- private static class ListAssembler implements CompoundTypeAssembler
+ private static class ListAssembler implements CompoundTypeAssembler<List>
{
private List _list;
@@ -61,7 +61,7 @@ public class CompoundTypeConstructor ext
_list.add(obj);
}
- public Object complete() throws AmqpErrorException
+ public List complete() throws AmqpErrorException
{
return _list;
}
@@ -77,16 +77,16 @@ public class CompoundTypeConstructor ext
public static final CompoundTypeAssembler.Factory MAP_ASSEMBLER_FACTORY =
- new CompoundTypeAssembler.Factory()
+ new CompoundTypeAssembler.Factory<Map>()
{
- public CompoundTypeAssembler newInstance()
+ public CompoundTypeAssembler<Map> newInstance()
{
return new MapAssembler();
}
};
- private static class MapAssembler implements CompoundTypeAssembler
+ private static class MapAssembler implements CompoundTypeAssembler<Map>
{
private Map _map;
private Object _lastKey;
@@ -132,56 +132,53 @@ public class CompoundTypeConstructor ext
}
- public Object complete() throws AmqpErrorException
+ public Map complete() throws AmqpErrorException
{
return _map;
}
}
- public static CompoundTypeConstructor getInstance(int i,
- CompoundTypeAssembler.Factory assemblerFactory)
+ public static <X> CompoundTypeConstructor<X> getInstance(int i,
+ CompoundTypeAssembler.Factory<X> assemblerFactory)
{
- return new CompoundTypeConstructor(i, assemblerFactory);
+ return new CompoundTypeConstructor<>(i, assemblerFactory);
}
private CompoundTypeConstructor(int size,
- final CompoundTypeAssembler.Factory assemblerFactory)
+ final CompoundTypeAssembler.Factory<T> assemblerFactory)
{
super(size);
_assemblerFactory = assemblerFactory;
}
@Override
- public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler delegate) throws AmqpErrorException
+ public T construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
int size;
int count;
if(getSize() == 1)
{
- size = in.get() & 0xFF;
- count = in.get() & 0xFF;
+ size = QpidByteBufferUtils.get(in) & 0xFF;
+ count = QpidByteBufferUtils.get(in) & 0xFF;
}
else
{
- size = in.getInt();
- count = in.getInt();
+ size = QpidByteBufferUtils.getInt(in);
+ count = QpidByteBufferUtils.getInt(in);
}
- CompoundTypeAssembler assembler = _assemblerFactory.newInstance();
+ CompoundTypeAssembler<T> assembler = _assemblerFactory.newInstance();
assembler.init(count);
for(int i = 0; i < count; i++)
{
- assembler.addItem(delegate.parse(in));
+ assembler.addItem(handler.parse(in));
}
return assembler.complete();
-
}
-
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DecimalConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,11 +19,12 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
-import java.math.BigDecimal;
public abstract class DecimalConstructor implements TypeConstructor<BigDecimal>
{
@@ -31,36 +32,37 @@ public abstract class DecimalConstructor
private static final DecimalConstructor DECIMAL_32 = new DecimalConstructor()
{
- public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ @Override
+ public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
-
-
int val;
- if(in.remaining()>=4)
+ if(QpidByteBufferUtils.hasRemaining(in, 4))
{
- val = in.getInt();
+ val = QpidByteBufferUtils.getInt(in);
}
else
{
throw new AmqpErrorException(ConnectionError.FRAMING_ERROR, "Cannot construct decimal32: insufficient input data");
}
- return constructFrom32(val);}
+ return constructFrom32(val);
+ }
};
private static final DecimalConstructor DECIMAL_64 = new DecimalConstructor()
{
- public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ @Override
+ public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
long val;
- if(in.remaining()>=8)
+ if(QpidByteBufferUtils.hasRemaining(in, 8))
{
- val = in.getLong();
+ val = QpidByteBufferUtils.getLong(in);
}
else
{
@@ -70,22 +72,23 @@ public abstract class DecimalConstructor
return constructFrom64(val);
}
-
};
private static final DecimalConstructor DECIMAL_128 = new DecimalConstructor()
{
- public BigDecimal construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ @Override
+ public BigDecimal construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
+
long high;
long low;
- if(in.remaining()>=16)
+ if(QpidByteBufferUtils.hasRemaining(in, 16))
{
- high = in.getLong();
- low = in.getLong();
+ high = QpidByteBufferUtils.getLong(in);
+ low = QpidByteBufferUtils.getLong(in);
}
else
{
@@ -93,9 +96,7 @@ public abstract class DecimalConstructor
}
return constructFrom128(high, low);
-
}
-
};
private static final BigDecimal TWO_TO_THE_SIXTY_FOUR = new BigDecimal(2).pow(64);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DefaultDescribedTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
-public class DefaultDescribedTypeConstructor extends DescribedTypeConstructor
+public class DefaultDescribedTypeConstructor extends AbstractDescribedTypeConstructor
{
private Object _descriptor;
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+
+/**
+ * Supplies the {@link TypeConstructor} to use for a value that carries a descriptor.
+ *
+ * @param <T> the type of value built by the returned {@link TypeConstructor}
+ */
+public interface DescribedTypeConstructor<T>
+{
+    /**
+     * Returns the constructor to use for the value identified by {@code descriptor}.
+     *
+     * @param descriptor        the descriptor that was read for the value
+     * @param in                the input buffers
+     * @param originalPositions NOTE(review): presumably the positions of the buffers in {@code in}
+     *                          captured before the descriptor was consumed - confirm against callers
+     * @param valueHandler      the handler available for parsing nested values
+     * @return the constructor for the described value
+     * @throws AmqpErrorException if a constructor cannot be provided for the input
+     */
+    TypeConstructor<T> construct(Object descriptor,
+                                 List<QpidByteBuffer> in,
+                                 int[] originalPositions,
+                                 ValueHandler valueHandler)
+            throws AmqpErrorException;
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DescribedTypeConstructorRegistry.java Tue Dec 6 14:22:59 2016
@@ -23,9 +23,9 @@ package org.apache.qpid.server.protocol.
public interface DescribedTypeConstructorRegistry
{
- public static interface Source
+ interface Source
{
- public DescribedTypeConstructorRegistry getDescribedTypeRegistry();
+ DescribedTypeConstructorRegistry getDescribedTypeRegistry();
}
void register(Object descriptor, DescribedTypeConstructor constructor);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/DoubleTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class DoubleTypeConstructor implements TypeConstructor
+public class DoubleTypeConstructor implements TypeConstructor<Double>
{
private static final DoubleTypeConstructor INSTANCE = new DoubleTypeConstructor();
@@ -39,11 +41,12 @@ public class DoubleTypeConstructor imple
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Double construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=8)
+ if(QpidByteBufferUtils.hasRemaining(in, 8))
{
- return in.getDouble();
+ return QpidByteBufferUtils.getDouble(in);
}
else
{
@@ -53,5 +56,4 @@ public class DoubleTypeConstructor imple
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FloatTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class FloatTypeConstructor implements TypeConstructor
+public class FloatTypeConstructor implements TypeConstructor<Float>
{
private static final FloatTypeConstructor INSTANCE = new FloatTypeConstructor();
@@ -39,11 +41,12 @@ public class FloatTypeConstructor implem
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Float construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=4)
+ if(QpidByteBufferUtils.hasRemaining(in, 4))
{
- return in.getFloat();
+ return QpidByteBufferUtils.getFloat(in);
}
else
{
@@ -53,5 +56,4 @@ public class FloatTypeConstructor implem
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java Tue Dec 6 14:22:59 2016
@@ -21,8 +21,10 @@
package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import java.util.List;
+
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.transport.ByteBufferSender;
public class FrameWriter
@@ -39,9 +41,9 @@ public class FrameWriter
public <T> int send(AMQFrame<T> frame)
{
- final QpidByteBuffer payload = frame.getPayload() == null ? null : frame.getPayload().duplicate();
+ final List<QpidByteBuffer> payload = frame.getPayload();
- final int payloadLength = payload == null ? 0 : payload.remaining();
+ final int payloadLength = payload == null ? 0 : (int) QpidByteBufferUtils.remaining(payload);
final T frameBody = frame.getFrameBody();
final ValueWriter<T> typeWriter = frameBody == null ? null : _registry.getValueWriter(frameBody);
@@ -73,8 +75,10 @@ public class FrameWriter
body.dispose();
if(payload != null)
{
- _sender.send(payload);
- payload.dispose();
+ for(QpidByteBuffer buf : payload)
+ {
+ _sender.send(buf);
+ }
}
return totalSize;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/IntTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class IntTypeConstructor implements TypeConstructor
+public class IntTypeConstructor implements TypeConstructor<Integer>
{
private static final IntTypeConstructor INSTANCE = new IntTypeConstructor();
@@ -38,11 +40,12 @@ public class IntTypeConstructor implemen
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Integer construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=4)
+ if(QpidByteBufferUtils.hasRemaining(in, 4))
{
- return in.getInt();
+ return QpidByteBufferUtils.getInt(in);
}
else
{
@@ -53,5 +56,4 @@ public class IntTypeConstructor implemen
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/LongTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class LongTypeConstructor implements TypeConstructor
+public class LongTypeConstructor implements TypeConstructor<Long>
{
private static final LongTypeConstructor INSTANCE = new LongTypeConstructor();
@@ -38,11 +40,12 @@ public class LongTypeConstructor impleme
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Long construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=8)
+ if(QpidByteBufferUtils.hasRemaining(in, 8))
{
- return in.getLong();
+ return QpidByteBufferUtils.getLong(in);
}
else
{
@@ -53,5 +56,4 @@ public class LongTypeConstructor impleme
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/NullTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,6 +19,8 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -30,7 +32,8 @@ class NullTypeConstructor implements Typ
{
}
- public Void construct(final QpidByteBuffer in, final ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Void construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
return null;
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java Tue Dec 6 14:22:59 2016
@@ -0,0 +1,274 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.nio.BufferUnderflowException;
+import java.util.List;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+/**
+ * Static helpers for reading from a {@code List<QpidByteBuffer>} as though it were one
+ * continuous input. Each read starts at the first buffer in the list that still has bytes
+ * remaining and, where a value straddles a buffer boundary, continues into the buffers after it.
+ */
+public class QpidByteBufferUtils
+{
+    /**
+     * Tests whether any buffer in the list has at least one byte remaining.
+     *
+     * @param in the buffers to inspect
+     * @return {@code true} if some buffer has remaining bytes; {@code false} otherwise,
+     *         including when the list is empty
+     */
+    public static boolean hasRemaining(List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            if (in.get(i).hasRemaining())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sums the remaining bytes of every buffer in the list.
+     *
+     * @param in the buffers to inspect
+     * @return the total number of remaining bytes ({@code 0} for an empty list)
+     */
+    public static long remaining(List<QpidByteBuffer> in)
+    {
+        long total = 0L;
+        for (int i = 0; i < in.size(); i++)
+        {
+            total += in.get(i).remaining();
+        }
+        return total;
+    }
+
+    /**
+     * Reads a single byte from the first buffer that has one available.
+     *
+     * @param in the buffers to read from
+     * @return the byte read
+     * @throws BufferUnderflowException if no buffer has any bytes remaining
+     */
+    public static byte get(List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer candidate = in.get(i);
+            if (candidate.hasRemaining())
+            {
+                return candidate.get();
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Tests whether the buffers together hold at least {@code len} remaining bytes.
+     * <p>
+     * The check stops at the first buffer that satisfies the outstanding count. An empty list
+     * yields {@code false} for every {@code len}, including zero.
+     *
+     * @param in  the buffers to inspect
+     * @param len the number of bytes required
+     * @return {@code true} if at least {@code len} bytes remain across the list
+     */
+    public static boolean hasRemaining(final List<QpidByteBuffer> in, int len)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final int available = in.get(i).remaining();
+            if (available >= len)
+            {
+                return true;
+            }
+            // Not enough here: carry the shortfall on to the next buffer.
+            len -= available;
+        }
+
+        return false;
+    }
+
+    /**
+     * Reads an 8-byte value.
+     * <p>
+     * If the first non-empty buffer holds all eight bytes the read is delegated to that buffer;
+     * otherwise the value is assembled byte by byte across consecutive buffers.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than eight bytes remain; bytes read before the
+     *                                  shortfall was detected stay consumed
+     */
+    public static long getLong(final List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            final int remaining = buffer.remaining();
+            if (remaining >= 8)
+            {
+                return buffer.getLong();
+            }
+            else if (remaining != 0)
+            {
+                return readAcrossBuffers(in, i, 8);
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Reads a 4-byte value.
+     * <p>
+     * If the first non-empty buffer holds all four bytes the read is delegated to that buffer;
+     * otherwise the value is assembled byte by byte across consecutive buffers.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than four bytes remain; bytes read before the
+     *                                  shortfall was detected stay consumed
+     */
+    public static int getInt(final List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            final int remaining = buffer.remaining();
+            if (remaining >= 4)
+            {
+                return buffer.getInt();
+            }
+            else if (remaining != 0)
+            {
+                // The low 32 bits hold the value; the narrowing cast restores its sign.
+                return (int) readAcrossBuffers(in, i, 4);
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Reads a 4-byte value via {@link #getInt(List)} and reinterprets its bits as a float.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than four bytes remain
+     */
+    public static float getFloat(final List<QpidByteBuffer> in)
+    {
+        return Float.intBitsToFloat(getInt(in));
+    }
+
+    /**
+     * Reads an 8-byte value via {@link #getLong(List)} and reinterprets its bits as a double.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than eight bytes remain
+     */
+    public static double getDouble(final List<QpidByteBuffer> in)
+    {
+        return Double.longBitsToDouble(getLong(in));
+    }
+
+    /**
+     * Reads a 2-byte value.
+     * <p>
+     * If the first non-empty buffer holds both bytes the read is delegated to that buffer;
+     * otherwise the value is assembled byte by byte across consecutive buffers. The boxed return
+     * type is kept as declared so that existing callers are unaffected.
+     *
+     * @param in the buffers to read from
+     * @return the value read
+     * @throws BufferUnderflowException if fewer than two bytes remain; bytes read before the
+     *                                  shortfall was detected stay consumed
+     */
+    public static Short getShort(final List<QpidByteBuffer> in)
+    {
+        for (int i = 0; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            final int remaining = buffer.remaining();
+            if (remaining >= 2)
+            {
+                return buffer.getShort();
+            }
+            else if (remaining != 0)
+            {
+                return (short) readAcrossBuffers(in, i, 2);
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Assembles a {@code width}-byte value, most significant byte first, from consecutive
+     * buffers beginning at index {@code from}.
+     * <p>
+     * Each byte read must move the accumulated value up by a whole byte (8 bits) before being
+     * OR-ed in; shifting by a single bit would overlap successive bytes and corrupt any value
+     * that straddles a buffer boundary.
+     * <p>
+     * NOTE(review): most-significant-first matches the single-buffer fast paths only if the
+     * underlying buffers use big-endian order - confirm for QpidByteBuffer.
+     *
+     * @param in    the buffers to read from
+     * @param from  index of the first buffer to consume
+     * @param width number of bytes to read (at most 8)
+     * @return the bytes read, held in the low {@code width} bytes of the result
+     * @throws BufferUnderflowException if the buffers run out before {@code width} bytes are read
+     */
+    private static long readAcrossBuffers(final List<QpidByteBuffer> in, final int from, final int width)
+    {
+        long value = 0L;
+        int consumed = 0;
+        for (int i = from; i < in.size(); i++)
+        {
+            final QpidByteBuffer buffer = in.get(i);
+            while (consumed < width && buffer.hasRemaining())
+            {
+                value = (value << 8) | (0xFF & buffer.get());
+                consumed++;
+            }
+            if (consumed == width)
+            {
+                return value;
+            }
+        }
+        throw new BufferUnderflowException();
+    }
+
+    /**
+     * Fills {@code data} from the buffers, taking bytes from each buffer in list order.
+     *
+     * @param in   the buffers to read from
+     * @param data the destination array
+     * @return the number of bytes copied: {@code data.length} when enough bytes were available,
+     *         otherwise the smaller number that could be read
+     */
+    public static int get(final List<QpidByteBuffer> in, final byte[] data)
+    {
+        int copied = 0;
+        for (int i = 0; i < in.size() && copied < data.length; i++)
+        {
+            final QpidByteBuffer buf = in.get(i);
+            final int chunk = Math.min(buf.remaining(), data.length - copied);
+            if (chunk > 0)
+            {
+                buf.get(data, copied, chunk);
+                copied += chunk;
+            }
+        }
+        return copied;
+    }
+
+    /**
+     * Advances past up to {@code length} bytes, moving the position of each buffer in list order.
+     * <p>
+     * If fewer than {@code length} bytes remain, everything available is skipped and the method
+     * returns without signalling the shortfall. A non-positive {@code length} is a no-op.
+     *
+     * @param in     the buffers to advance
+     * @param length the number of bytes to skip
+     */
+    public static void skip(final List<QpidByteBuffer> in, int length)
+    {
+        int outstanding = length;
+        for (int i = 0; i < in.size() && outstanding > 0; i++)
+        {
+            final QpidByteBuffer buf = in.get(i);
+            final int step = Math.min(buf.remaining(), outstanding);
+            if (step > 0)
+            {
+                buf.position(buf.position() + step);
+                outstanding -= step;
+            }
+        }
+    }
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/QpidByteBufferUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java?rev=1772901&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java Tue Dec 6 14:22:59 2016
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+public interface SectionDecoderRegistry extends DescribedTypeConstructorRegistry // A DescribedTypeConstructorRegistry that additionally exposes a second registry via getUnderlyingRegistry().
+{
+ DescribedTypeConstructorRegistry getUnderlyingRegistry(); // NOTE(review): presumably the registry this one wraps or delegates to - confirm against the implementations
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SectionDecoderRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ShortTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.*;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class ShortTypeConstructor implements TypeConstructor
+public class ShortTypeConstructor implements TypeConstructor<Short>
{
private static final ShortTypeConstructor INSTANCE = new ShortTypeConstructor();
@@ -38,11 +40,12 @@ public class ShortTypeConstructor implem
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Short construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.remaining()>=2)
+ if(QpidByteBufferUtils.hasRemaining(in, 2))
{
- return in.getShort();
+ return QpidByteBufferUtils.getShort(in);
}
else
{
@@ -52,6 +55,6 @@ public class ShortTypeConstructor implem
throw new AmqpErrorException(error);
}
- }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallIntConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,12 +19,14 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class SmallIntConstructor implements TypeConstructor
+public class SmallIntConstructor implements TypeConstructor<Integer>
{
private static final SmallIntConstructor INSTANCE = new SmallIntConstructor();
@@ -38,11 +40,12 @@ public class SmallIntConstructor impleme
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Integer construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- byte b = in.get();
+ byte b = QpidByteBufferUtils.get(in);
return (int) b;
}
else
@@ -53,5 +56,4 @@ public class SmallIntConstructor impleme
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallLongConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,12 +19,14 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class SmallLongConstructor implements TypeConstructor
+public class SmallLongConstructor implements TypeConstructor<Long>
{
private static final SmallLongConstructor INSTANCE = new SmallLongConstructor();
@@ -38,11 +40,12 @@ public class SmallLongConstructor implem
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public Long construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- byte b = in.get();
+ byte b = QpidByteBufferUtils.get(in);
return (long) b;
}
else
@@ -53,5 +56,4 @@ public class SmallLongConstructor implem
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallUIntConstructor.java Tue Dec 6 14:22:59 2016
@@ -18,13 +18,15 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class SmallUIntConstructor implements TypeConstructor
+public class SmallUIntConstructor implements TypeConstructor<UnsignedInteger>
{
private static final SmallUIntConstructor INSTANCE = new SmallUIntConstructor();
@@ -38,11 +40,13 @@ public class SmallUIntConstructor implem
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public UnsignedInteger construct(final List<QpidByteBuffer> in, final ValueHandler handler)
+ throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- byte b = in.get();
+ byte b = QpidByteBufferUtils.get(in);
return UnsignedInteger.valueOf(((int) b) & 0xff);
}
else
@@ -53,5 +57,4 @@ public class SmallUIntConstructor implem
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/SmallULongConstructor.java Tue Dec 6 14:22:59 2016
@@ -19,13 +19,15 @@
package org.apache.qpid.server.protocol.v1_0.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-public class SmallULongConstructor implements TypeConstructor
+public class SmallULongConstructor implements TypeConstructor<UnsignedLong>
{
private static final SmallULongConstructor INSTANCE = new SmallULongConstructor();
@@ -39,11 +41,12 @@ public class SmallULongConstructor imple
{
}
- public Object construct(final QpidByteBuffer in, ValueHandler handler) throws AmqpErrorException
+ @Override
+ public UnsignedLong construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException
{
- if(in.hasRemaining())
+ if(QpidByteBufferUtils.hasRemaining(in))
{
- byte b = in.get();
+ byte b = QpidByteBufferUtils.get(in);
return UnsignedLong.valueOf(((long) b) & 0xffL);
}
else
@@ -54,5 +57,4 @@ public class SmallULongConstructor imple
throw new AmqpErrorException(error);
}
}
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java?rev=1772901&r1=1772900&r2=1772901&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/StringTypeConstructor.java Tue Dec 6 14:22:59 2016
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
import java.nio.CharBuffer;
import java.nio.charset.Charset;
+import java.util.List;
-public class StringTypeConstructor extends VariableWidthTypeConstructor
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+
+public class StringTypeConstructor extends VariableWidthTypeConstructor<String>
{
private Charset _charSet;
@@ -43,20 +45,8 @@ public class StringTypeConstructor exten
_charSet = c;
}
- @Override
- public Object construct(final QpidByteBuffer in, boolean isCopy, ValueHandler handler) throws AmqpErrorException
+ private String constructFromSingleBuffer(final QpidByteBuffer in, final int size)
{
- int size;
-
- if(getSize() == 1)
- {
- size = in.get() & 0xFF;
- }
- else
- {
- size = in.getInt();
- }
-
int origPosition = in.position();
QpidByteBuffer dup = in.duplicate();
@@ -81,4 +71,44 @@ public class StringTypeConstructor exten
}
}
+ @Override
+ public String construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException // Reads a length prefix from in, then decodes that many bytes into a String using _charSet.
+ {
+ int size; // byte length of the encoded string, taken from the prefix
+
+ if(getSize() == 1) // getSize() is declared outside this hunk; presumably the width in bytes of the length prefix - confirm
+ {
+ size = QpidByteBufferUtils.get(in) & 0xFF; // single byte, masked so it is treated as an unsigned value 0..255
+ }
+ else
+ {
+ size = QpidByteBufferUtils.getInt(in); // NOTE(review): an int read here can be negative; no explicit size < 0 check is visible in this method - confirm such input is rejected
+ }
+
+ if(!QpidByteBufferUtils.hasRemaining(in, size)) // too little input left for the announced length: report a framing error
+ {
+ org.apache.qpid.server.protocol.v1_0.type.transport.Error error = new org.apache.qpid.server.protocol.v1_0.type.transport.Error();
+ error.setCondition(ConnectionError.FRAMING_ERROR);
+ error.setDescription("Cannot construct string: insufficient input data");
+ throw new AmqpErrorException(error);
+ }
+
+ for(int i = 0; i<in.size(); i++) // locate the first buffer that still has data
+ {
+ QpidByteBuffer buf = in.get(i);
+ if(buf.hasRemaining())
+ {
+ if(buf.remaining() >= size) // the whole string lies inside this one buffer
+ {
+ return constructFromSingleBuffer(buf, size);
+ }
+ break; // the string continues into later buffers; use the copying path below
+ }
+ }
+
+ byte[] data = new byte[size]; // staging array for bytes spread over several buffers
+ QpidByteBufferUtils.get(in, data);
+
+ return new String(data, _charSet); // decode with the charset held in _charSet
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org