You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/12/01 21:11:55 UTC
[2/4] qpid-proton-j git commit: PROTON-1708 Optimizations for the
EncoderImpl and DecoderImpl
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
new file mode 100644
index 0000000..01e18e7
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathDispositionType.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec.transport;
+
+import java.util.Collection;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.Role;
+import org.apache.qpid.proton.codec.AMQPType;
+import org.apache.qpid.proton.codec.FastPathDescribedTypeConstructor;
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.codec.Decoder;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.TypeEncoding;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ * Fast-path codec for the AMQP {@link Disposition} performative.
+ * <p>
+ * The described list body is read and written directly against the codec buffers, while
+ * encoding lookups and encoder/decoder access are delegated to a wrapped {@link DispositionType}.
+ */
+public class FastPathDispositionType implements AMQPType<Disposition>, FastPathDescribedTypeConstructor<Disposition> {
+
+    /** Descriptors (numeric code and symbolic name) under which this type is registered with a decoder. */
+    private static final Object[] DESCRIPTORS =
+    {
+        UnsignedLong.valueOf(0x0000000000000015L), Symbol.valueOf("amqp:disposition:list"),
+    };
+
+    /** Wrapped type that supplies the encoder, decoder, descriptor and encoding lookups. */
+    private final DispositionType dispositionType;
+
+    /**
+     * Creates the fast-path type backed by a new {@link DispositionType} for the given encoder.
+     *
+     * @param encoder the encoder handed to the wrapped {@link DispositionType}
+     */
+    public FastPathDispositionType(EncoderImpl encoder) {
+        this.dispositionType = new DispositionType(encoder);
+    }
+
+    /**
+     * @return the encoder exposed by the wrapped {@link DispositionType}
+     */
+    public EncoderImpl getEncoder() {
+        return dispositionType.getEncoder();
+    }
+
+    /**
+     * @return the decoder exposed by the wrapped {@link DispositionType}
+     */
+    public DecoderImpl getDecoder() {
+        return dispositionType.getDecoder();
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean encodesJavaPrimitive() {
+        return false;
+    }
+
+    /**
+     * @return {@code Disposition.class}
+     */
+    @Override
+    public Class<Disposition> getTypeClass() {
+        return Disposition.class;
+    }
+
+    /**
+     * Delegates to the wrapped {@link DispositionType}.
+     *
+     * @param disposition the value an encoding is requested for
+     * @return the encoding chosen by the wrapped type
+     */
+    @Override
+    public TypeEncoding<Disposition> getEncoding(Disposition disposition) {
+        return dispositionType.getEncoding(disposition);
+    }
+
+    /**
+     * Delegates to the wrapped {@link DispositionType}.
+     *
+     * @return the canonical encoding of the wrapped type
+     */
+    @Override
+    public TypeEncoding<Disposition> getCanonicalEncoding() {
+        return dispositionType.getCanonicalEncoding();
+    }
+
+    /**
+     * Delegates to the wrapped {@link DispositionType}.
+     *
+     * @return all encodings of the wrapped type
+     */
+    @Override
+    public Collection<? extends TypeEncoding<Disposition>> getAllEncodings() {
+        return dispositionType.getAllEncodings();
+    }
+
+    /**
+     * Decodes a {@link Disposition} from the decoder's buffer, starting at the list type code.
+     * <p>
+     * Fields are read positionally; fields beyond the encoded element count are left at
+     * whatever a freshly constructed {@link Disposition} holds.
+     *
+     * @return the decoded disposition
+     * @throws DecodeException if the type code is not LIST0, LIST8 or LIST32
+     * @throws IllegalStateException if the list declares more than six elements
+     */
+    @Override
+    public Disposition readValue() {
+        DecoderImpl decoder = getDecoder();
+        byte typeCode = decoder.getByteBuffer().get();
+
+        int count = 0;
+
+        switch (typeCode) {
+            case EncodingCodes.LIST0:
+                // TODO - Technically invalid however old decoder also allowed this.
+                break;
+            case EncodingCodes.LIST8:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().get();
+                count = decoder.getByteBuffer().get() & 0xff;
+                break;
+            case EncodingCodes.LIST32:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().getInt();
+                count = decoder.getByteBuffer().getInt();
+                break;
+            default:
+                throw new DecodeException("Incorrect type found in Disposition encoding: " + typeCode);
+        }
+
+        Disposition disposition = new Disposition();
+
+        for (int index = 0; index < count; ++index) {
+            switch (index) {
+                case 0:
+                    // Anything other than an explicit TRUE (including null) maps to SENDER.
+                    disposition.setRole(Boolean.TRUE.equals(decoder.readBoolean()) ? Role.RECEIVER : Role.SENDER);
+                    break;
+                case 1:
+                    disposition.setFirst(decoder.readUnsignedInteger());
+                    break;
+                case 2:
+                    disposition.setLast(decoder.readUnsignedInteger());
+                    break;
+                case 3:
+                    disposition.setSettled(decoder.readBoolean(false));
+                    break;
+                case 4:
+                    disposition.setState((DeliveryState) decoder.readObject());
+                    break;
+                case 5:
+                    disposition.setBatchable(decoder.readBoolean(false));
+                    break;
+                default:
+                    throw new IllegalStateException("Too many entries in Disposition encoding");
+            }
+        }
+
+        return disposition;
+    }
+
+    /**
+     * Skips the next value by reading its constructor from the decoder and asking that
+     * constructor to skip the value.
+     */
+    @Override
+    public void skipValue() {
+        getDecoder().readConstructor().skipValue();
+    }
+
+    /**
+     * Writes the disposition as a described list: the described-type indicator, the descriptor,
+     * the list type code, a size placeholder, the element count and then each element. Once the
+     * elements are written the size placeholder is back-filled.
+     *
+     * @param disposition the value to encode
+     */
+    @Override
+    public void write(Disposition disposition) {
+        WritableBuffer buffer = getEncoder().getBuffer();
+        int count = getElementCount(disposition);
+        byte encodingCode = deduceEncodingCode(disposition, count);
+
+        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+        getEncoder().writeUnsignedLong(dispositionType.getDescriptor());
+
+        // LIST8 uses one-byte size/count fields, LIST32 uses four-byte fields.
+        final boolean compact = encodingCode == EncodingCodes.LIST8;
+        final int fieldWidth = compact ? 1 : 4;
+
+        buffer.put(compact ? EncodingCodes.LIST8 : EncodingCodes.LIST32);
+
+        int startIndex = buffer.position();
+
+        // Reserve space for the size and write the count of list elements.
+        if (compact) {
+            buffer.put((byte) 0);
+            buffer.put((byte) count);
+        } else {
+            buffer.putInt(0);
+            buffer.putInt(count);
+        }
+
+        // Write the list elements and then compute total size written.
+        for (int i = 0; i < count; ++i) {
+            writeElement(disposition, i);
+        }
+
+        // The encoded size covers everything after the size field itself (count + elements).
+        int endIndex = buffer.position();
+        int writeSize = endIndex - startIndex - fieldWidth;
+
+        // Move back, fill in the size, then restore the write position.
+        buffer.position(startIndex);
+        if (compact) {
+            buffer.put((byte) writeSize);
+        } else {
+            buffer.putInt(writeSize);
+        }
+        buffer.position(endIndex);
+    }
+
+    /**
+     * Writes the list element at the given position of the disposition's field order.
+     *
+     * @param disposition the value being encoded
+     * @param index zero-based field position (0..5)
+     * @throws IllegalArgumentException if {@code index} is outside the known fields
+     */
+    private void writeElement(Disposition disposition, int index) {
+        switch (index) {
+            case 0:
+                getEncoder().writeBoolean(disposition.getRole().getValue());
+                break;
+            case 1:
+                getEncoder().writeUnsignedInteger(disposition.getFirst());
+                break;
+            case 2:
+                getEncoder().writeUnsignedInteger(disposition.getLast());
+                break;
+            case 3:
+                getEncoder().writeBoolean(disposition.getSettled());
+                break;
+            case 4:
+                getEncoder().writeObject(disposition.getState());
+                break;
+            case 5:
+                getEncoder().writeBoolean(disposition.getBatchable());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown Disposition value index: " + index);
+        }
+    }
+
+    /**
+     * Determines how many leading fields must be written: the position of the last field that is
+     * set (non-null, or {@code true} for the boolean fields) plus one, with a minimum of two so
+     * that role and first are always written.
+     *
+     * @param disposition the value being encoded
+     * @return the number of list elements to write (2..6)
+     */
+    private int getElementCount(Disposition disposition) {
+        if (disposition.getBatchable()) {
+            return 6;
+        } else if (disposition.getState() != null) {
+            return 5;
+        } else if (disposition.getSettled()) {
+            return 4;
+        } else if (disposition.getLast() != null) {
+            return 3;
+        } else {
+            return 2;
+        }
+    }
+
+    /**
+     * Chooses the list encoding: LIST8 when there is no state or the state is the shared
+     * {@link Accepted} or {@link Released} instance, LIST32 for any other state.
+     * NOTE(review): presumably other states may carry payloads of unbounded size - confirm.
+     *
+     * @param value the value being encoded
+     * @param elementCount the number of elements that will be written (currently unused)
+     * @return {@link EncodingCodes#LIST8} or {@link EncodingCodes#LIST32}
+     */
+    private byte deduceEncodingCode(Disposition value, int elementCount) {
+        if (value.getState() == null) {
+            return EncodingCodes.LIST8;
+        } else if (value.getState() == Accepted.getInstance() || value.getState() == Released.getInstance()) {
+            return EncodingCodes.LIST8;
+        } else {
+            return EncodingCodes.LIST32;
+        }
+    }
+
+    /**
+     * Creates one instance of this type, registers it with the decoder under every known
+     * descriptor, and registers it with the encoder.
+     *
+     * @param decoder the decoder to register the constructor with
+     * @param encoder the encoder to register the type with
+     */
+    public static void register(Decoder decoder, EncoderImpl encoder) {
+        FastPathDispositionType type = new FastPathDispositionType(encoder);
+        for (Object descriptor : DESCRIPTORS) {
+            decoder.register(descriptor, (FastPathDescribedTypeConstructor<?>) type);
+        }
+        encoder.register(type);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
new file mode 100644
index 0000000..78abc5c
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathFlowType.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec.transport;
+
+import java.util.Collection;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.codec.AMQPType;
+import org.apache.qpid.proton.codec.FastPathDescribedTypeConstructor;
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.codec.Decoder;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.TypeEncoding;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ * Fast-path codec for the AMQP {@link Flow} performative.
+ * <p>
+ * The described list body is read and written directly against the codec buffers, while
+ * encoding lookups and encoder/decoder access are delegated to a wrapped {@link FlowType}.
+ */
+public class FastPathFlowType implements AMQPType<Flow>, FastPathDescribedTypeConstructor<Flow> {
+
+    /** Descriptors (numeric code and symbolic name) under which this type is registered with a decoder. */
+    private static final Object[] DESCRIPTORS =
+    {
+        UnsignedLong.valueOf(0x0000000000000013L), Symbol.valueOf("amqp:flow:list"),
+    };
+
+    /** Wrapped type that supplies the encoder, decoder, descriptor and encoding lookups. */
+    private final FlowType flowType;
+
+    /**
+     * Creates the fast-path type backed by a new {@link FlowType} for the given encoder.
+     *
+     * @param encoder the encoder handed to the wrapped {@link FlowType}
+     */
+    public FastPathFlowType(EncoderImpl encoder) {
+        this.flowType = new FlowType(encoder);
+    }
+
+    /**
+     * @return the encoder exposed by the wrapped {@link FlowType}
+     */
+    public EncoderImpl getEncoder() {
+        return flowType.getEncoder();
+    }
+
+    /**
+     * @return the decoder exposed by the wrapped {@link FlowType}
+     */
+    public DecoderImpl getDecoder() {
+        return flowType.getDecoder();
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean encodesJavaPrimitive() {
+        return false;
+    }
+
+    /**
+     * @return {@code Flow.class}
+     */
+    @Override
+    public Class<Flow> getTypeClass() {
+        return Flow.class;
+    }
+
+    /**
+     * Delegates to the wrapped {@link FlowType}.
+     *
+     * @param flow the value an encoding is requested for
+     * @return the encoding chosen by the wrapped type
+     */
+    @Override
+    public TypeEncoding<Flow> getEncoding(Flow flow) {
+        return flowType.getEncoding(flow);
+    }
+
+    /**
+     * Delegates to the wrapped {@link FlowType}.
+     *
+     * @return the canonical encoding of the wrapped type
+     */
+    @Override
+    public TypeEncoding<Flow> getCanonicalEncoding() {
+        return flowType.getCanonicalEncoding();
+    }
+
+    /**
+     * Delegates to the wrapped {@link FlowType}.
+     *
+     * @return all encodings of the wrapped type
+     */
+    @Override
+    public Collection<? extends TypeEncoding<Flow>> getAllEncodings() {
+        return flowType.getAllEncodings();
+    }
+
+    /**
+     * Decodes a {@link Flow} from the decoder's buffer, starting at the list type code.
+     * <p>
+     * Fields are read positionally; fields beyond the encoded element count are left at
+     * whatever a freshly constructed {@link Flow} holds.
+     *
+     * @return the decoded flow
+     * @throws DecodeException if the type code is not LIST0, LIST8 or LIST32
+     * @throws IllegalStateException if the list declares more than eleven elements
+     */
+    @Override
+    public Flow readValue() {
+        DecoderImpl decoder = getDecoder();
+        byte typeCode = decoder.getByteBuffer().get();
+
+        int count = 0;
+
+        switch (typeCode) {
+            case EncodingCodes.LIST0:
+                // TODO - Technically invalid however old decoder also allowed this.
+                break;
+            case EncodingCodes.LIST8:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().get();
+                count = decoder.getByteBuffer().get() & 0xff;
+                break;
+            case EncodingCodes.LIST32:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().getInt();
+                count = decoder.getByteBuffer().getInt();
+                break;
+            default:
+                throw new DecodeException("Incorrect type found in Flow encoding: " + typeCode);
+        }
+
+        Flow flow = new Flow();
+
+        for (int index = 0; index < count; ++index) {
+            switch (index) {
+                case 0:
+                    flow.setNextIncomingId(decoder.readUnsignedInteger());
+                    break;
+                case 1:
+                    flow.setIncomingWindow(decoder.readUnsignedInteger());
+                    break;
+                case 2:
+                    flow.setNextOutgoingId(decoder.readUnsignedInteger());
+                    break;
+                case 3:
+                    flow.setOutgoingWindow(decoder.readUnsignedInteger());
+                    break;
+                case 4:
+                    flow.setHandle(decoder.readUnsignedInteger());
+                    break;
+                case 5:
+                    flow.setDeliveryCount(decoder.readUnsignedInteger());
+                    break;
+                case 6:
+                    flow.setLinkCredit(decoder.readUnsignedInteger());
+                    break;
+                case 7:
+                    flow.setAvailable(decoder.readUnsignedInteger());
+                    break;
+                case 8:
+                    flow.setDrain(decoder.readBoolean(false));
+                    break;
+                case 9:
+                    flow.setEcho(decoder.readBoolean(false));
+                    break;
+                case 10:
+                    flow.setProperties(decoder.readMap());
+                    break;
+                default:
+                    throw new IllegalStateException("Too many entries in Flow encoding");
+            }
+        }
+
+        return flow;
+    }
+
+    /**
+     * Skips the next value by reading its constructor from the decoder and asking that
+     * constructor to skip the value.
+     */
+    @Override
+    public void skipValue() {
+        getDecoder().readConstructor().skipValue();
+    }
+
+    /**
+     * Writes the flow as a described list: the described-type indicator, the descriptor,
+     * the list type code, a size placeholder, the element count and then each element. Once the
+     * elements are written the size placeholder is back-filled.
+     *
+     * @param flow the value to encode
+     */
+    @Override
+    public void write(Flow flow) {
+        WritableBuffer buffer = getEncoder().getBuffer();
+        int count = getElementCount(flow);
+        byte encodingCode = deduceEncodingCode(flow, count);
+
+        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+        getEncoder().writeUnsignedLong(flowType.getDescriptor());
+
+        // LIST8 uses one-byte size/count fields, LIST32 uses four-byte fields.
+        final boolean compact = encodingCode == EncodingCodes.LIST8;
+        final int fieldWidth = compact ? 1 : 4;
+
+        buffer.put(compact ? EncodingCodes.LIST8 : EncodingCodes.LIST32);
+
+        int startIndex = buffer.position();
+
+        // Reserve space for the size and write the count of list elements.
+        if (compact) {
+            buffer.put((byte) 0);
+            buffer.put((byte) count);
+        } else {
+            buffer.putInt(0);
+            buffer.putInt(count);
+        }
+
+        // Write the list elements and then compute total size written.
+        for (int i = 0; i < count; ++i) {
+            writeElement(flow, i);
+        }
+
+        // The encoded size covers everything after the size field itself (count + elements).
+        int endIndex = buffer.position();
+        int writeSize = endIndex - startIndex - fieldWidth;
+
+        // Move back, fill in the size, then restore the write position.
+        buffer.position(startIndex);
+        if (compact) {
+            buffer.put((byte) writeSize);
+        } else {
+            buffer.putInt(writeSize);
+        }
+        buffer.position(endIndex);
+    }
+
+    /**
+     * Writes the list element at the given position of the flow's field order.
+     *
+     * @param flow the value being encoded
+     * @param index zero-based field position (0..10)
+     * @throws IllegalArgumentException if {@code index} is outside the known fields
+     */
+    private void writeElement(Flow flow, int index) {
+        switch (index) {
+            case 0:
+                getEncoder().writeUnsignedInteger(flow.getNextIncomingId());
+                break;
+            case 1:
+                getEncoder().writeUnsignedInteger(flow.getIncomingWindow());
+                break;
+            case 2:
+                getEncoder().writeUnsignedInteger(flow.getNextOutgoingId());
+                break;
+            case 3:
+                getEncoder().writeUnsignedInteger(flow.getOutgoingWindow());
+                break;
+            case 4:
+                getEncoder().writeUnsignedInteger(flow.getHandle());
+                break;
+            case 5:
+                getEncoder().writeUnsignedInteger(flow.getDeliveryCount());
+                break;
+            case 6:
+                getEncoder().writeUnsignedInteger(flow.getLinkCredit());
+                break;
+            case 7:
+                getEncoder().writeUnsignedInteger(flow.getAvailable());
+                break;
+            case 8:
+                getEncoder().writeBoolean(flow.getDrain());
+                break;
+            case 9:
+                getEncoder().writeBoolean(flow.getEcho());
+                break;
+            case 10:
+                getEncoder().writeMap(flow.getProperties());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown Flow value index: " + index);
+        }
+    }
+
+    /**
+     * Determines how many leading fields must be written: the position of the last field that is
+     * set (non-null, or {@code true} for the boolean fields) plus one, with a minimum of four so
+     * that the window fields are always written.
+     *
+     * @param flow the value being encoded
+     * @return the number of list elements to write (4..11)
+     */
+    private int getElementCount(Flow flow) {
+        if (flow.getProperties() != null) {
+            return 11;
+        } else if (flow.getEcho()) {
+            return 10;
+        } else if (flow.getDrain()) {
+            return 9;
+        } else if (flow.getAvailable() != null) {
+            return 8;
+        } else if (flow.getLinkCredit() != null) {
+            return 7;
+        } else if (flow.getDeliveryCount() != null) {
+            return 6;
+        } else if (flow.getHandle() != null) {
+            return 5;
+        } else {
+            return 4;
+        }
+    }
+
+    /**
+     * Chooses the list encoding: LIST8 when no properties map is set, LIST32 otherwise.
+     * NOTE(review): presumably because a properties map has no bounded encoded size - confirm.
+     *
+     * @param value the value being encoded
+     * @param elementCount the number of elements that will be written (currently unused)
+     * @return {@link EncodingCodes#LIST8} or {@link EncodingCodes#LIST32}
+     */
+    private byte deduceEncodingCode(Flow value, int elementCount) {
+        if (value.getProperties() == null) {
+            return EncodingCodes.LIST8;
+        } else {
+            return EncodingCodes.LIST32;
+        }
+    }
+
+    /**
+     * Creates one instance of this type, registers it with the decoder under every known
+     * descriptor, and registers it with the encoder.
+     *
+     * @param decoder the decoder to register the constructor with
+     * @param encoder the encoder to register the type with
+     */
+    public static void register(Decoder decoder, EncoderImpl encoder) {
+        FastPathFlowType type = new FastPathFlowType(encoder);
+        for (Object descriptor : DESCRIPTORS) {
+            decoder.register(descriptor, (FastPathDescribedTypeConstructor<?>) type);
+        }
+        encoder.register(type);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
new file mode 100644
index 0000000..685890a
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FastPathTransferType.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec.transport;
+
+import java.util.Collection;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+import org.apache.qpid.proton.codec.AMQPType;
+import org.apache.qpid.proton.codec.FastPathDescribedTypeConstructor;
+import org.apache.qpid.proton.codec.DecodeException;
+import org.apache.qpid.proton.codec.Decoder;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.EncodingCodes;
+import org.apache.qpid.proton.codec.TypeEncoding;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ * Fast-path codec for the AMQP {@link Transfer} performative.
+ * <p>
+ * The described list body is read and written directly against the codec buffers, while
+ * encoding lookups and encoder/decoder access are delegated to a wrapped {@link TransferType}.
+ */
+public class FastPathTransferType implements AMQPType<Transfer>, FastPathDescribedTypeConstructor<Transfer> {
+
+    /** Descriptors (numeric code and symbolic name) under which this type is registered with a decoder. */
+    private static final Object[] DESCRIPTORS =
+    {
+        UnsignedLong.valueOf(0x0000000000000014L), Symbol.valueOf("amqp:transfer:list"),
+    };
+
+    /** Wrapped type that supplies the encoder, decoder, descriptor and encoding lookups. */
+    private final TransferType transferType;
+
+    /**
+     * Creates the fast-path type backed by a new {@link TransferType} for the given encoder.
+     *
+     * @param encoder the encoder handed to the wrapped {@link TransferType}
+     */
+    public FastPathTransferType(EncoderImpl encoder) {
+        this.transferType = new TransferType(encoder);
+    }
+
+    /**
+     * @return the encoder exposed by the wrapped {@link TransferType}
+     */
+    public EncoderImpl getEncoder() {
+        return transferType.getEncoder();
+    }
+
+    /**
+     * @return the decoder exposed by the wrapped {@link TransferType}
+     */
+    public DecoderImpl getDecoder() {
+        return transferType.getDecoder();
+    }
+
+    /**
+     * Decodes a {@link Transfer} from the decoder's buffer, starting at the list type code.
+     * <p>
+     * Fields are read positionally; fields beyond the encoded element count are left at
+     * whatever a freshly constructed {@link Transfer} holds.
+     *
+     * @return the decoded transfer
+     * @throws DecodeException if the type code is not LIST0, LIST8 or LIST32, or if the
+     *         rcv-settle-mode value does not map to a {@link ReceiverSettleMode}
+     * @throws IllegalStateException if the list declares more than eleven elements
+     */
+    @Override
+    public Transfer readValue() {
+        DecoderImpl decoder = getDecoder();
+        byte typeCode = decoder.getByteBuffer().get();
+
+        int count = 0;
+
+        switch (typeCode) {
+            case EncodingCodes.LIST0:
+                // TODO - Technically invalid however old decoder also allowed this.
+                break;
+            case EncodingCodes.LIST8:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().get();
+                count = decoder.getByteBuffer().get() & 0xff;
+                break;
+            case EncodingCodes.LIST32:
+                // The size field must be consumed but is not used; only the count drives decoding.
+                decoder.getByteBuffer().getInt();
+                count = decoder.getByteBuffer().getInt();
+                break;
+            default:
+                throw new DecodeException("Incorrect type found in Transfer encoding: " + typeCode);
+        }
+
+        Transfer transfer = new Transfer();
+
+        for (int index = 0; index < count; ++index) {
+            switch (index) {
+                case 0:
+                    transfer.setHandle(decoder.readUnsignedInteger());
+                    break;
+                case 1:
+                    transfer.setDeliveryId(decoder.readUnsignedInteger());
+                    break;
+                case 2:
+                    transfer.setDeliveryTag(decoder.readBinary());
+                    break;
+                case 3:
+                    transfer.setMessageFormat(decoder.readUnsignedInteger());
+                    break;
+                case 4:
+                    transfer.setSettled(decoder.readBoolean());
+                    break;
+                case 5:
+                    transfer.setMore(decoder.readBoolean(false));
+                    break;
+                case 6:
+                    UnsignedByte rcvSettleMode = decoder.readUnsignedByte();
+                    transfer.setRcvSettleMode(rcvSettleMode == null ? null : toReceiverSettleMode(rcvSettleMode));
+                    break;
+                case 7:
+                    transfer.setState((DeliveryState) decoder.readObject());
+                    break;
+                case 8:
+                    transfer.setResume(decoder.readBoolean(false));
+                    break;
+                case 9:
+                    transfer.setAborted(decoder.readBoolean(false));
+                    break;
+                case 10:
+                    transfer.setBatchable(decoder.readBoolean(false));
+                    break;
+                default:
+                    throw new IllegalStateException("Too many entries in Transfer encoding");
+            }
+        }
+
+        return transfer;
+    }
+
+    /**
+     * Maps a decoded rcv-settle-mode value onto the {@link ReceiverSettleMode} constant at that
+     * position, rejecting values that do not correspond to any constant instead of failing with
+     * an {@link ArrayIndexOutOfBoundsException} on malformed wire data.
+     *
+     * @param value the non-null value read from the wire
+     * @return the matching settle mode
+     * @throws DecodeException if the value is outside the range of known settle modes
+     */
+    private static ReceiverSettleMode toReceiverSettleMode(UnsignedByte value) {
+        ReceiverSettleMode[] modes = ReceiverSettleMode.values();
+        int position = value.intValue();
+        if (position < 0 || position >= modes.length) {
+            throw new DecodeException("Unknown rcv-settle-mode found in Transfer encoding: " + position);
+        }
+        return modes[position];
+    }
+
+    /**
+     * Skips the next value by reading its constructor from the decoder and asking that
+     * constructor to skip the value.
+     */
+    @Override
+    public void skipValue() {
+        getDecoder().readConstructor().skipValue();
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean encodesJavaPrimitive() {
+        return false;
+    }
+
+    /**
+     * @return {@code Transfer.class}
+     */
+    @Override
+    public Class<Transfer> getTypeClass() {
+        return Transfer.class;
+    }
+
+    /**
+     * Delegates to the wrapped {@link TransferType}.
+     *
+     * @param transfer the value an encoding is requested for
+     * @return the encoding chosen by the wrapped type
+     */
+    @Override
+    public TypeEncoding<Transfer> getEncoding(Transfer transfer) {
+        return transferType.getEncoding(transfer);
+    }
+
+    /**
+     * Delegates to the wrapped {@link TransferType}.
+     *
+     * @return the canonical encoding of the wrapped type
+     */
+    @Override
+    public TypeEncoding<Transfer> getCanonicalEncoding() {
+        return transferType.getCanonicalEncoding();
+    }
+
+    /**
+     * Delegates to the wrapped {@link TransferType}.
+     *
+     * @return all encodings of the wrapped type
+     */
+    @Override
+    public Collection<? extends TypeEncoding<Transfer>> getAllEncodings() {
+        return transferType.getAllEncodings();
+    }
+
+    /**
+     * Writes the transfer as a described list: the described-type indicator, the descriptor,
+     * the list type code, a size placeholder, the element count and then each element. Once the
+     * elements are written the size placeholder is back-filled.
+     *
+     * @param value the transfer to encode
+     */
+    @Override
+    public void write(Transfer value) {
+        WritableBuffer buffer = getEncoder().getBuffer();
+        int count = getElementCount(value);
+        byte encodingCode = deduceEncodingCode(value, count);
+
+        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
+        getEncoder().writeUnsignedLong(transferType.getDescriptor());
+
+        // LIST8 uses one-byte size/count fields, LIST32 uses four-byte fields.
+        final boolean compact = encodingCode == EncodingCodes.LIST8;
+        final int fieldWidth = compact ? 1 : 4;
+
+        buffer.put(compact ? EncodingCodes.LIST8 : EncodingCodes.LIST32);
+
+        int startIndex = buffer.position();
+
+        // Reserve space for the size and write the count of list elements.
+        if (compact) {
+            buffer.put((byte) 0);
+            buffer.put((byte) count);
+        } else {
+            buffer.putInt(0);
+            buffer.putInt(count);
+        }
+
+        // Write the list elements and then compute total size written.
+        for (int i = 0; i < count; ++i) {
+            writeElement(value, i);
+        }
+
+        // The encoded size covers everything after the size field itself (count + elements).
+        int endIndex = buffer.position();
+        int writeSize = endIndex - startIndex - fieldWidth;
+
+        // Move back, fill in the size, then restore the write position.
+        buffer.position(startIndex);
+        if (compact) {
+            buffer.put((byte) writeSize);
+        } else {
+            buffer.putInt(writeSize);
+        }
+        buffer.position(endIndex);
+    }
+
+    /**
+     * Writes the list element at the given position of the transfer's field order.
+     *
+     * @param transfer the value being encoded
+     * @param index zero-based field position (0..10)
+     * @throws IllegalArgumentException if {@code index} is outside the known fields
+     */
+    private void writeElement(Transfer transfer, int index) {
+        switch (index) {
+            case 0:
+                getEncoder().writeUnsignedInteger(transfer.getHandle());
+                break;
+            case 1:
+                getEncoder().writeUnsignedInteger(transfer.getDeliveryId());
+                break;
+            case 2:
+                getEncoder().writeBinary(transfer.getDeliveryTag());
+                break;
+            case 3:
+                getEncoder().writeUnsignedInteger(transfer.getMessageFormat());
+                break;
+            case 4:
+                getEncoder().writeBoolean(transfer.getSettled());
+                break;
+            case 5:
+                getEncoder().writeBoolean(transfer.getMore());
+                break;
+            case 6:
+                ReceiverSettleMode rcvSettleMode = transfer.getRcvSettleMode();
+                getEncoder().writeObject(rcvSettleMode == null ? null : rcvSettleMode.getValue());
+                break;
+            case 7:
+                getEncoder().writeObject(transfer.getState());
+                break;
+            case 8:
+                getEncoder().writeBoolean(transfer.getResume());
+                break;
+            case 9:
+                getEncoder().writeBoolean(transfer.getAborted());
+                break;
+            case 10:
+                getEncoder().writeBoolean(transfer.getBatchable());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown Transfer value index: " + index);
+        }
+    }
+
+    /**
+     * Chooses the list encoding: LIST32 when a delivery state is set or the delivery tag is
+     * longer than 200 bytes, LIST8 otherwise.
+     * NOTE(review): the 200-byte threshold presumably leaves headroom for the remaining fields
+     * within the one-byte LIST8 size limit - confirm.
+     *
+     * @param value the value being encoded
+     * @param elementCount the number of elements that will be written (currently unused)
+     * @return {@link EncodingCodes#LIST8} or {@link EncodingCodes#LIST32}
+     */
+    private byte deduceEncodingCode(Transfer value, int elementCount) {
+        if (value.getState() != null) {
+            return EncodingCodes.LIST32;
+        } else if (value.getDeliveryTag() != null && value.getDeliveryTag().getLength() > 200) {
+            return EncodingCodes.LIST32;
+        } else {
+            return EncodingCodes.LIST8;
+        }
+    }
+
+    /**
+     * Determines how many leading fields must be written: the position of the last field that is
+     * set (non-null, or {@code true} for the primitive boolean fields) plus one, with a minimum
+     * of one so that the handle is always written.
+     *
+     * @param transfer the value being encoded
+     * @return the number of list elements to write (1..11)
+     */
+    private int getElementCount(Transfer transfer) {
+        if (transfer.getBatchable()) {
+            return 11;
+        } else if (transfer.getAborted()) {
+            return 10;
+        } else if (transfer.getResume()) {
+            return 9;
+        } else if (transfer.getState() != null) {
+            return 8;
+        } else if (transfer.getRcvSettleMode() != null) {
+            return 7;
+        } else if (transfer.getMore()) {
+            return 6;
+        } else if (transfer.getSettled() != null) {
+            return 5;
+        } else if (transfer.getMessageFormat() != null) {
+            return 4;
+        } else if (transfer.getDeliveryTag() != null) {
+            return 3;
+        } else if (transfer.getDeliveryId() != null) {
+            return 2;
+        } else {
+            return 1;
+        }
+    }
+
+    /**
+     * Creates one instance of this type, registers it with the decoder under every known
+     * descriptor, and registers it with the encoder.
+     *
+     * @param decoder the decoder to register the constructor with
+     * @param encoder the encoder to register the type with
+     */
+    public static void register(Decoder decoder, EncoderImpl encoder) {
+        FastPathTransferType type = new FastPathTransferType(encoder);
+        for (Object descriptor : DESCRIPTORS) {
+            decoder.register(descriptor, (FastPathDescribedTypeConstructor<?>) type);
+        }
+        encoder.register(type);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FlowType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FlowType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FlowType.java
index f9e91dc..499a397 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FlowType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/FlowType.java
@@ -36,7 +36,6 @@ import org.apache.qpid.proton.codec.Decoder;
import org.apache.qpid.proton.codec.DescribedTypeConstructor;
import org.apache.qpid.proton.codec.EncoderImpl;
-
public final class FlowType extends AbstractDescribedType<Flow,List> implements DescribedTypeConstructor<Flow>
{
private static final Object[] DESCRIPTORS =
@@ -46,7 +45,7 @@ public final class FlowType extends AbstractDescribedType<Flow,List> implements
private static final UnsignedLong DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000013L);
- private FlowType(EncoderImpl encoder)
+ FlowType(EncoderImpl encoder)
{
super(encoder);
}
@@ -176,7 +175,6 @@ public final class FlowType extends AbstractDescribedType<Flow,List> implements
return Flow.class;
}
-
public static void register(Decoder decoder, EncoderImpl encoder)
{
FlowType type = new FlowType(encoder);
@@ -186,6 +184,4 @@ public final class FlowType extends AbstractDescribedType<Flow,List> implements
}
encoder.register(type);
}
-
}
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/TransferType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/TransferType.java b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/TransferType.java
index 4ddbd49..6d18a07 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/TransferType.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/codec/transport/TransferType.java
@@ -39,7 +39,6 @@ import org.apache.qpid.proton.codec.Decoder;
import org.apache.qpid.proton.codec.DescribedTypeConstructor;
import org.apache.qpid.proton.codec.EncoderImpl;
-
public final class TransferType extends AbstractDescribedType<Transfer,List> implements DescribedTypeConstructor<Transfer>
{
private static final Object[] DESCRIPTORS =
@@ -49,7 +48,7 @@ public final class TransferType extends AbstractDescribedType<Transfer,List> imp
private static final UnsignedLong DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000014L);
- private TransferType(EncoderImpl encoder)
+ TransferType(EncoderImpl encoder)
{
super(encoder);
}
@@ -190,9 +189,6 @@ public final class TransferType extends AbstractDescribedType<Transfer,List> imp
return Transfer.class;
}
-
-
-
public static void register(Decoder decoder, EncoderImpl encoder)
{
TransferType type = new TransferType(encoder);
@@ -202,6 +198,4 @@ public final class TransferType extends AbstractDescribedType<Transfer,List> imp
}
encoder.register(type);
}
-
}
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/AmqpValueTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/AmqpValueTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/AmqpValueTypeCodecTest.java
new file mode 100644
index 0000000..f2154e3
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/AmqpValueTypeCodecTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.junit.Test;
+
+/**
+ * Tests that {@link AmqpValue} instances written with the encoder are read back by the
+ * decoder as {@link AmqpValue} instances carrying an equal body value.
+ */
+public class AmqpValueTypeCodecTest extends CodecTestSupport {
+
+    /** Number of values written and read back in the "large series" test. */
+    private static final int LARGE_SIZE = 1024;
+
+    /** Number of values written and read back in the "small series" test. */
+    private static final int SMALL_SIZE = 32;
+
+    @Test
+    public void testDecodeAmqpValueString() throws IOException {
+        doTestDecodeAmqpValueSeries(1, new AmqpValue("test"));
+    }
+
+    @Test
+    public void testDecodeAmqpValueNull() throws IOException {
+        doTestDecodeAmqpValueSeries(1, new AmqpValue(null));
+    }
+
+    @Test
+    public void testDecodeAmqpValueUUID() throws IOException {
+        doTestDecodeAmqpValueSeries(1, new AmqpValue(UUID.randomUUID()));
+    }
+
+    @Test
+    public void testDecodeSmallSeriesOfAmqpValue() throws IOException {
+        doTestDecodeAmqpValueSeries(SMALL_SIZE, new AmqpValue("test"));
+    }
+
+    @Test
+    public void testDecodeLargeSeriesOfAmqpValue() throws IOException {
+        doTestDecodeAmqpValueSeries(LARGE_SIZE, new AmqpValue("test"));
+    }
+
+    /**
+     * Encodes {@code value} {@code size} times back to back, then decodes {@code size} objects
+     * and checks that each one is an {@link AmqpValue} whose body equals the original body.
+     *
+     * @param size how many copies of the value to write and read back
+     * @param value the value to round-trip
+     * @throws IOException declared for the test methods; not thrown by the visible code
+     */
+    private void doTestDecodeAmqpValueSeries(int size, AmqpValue value) throws IOException {
+
+        for (int i = 0; i < size; ++i) {
+            encoder.writeObject(value);
+        }
+
+        // NOTE(review): presumably rewinds the shared buffer from CodecTestSupport so the
+        // decoder reads from the start of what was just written - confirm against the base class.
+        buffer.clear();
+
+        for (int i = 0; i < size; ++i) {
+            final Object result = decoder.readObject();
+
+            assertNotNull(result);
+            assertTrue(result instanceof AmqpValue);
+
+            AmqpValue decoded = (AmqpValue) result;
+
+            assertEquals(value.getValue(), decoded.getValue());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesCodecTest.java
new file mode 100644
index 0000000..66f15bd
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesCodecTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link ApplicationProperties} sections whose map holds
+ * String keys and String values.
+ */
+public class ApplicationPropertiesCodecTest extends CodecTestSupport {
+
+ private final int LARGE_SIZE = 1024;
+ private final int SMALL_SIZE = 32;
+
+ @Test
+ public void testDecodeSmallSeriesOfApplicationProperties() throws IOException {
+ doTestDecodeApplicationPropertiesSeries(SMALL_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeSeriesOfApplicationProperties() throws IOException {
+ doTestDecodeApplicationPropertiesSeries(LARGE_SIZE);
+ }
+
+ /**
+ * Encodes the same eight-entry ApplicationProperties section {@code size}
+ * times, rewinds the buffer and checks that every decoded section carries a
+ * map equal to the one that was written.
+ *
+ * @param size number of sections to encode and then decode
+ */
+ private void doTestDecodeApplicationPropertiesSeries(int size) throws IOException {
+
+ Map<String, Object> propertiesMap = new LinkedHashMap<>();
+ ApplicationProperties properties = new ApplicationProperties(propertiesMap);
+
+ // Produces "key-1" -> "1" through "key-8" -> "8".
+ for (int i = 1; i <= 8; ++i) {
+ propertiesMap.put("key-" + i, String.valueOf(i));
+ }
+
+ for (int i = 0; i < size; ++i) {
+ encoder.writeObject(properties);
+ }
+
+ // Move the position back to zero so decoding starts at the first section.
+ buffer.clear();
+
+ for (int i = 0; i < size; ++i) {
+ final Object result = decoder.readObject();
+
+ assertNotNull(result);
+ assertTrue(result instanceof ApplicationProperties);
+
+ ApplicationProperties decoded = (ApplicationProperties) result;
+
+ // Expected size comes from the source map rather than a magic number, and
+ // assertEquals reports both maps when the contents differ.
+ assertEquals(propertiesMap.size(), decoded.getValue().size());
+ assertEquals(propertiesMap, decoded.getValue());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesTypeTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesTypeTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesTypeTest.java
new file mode 100644
index 0000000..6f11956
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/ApplicationPropertiesTypeTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link ApplicationProperties} sections whose map mixes
+ * Long, Date and String values.
+ */
+public class ApplicationPropertiesTypeTest extends CodecTestSupport {
+
+ private final int LARGE_SIZE = 1024;
+ private final int SMALL_SIZE = 32;
+
+ @Test
+ public void testDecodeSmallSeriesOfApplicationProperties() throws IOException {
+ doTestDecodeApplicationPropertiesSeries(SMALL_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeSeriesOfApplicationProperties() throws IOException {
+ doTestDecodeApplicationPropertiesSeries(LARGE_SIZE);
+ }
+
+ /**
+ * Encodes one ApplicationProperties section {@code size} times, rewinds the
+ * buffer and checks that every decoded section equals the source map and
+ * that the Long and Date entries come back with their original values.
+ *
+ * @param size number of sections to encode and then decode
+ */
+ private void doTestDecodeApplicationPropertiesSeries(int size) throws IOException {
+
+ Map<String, Object> propertiesMap = new LinkedHashMap<>();
+ ApplicationProperties properties = new ApplicationProperties(propertiesMap);
+
+ long currentTime = System.currentTimeMillis();
+
+ propertiesMap.put("long-1", currentTime);
+ propertiesMap.put("date-1", new Date(currentTime));
+ propertiesMap.put("long-2", currentTime + 100);
+ propertiesMap.put("date-2", new Date(currentTime + 100));
+
+ // Produces "key-1" -> "1" through "key-8" -> "8".
+ for (int i = 1; i <= 8; ++i) {
+ propertiesMap.put("key-" + i, String.valueOf(i));
+ }
+
+ for (int i = 0; i < size; ++i) {
+ encoder.writeObject(properties);
+ }
+
+ // Move the position back to zero so decoding starts at the first section.
+ buffer.clear();
+
+ for (int i = 0; i < size; ++i) {
+ final Object result = decoder.readObject();
+
+ assertNotNull(result);
+ assertTrue(result instanceof ApplicationProperties);
+
+ ApplicationProperties decoded = (ApplicationProperties) result;
+
+ // assertEquals reports both maps when the contents differ.
+ assertEquals(propertiesMap.size(), decoded.getValue().size());
+ assertEquals(propertiesMap, decoded.getValue());
+
+ assertEquals(currentTime, decoded.getValue().get("long-1"));
+ assertEquals(new Date(currentTime), decoded.getValue().get("date-1"));
+ assertEquals(currentTime + 100, decoded.getValue().get("long-2"));
+ assertEquals(new Date(currentTime + 100), decoded.getValue().get("date-2"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/ArrayTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/ArrayTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/ArrayTypeCodecTest.java
new file mode 100644
index 0000000..2411b7e
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/ArrayTypeCodecTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for AMQP array types: each test writes an array through the
+ * encoder, rewinds the shared buffer and checks what the decoder returns.
+ */
+public class ArrayTypeCodecTest extends CodecTestSupport {
+
+ private final int LARGE_ARRAY_SIZE = 2048;
+ private final int SMALL_ARRAY_SIZE = 32;
+
+ @Test
+ public void testArrayOfPrimitiveBooleanObjects() throws IOException {
+ // Same round trip as the sized boolean tests, with ten alternating values.
+ doTestDecodeBooleanArrayType(10);
+ }
+
+ @Test
+ public void testZeroSizedArrayOfPrimitiveBooleanObjects() throws IOException {
+ doTestDecodeBooleanArrayType(0);
+ }
+
+ @Test
+ public void testArrayOfBooleanObjects() throws IOException {
+ final int size = 10;
+
+ Boolean[] source = new Boolean[size];
+ for (int i = 0; i < size; ++i) {
+ source[i] = i % 2 == 0;
+ }
+
+ encoder.writeArray(source);
+
+ // The boxed source is expected to come back as a primitive boolean[].
+ boolean[] array = decodePrimitiveBooleanArray();
+ assertEquals(size, array.length);
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(source[i], array[i]);
+ }
+ }
+
+ @Test
+ public void testZeroSizedArrayOfBooleanObjects() throws IOException {
+ Boolean[] source = new Boolean[0];
+
+ encoder.writeArray(source);
+
+ boolean[] array = decodePrimitiveBooleanArray();
+ assertEquals(source.length, array.length);
+ }
+
+ @Test
+ public void testDecodeSmallBooleanArray() throws IOException {
+ doTestDecodeBooleanArrayType(SMALL_ARRAY_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeBooleanArray() throws IOException {
+ doTestDecodeBooleanArrayType(LARGE_ARRAY_SIZE);
+ }
+
+ /**
+ * Round-trips a boolean[] of {@code size} alternating true/false values.
+ */
+ private void doTestDecodeBooleanArrayType(int size) throws IOException {
+ boolean[] source = new boolean[size];
+ for (int i = 0; i < size; ++i) {
+ source[i] = i % 2 == 0;
+ }
+
+ encoder.writeArray(source);
+
+ boolean[] array = decodePrimitiveBooleanArray();
+ assertEquals(size, array.length);
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(source[i], array[i]);
+ }
+ }
+
+ @Test
+ public void testDecodeSmallSymbolArray() throws IOException {
+ doTestDecodeSymbolArrayType(SMALL_ARRAY_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeSymbolArray() throws IOException {
+ doTestDecodeSymbolArrayType(LARGE_ARRAY_SIZE);
+ }
+
+ /**
+ * Round-trips a Symbol[] of {@code size} distinct symbols.
+ */
+ private void doTestDecodeSymbolArrayType(int size) throws IOException {
+ Symbol[] source = new Symbol[size];
+ for (int i = 0; i < size; ++i) {
+ source[i] = Symbol.valueOf("test->" + i);
+ }
+
+ encoder.writeArray(source);
+
+ Symbol[] array = (Symbol[]) decodeArray();
+ assertEquals(size, array.length);
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(source[i], array[i]);
+ }
+ }
+
+ @Test
+ public void testDecodeSmallUUIDArray() throws IOException {
+ doTestDecodeUUIDArrayType(SMALL_ARRAY_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeUUDIArray() throws IOException {
+ doTestDecodeUUIDArrayType(LARGE_ARRAY_SIZE);
+ }
+
+ /**
+ * Round-trips a UUID[] of {@code size} random UUIDs.
+ */
+ private void doTestDecodeUUIDArrayType(int size) throws IOException {
+ UUID[] source = new UUID[size];
+ for (int i = 0; i < size; ++i) {
+ source[i] = UUID.randomUUID();
+ }
+
+ encoder.writeArray(source);
+
+ UUID[] array = (UUID[]) decodeArray();
+ assertEquals(size, array.length);
+
+ for (int i = 0; i < size; ++i) {
+ assertEquals(source[i], array[i]);
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testArrayOfListsOfUUIDs() throws IOException {
+ ArrayList<UUID>[] source = new ArrayList[2];
+ for (int i = 0; i < source.length; ++i) {
+ source[i] = new ArrayList<UUID>(3);
+ source[i].add(UUID.randomUUID());
+ source[i].add(UUID.randomUUID());
+ source[i].add(UUID.randomUUID());
+ }
+
+ encoder.writeArray(source);
+
+ List[] list = (List[]) decodeArray();
+ assertEquals(source.length, list.length);
+
+ for (int i = 0; i < list.length; ++i) {
+ assertEquals(source[i], list[i]);
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testArrayOfMApsOfStringToUUIDs() throws IOException {
+ Map<String, UUID>[] source = new LinkedHashMap[2];
+ for (int i = 0; i < source.length; ++i) {
+ source[i] = new LinkedHashMap<String, UUID>();
+ source[i].put("1", UUID.randomUUID());
+ source[i].put("2", UUID.randomUUID());
+ source[i].put("3", UUID.randomUUID());
+ }
+
+ encoder.writeArray(source);
+
+ Map[] map = (Map[]) decodeArray();
+ assertEquals(source.length, map.length);
+
+ for (int i = 0; i < map.length; ++i) {
+ assertEquals(source[i], map[i]);
+ }
+ }
+
+ /**
+ * Rewinds the shared buffer, decodes a single object and asserts that it is
+ * a non-null array.
+ *
+ * Always uses the no-argument readObject() so a null decode cannot be
+ * replaced by a caller-supplied fallback before the null check runs.
+ *
+ * @return the decoded array, still typed as Object
+ */
+ private Object decodeArray() throws IOException {
+ buffer.clear();
+
+ Object result = decoder.readObject();
+ assertNotNull(result);
+ assertTrue(result.getClass().isArray());
+
+ return result;
+ }
+
+ /**
+ * Decodes a single array and asserts that its component type is primitive
+ * before casting it to boolean[].
+ */
+ private boolean[] decodePrimitiveBooleanArray() throws IOException {
+ Object result = decodeArray();
+ assertTrue(result.getClass().getComponentType().isPrimitive());
+
+ return (boolean[]) result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
new file mode 100644
index 0000000..529a808
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/Benchmark.java
@@ -0,0 +1,420 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.codec;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.transport.Disposition;
+import org.apache.qpid.proton.amqp.transport.Flow;
+import org.apache.qpid.proton.amqp.transport.Role;
+import org.apache.qpid.proton.amqp.transport.Transfer;
+
+/**
+ * Stand-alone timing harness for the encoder and decoder.
+ *
+ * Every benchmark encodes a value ITERATIONS times into one reusable buffer and
+ * then decodes it ITERATIONS times. The full suite runs twice: the first pass
+ * is a warm-up whose timings are not printed, the second pass prints results.
+ */
+public class Benchmark implements Runnable {
+
+ private static final int ITERATIONS = 10 * 1024 * 1024;
+
+ private final ByteBuffer byteBuf = ByteBuffer.allocate(8192);
+ private final BenchmarkResult resultSet = new BenchmarkResult();
+
+ // True during the first (warm-up) pass; run() clears it before the timed pass.
+ private boolean warming = true;
+
+ private final DecoderImpl decoder = new DecoderImpl();
+ private final EncoderImpl encoder = new EncoderImpl(decoder);
+
+ public static final void main(String[] args) throws IOException, InterruptedException {
+ System.out.println("Current PID: " + ManagementFactory.getRuntimeMXBean().getName());
+ Benchmark benchmark = new Benchmark();
+ benchmark.run();
+ }
+
+ /**
+ * Registers the AMQP types, attaches the shared buffer to both codec halves
+ * and runs the suite once as warm-up and once for reported timings.
+ */
+ @Override
+ public void run() {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+ encoder.setByteBuffer(byteBuf);
+ decoder.setByteBuffer(byteBuf);
+
+ try {
+ doBenchmarks();
+ warming = false;
+ doBenchmarks();
+ } catch (IOException e) {
+ System.out.println("Unexpected error: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Prints the encode and decode times held in {@code resultSet}, unless the
+ * suite is still in its warm-up pass.
+ */
+ private void time(String message, BenchmarkResult resultSet) {
+ if (!warming) {
+ System.out.println("Benchmark of type: " + message + ": ");
+ System.out.println(" Encode time = " + resultSet.getEncodeTimeMills());
+ System.out.println(" Decode time = " + resultSet.getDecodeTimeMills());
+ }
+ }
+
+ /**
+ * Runs every individual benchmark once, in a fixed order. The warm-up flag
+ * is owned by run() and is not touched here.
+ */
+ private final void doBenchmarks() throws IOException {
+ benchmarkListOfInts();
+ benchmarkUUIDs();
+ benchmarkHeader();
+ benchmarkProperties();
+ benchmarkMessageAnnotations();
+ benchmarkApplicationProperties();
+ benchmarkSymbols();
+ benchmarkTransfer();
+ benchmarkFlow();
+ benchmarkDisposition();
+ benchmarkString();
+ benchmarkData();
+ }
+
+ /** Times encoding and decoding of a ten element list of Integer zero values. */
+ private void benchmarkListOfInts() throws IOException {
+ ArrayList<Object> list = new ArrayList<>(10);
+ for (int j = 0; j < 10; j++) {
+ list.add(0);
+ }
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeList(list);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ // flip() sets the limit to the current position and rewinds to zero.
+ byteBuf.flip();
+ decoder.readList();
+ }
+ resultSet.decodesComplete();
+
+ time("List<Integer>", resultSet);
+ }
+
+ /** Times encoding and decoding of a single random UUID. */
+ private void benchmarkUUIDs() throws IOException {
+ UUID uuid = UUID.randomUUID();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeUUID(uuid);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readUUID();
+ }
+ resultSet.decodesComplete();
+
+ time("UUID", resultSet);
+ }
+
+ /** Times encoding and decoding of a Header with durable and first-acquirer set. */
+ private void benchmarkHeader() throws IOException {
+ Header header = new Header();
+ header.setDurable(true);
+ header.setFirstAcquirer(true);
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(header);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Header", resultSet);
+ }
+
+ /** Times encoding and decoding of a Transfer with tag, handle and message format set. */
+ private void benchmarkTransfer() throws IOException {
+ Transfer transfer = new Transfer();
+ transfer.setDeliveryTag(new Binary(new byte[] {1, 2, 3}));
+ transfer.setHandle(UnsignedInteger.valueOf(10));
+ transfer.setMessageFormat(UnsignedInteger.ZERO);
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(transfer);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Transfer", resultSet);
+ }
+
+ /** Times encoding and decoding of a Flow with window, handle and credit fields set. */
+ private void benchmarkFlow() throws IOException {
+ Flow flow = new Flow();
+ flow.setNextIncomingId(UnsignedInteger.valueOf(1));
+ flow.setIncomingWindow(UnsignedInteger.valueOf(2047));
+ flow.setNextOutgoingId(UnsignedInteger.valueOf(1));
+ flow.setOutgoingWindow(UnsignedInteger.MAX_VALUE);
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setDeliveryCount(UnsignedInteger.valueOf(10));
+ flow.setLinkCredit(UnsignedInteger.valueOf(1000));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(flow);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Flow", resultSet);
+ }
+
+ /** Times encoding and decoding of Properties with to, message-id and creation time set. */
+ private void benchmarkProperties() throws IOException {
+ Properties properties = new Properties();
+ properties.setTo("queue:1");
+ properties.setMessageId("ID:Message:1");
+ properties.setCreationTime(new Date(System.currentTimeMillis()));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(properties);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Properties", resultSet);
+ }
+
+ /** Times encoding and decoding of MessageAnnotations holding three unsigned values. */
+ private void benchmarkMessageAnnotations() throws IOException {
+ MessageAnnotations annotations = new MessageAnnotations(new HashMap<Symbol, Object>());
+ annotations.getValue().put(Symbol.valueOf("test1"), UnsignedByte.valueOf((byte) 128));
+ annotations.getValue().put(Symbol.valueOf("test2"), UnsignedShort.valueOf((short) 128));
+ // NOTE(review): (byte) 128 narrows to -128 before the call; this looks copied
+ // from the UnsignedByte line above - confirm the intended value.
+ annotations.getValue().put(Symbol.valueOf("test3"), UnsignedInteger.valueOf((byte) 128));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(annotations);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("MessageAnnotations", resultSet);
+ }
+
+ /** Times encoding and decoding of ApplicationProperties holding three unsigned values. */
+ @SuppressWarnings("unchecked")
+ private void benchmarkApplicationProperties() throws IOException {
+ ApplicationProperties properties = new ApplicationProperties(new HashMap<String, Object>());
+ properties.getValue().put("test1", UnsignedByte.valueOf((byte) 128));
+ properties.getValue().put("test2", UnsignedShort.valueOf((short) 128));
+ // NOTE(review): (byte) 128 narrows to -128 before the call; this looks copied
+ // from the UnsignedByte line above - confirm the intended value.
+ properties.getValue().put("test3", UnsignedInteger.valueOf((byte) 128));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(properties);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("ApplicationProperties", resultSet);
+ }
+
+ /** Times encoding and decoding of three Symbols per iteration. */
+ private void benchmarkSymbols() throws IOException {
+ Symbol symbol1 = Symbol.valueOf("Symbol-1");
+ Symbol symbol2 = Symbol.valueOf("Symbol-2");
+ Symbol symbol3 = Symbol.valueOf("Symbol-3");
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeSymbol(symbol1);
+ encoder.writeSymbol(symbol2);
+ encoder.writeSymbol(symbol3);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readSymbol();
+ decoder.readSymbol();
+ decoder.readSymbol();
+ }
+ resultSet.decodesComplete();
+
+ time("Symbol", resultSet);
+ }
+
+ /** Times encoding and decoding of a settled, accepted receiver Disposition. */
+ private void benchmarkDisposition() throws IOException {
+ Disposition disposition = new Disposition();
+ disposition.setRole(Role.RECEIVER);
+ disposition.setSettled(true);
+ disposition.setState(Accepted.getInstance());
+ disposition.setFirst(UnsignedInteger.valueOf(2));
+ disposition.setLast(UnsignedInteger.valueOf(2));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(disposition);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Disposition", resultSet);
+ }
+
+ /** Times encoding and decoding of three short Strings per iteration. */
+ private void benchmarkString() throws IOException {
+ String string1 = new String("String-1");
+ String string2 = new String("String-2");
+ String string3 = new String("String-3");
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeString(string1);
+ encoder.writeString(string2);
+ encoder.writeString(string3);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readString();
+ decoder.readString();
+ decoder.readString();
+ }
+ resultSet.decodesComplete();
+
+ time("String", resultSet);
+ }
+
+ /** Times encoding and decoding of three Data sections per iteration. */
+ private void benchmarkData() throws IOException {
+ Data data1 = new Data(new Binary(new byte[] {1, 2, 3}));
+ Data data2 = new Data(new Binary(new byte[] {4, 5, 6}));
+ Data data3 = new Data(new Binary(new byte[] {7, 8, 9}));
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.clear();
+ encoder.writeObject(data1);
+ encoder.writeObject(data2);
+ encoder.writeObject(data3);
+ }
+ resultSet.encodesComplete();
+
+ resultSet.start();
+ for (int i = 0; i < ITERATIONS; i++) {
+ byteBuf.flip();
+ decoder.readObject();
+ decoder.readObject();
+ decoder.readObject();
+ }
+ resultSet.decodesComplete();
+
+ time("Data", resultSet);
+ }
+
+ /**
+ * Holds the elapsed encode and decode times of the most recent benchmark.
+ * start() must be called before each of the two completion methods.
+ */
+ private static class BenchmarkResult {
+
+ private long startTime;
+
+ private long encodeTime;
+ private long decodeTime;
+
+ public void start() {
+ startTime = System.nanoTime();
+ }
+
+ public void encodesComplete() {
+ encodeTime = System.nanoTime() - startTime;
+ }
+
+ public void decodesComplete() {
+ decodeTime = System.nanoTime() - startTime;
+ }
+
+ public long getEncodeTimeMills() {
+ return TimeUnit.NANOSECONDS.toMillis(encodeTime);
+ }
+
+ public long getDecodeTimeMills() {
+ return TimeUnit.NANOSECONDS.toMillis(decodeTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/CodecTestSupport.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/CodecTestSupport.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/CodecTestSupport.java
new file mode 100644
index 0000000..bb33697
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/CodecTestSupport.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Before;
+
+/**
+ * Base class for the codec tests: provides an encoder / decoder pair that
+ * read from and write to one shared buffer.
+ */
+public class CodecTestSupport {
+
+ public static final int DEFAULT_MAX_BUFFER = 256 * 1024;
+
+ final DecoderImpl decoder = new DecoderImpl();
+ final EncoderImpl encoder = new EncoderImpl(decoder);
+
+ ByteBuffer buffer;
+
+ /**
+ * Registers the AMQP defined types and hands a newly allocated buffer to
+ * both the encoder and the decoder before each test.
+ */
+ @Before
+ public void setUp() {
+ AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+
+ final int capacity = getMaxBufferSize();
+ final ByteBuffer fresh = ByteBuffer.allocate(capacity);
+
+ buffer = fresh;
+ encoder.setByteBuffer(fresh);
+ decoder.setByteBuffer(fresh);
+ }
+
+ /**
+ * @return the capacity, in bytes, of the buffer allocated in setUp()
+ */
+ public int getMaxBufferSize() {
+ return DEFAULT_MAX_BUFFER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/HeaderTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/HeaderTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/HeaderTypeCodecTest.java
new file mode 100644
index 0000000..3b5d93c
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/HeaderTypeCodecTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for the AMQP {@link Header} section, covering single
+ * values, long series, skipping an encoded value and arrays of headers.
+ */
+public class HeaderTypeCodecTest extends CodecTestSupport {
+
+ private final int LARGE_SIZE = 1024 * 10;
+ private final int SMALL_SIZE = 32;
+
+ @Test
+ public void testDecodeHeader() throws IOException {
+ doTestDecodeHeaderSeries(1);
+ }
+
+ @Test
+ public void testDecodeSmallSeriesOfHeaders() throws IOException {
+ doTestDecodeHeaderSeries(SMALL_SIZE);
+ }
+
+ @Test
+ public void testDecodeLargeSeriesOfHeaders() throws IOException {
+ doTestDecodeHeaderSeries(LARGE_SIZE);
+ }
+
+ /**
+ * Writes the same populated Header {@code size} times, rewinds the buffer
+ * and checks priority and durable on every decoded Header.
+ */
+ private void doTestDecodeHeaderSeries(int size) throws IOException {
+ final Header original = createPopulatedHeader();
+
+ int written = 0;
+ while (written < size) {
+ encoder.writeObject(original);
+ written++;
+ }
+
+ buffer.clear();
+
+ for (int read = 0; read < size; read++) {
+ final Object candidate = decoder.readObject();
+
+ assertNotNull(candidate);
+ assertTrue(candidate instanceof Header);
+
+ final Header roundTripped = (Header) candidate;
+
+ assertEquals(3, roundTripped.getPriority().intValue());
+ assertTrue(roundTripped.getDurable().booleanValue());
+ }
+ }
+
+ @Test
+ public void testSkipHeader() throws IOException {
+ Header skipped = new Header();
+ Header expected = new Header();
+
+ skipped.setDurable(Boolean.FALSE);
+ expected.setDurable(Boolean.TRUE);
+
+ encoder.writeObject(skipped);
+ encoder.writeObject(expected);
+
+ buffer.clear();
+
+ // Read only the constructor of the first Header, then skip over its body.
+ TypeConstructor<?> constructor = decoder.readConstructor();
+ assertEquals(Header.class, constructor.getTypeClass());
+ constructor.skipValue();
+
+ // The next full read must therefore yield the second (durable) Header.
+ final Object candidate = decoder.readObject();
+
+ assertNotNull(candidate);
+ assertTrue(candidate instanceof Header);
+
+ assertTrue(((Header) candidate).getDurable().booleanValue());
+ }
+
+ @Test
+ public void testDecodeHeaderArray() throws IOException {
+ final Header original = createPopulatedHeader();
+
+ Header[] source = new Header[32];
+
+ for (int slot = 0; slot < source.length; slot++) {
+ source[slot] = original;
+ }
+
+ encoder.writeObject(source);
+
+ buffer.clear();
+
+ final Object candidate = decoder.readObject();
+
+ assertNotNull(candidate);
+ assertTrue(candidate.getClass().isArray());
+
+ final Object[] decodedElements = (Object[]) candidate;
+
+ for (int slot = 0; slot < source.length; slot++) {
+
+ assertTrue(decodedElements[slot] instanceof Header);
+
+ final Header roundTripped = (Header) decodedElements[slot];
+
+ assertEquals(3, roundTripped.getPriority().intValue());
+ assertTrue(roundTripped.getDurable().booleanValue());
+ assertEquals(original.getDeliveryCount(), roundTripped.getDeliveryCount());
+ }
+ }
+
+ /**
+ * Builds the Header used by the series and array tests: durable, priority 3,
+ * delivery count 10, not first acquirer, ttl 500.
+ */
+ private static Header createPopulatedHeader() {
+ Header header = new Header();
+
+ header.setDurable(Boolean.TRUE);
+ header.setPriority(UnsignedByte.valueOf((byte) 3));
+ header.setDeliveryCount(UnsignedInteger.valueOf(10));
+ header.setFirstAcquirer(Boolean.FALSE);
+ header.setTtl(UnsignedInteger.valueOf(500));
+
+ return header;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/ListTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/ListTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/ListTypeCodecTest.java
new file mode 100644
index 0000000..6b5ad5d
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/ListTypeCodecTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for lists: each test writes the same list to the encoder a number
+ * of times and then reads the values back through the decoder.
+ */
+public class ListTypeCodecTest extends CodecTestSupport {
+
+    private static final int LARGE_SIZE = 1024;
+    private static final int SMALL_SIZE = 32;
+
+    /** Number of symbols placed in the list used by the symbol list tests. */
+    private static final int SYMBOL_COUNT = 50;
+
+    @Test
+    public void testDecodeSmallSeriesOfLists() throws IOException {
+        doTestDecodeListSeries(SMALL_SIZE);
+    }
+
+    @Test
+    public void testDecodeLargeSeriesOfLists() throws IOException {
+        doTestDecodeListSeries(LARGE_SIZE);
+    }
+
+    @Test
+    public void testDecodeSmallSeriesOfSymbolLists() throws IOException {
+        doTestDecodeSymbolListSeries(SMALL_SIZE);
+    }
+
+    @Test
+    public void testDecodeLargeSeriesOfSymbolLists() throws IOException {
+        doTestDecodeSymbolListSeries(LARGE_SIZE);
+    }
+
+    /**
+     * Round-trips a list of {@value #SYMBOL_COUNT} symbols, written {@code size} times.
+     *
+     * @param size number of times the list is encoded and then decoded
+     */
+    private void doTestDecodeSymbolListSeries(int size) throws IOException {
+        List<Object> list = new ArrayList<>(SYMBOL_COUNT);
+
+        for (int i = 0; i < SYMBOL_COUNT; ++i) {
+            list.add(Symbol.valueOf(String.valueOf(i)));
+        }
+
+        assertListSeriesRoundTrips(list, size);
+    }
+
+    /**
+     * Round-trips a list of mixed value types (strings, binary, symbols, a date, an
+     * unsigned integer and a UUID), written {@code size} times.
+     *
+     * @param size number of times the list is encoded and then decoded
+     */
+    private void doTestDecodeListSeries(int size) throws IOException {
+        List<Object> list = new ArrayList<>();
+
+        Date timeNow = new Date(System.currentTimeMillis());
+
+        list.add("ID:Message-1:1:1:0");
+        list.add(new Binary(new byte[1]));
+        list.add("queue:work");
+        list.add(Symbol.valueOf("text/UTF-8"));
+        list.add(Symbol.valueOf("text"));
+        list.add(timeNow);
+        list.add(UnsignedInteger.valueOf(1));
+        list.add(UUID.randomUUID());
+
+        assertListSeriesRoundTrips(list, size);
+    }
+
+    /**
+     * Writes {@code list} to the encoder {@code size} times, then reads {@code size}
+     * values back and checks that each one is a non-null {@link List} with the same
+     * number of elements as {@code list}.
+     *
+     * @param list the list to encode repeatedly
+     * @param size number of encode / decode iterations
+     */
+    private void assertListSeriesRoundTrips(List<Object> list, int size) throws IOException {
+        for (int i = 0; i < size; ++i) {
+            encoder.writeObject(list);
+        }
+
+        // NOTE(review): presumably repositions the shared buffer so the decoder starts
+        // at the first encoded byte - confirm against CodecTestSupport.
+        buffer.clear();
+
+        for (int i = 0; i < size; ++i) {
+            final Object result = decoder.readObject();
+
+            assertNotNull(result);
+            assertTrue(result instanceof List);
+
+            // Only the element count is inspected, so a wildcard cast is sufficient
+            // and no unchecked-cast suppression is needed.
+            assertEquals(list.size(), ((List<?>) result).size());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/MapTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/MapTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/MapTypeCodecTest.java
new file mode 100644
index 0000000..32a0119
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/MapTypeCodecTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for maps: the same map is written to the encoder a number of times
+ * and then read back through the decoder.
+ */
+public class MapTypeCodecTest extends CodecTestSupport {
+
+    private static final int LARGE_SIZE = 1024;
+    private static final int SMALL_SIZE = 32;
+
+    @Test
+    public void testDecodeSmallSeriesOfMaps() throws IOException {
+        doTestDecodeMapSeries(SMALL_SIZE);
+    }
+
+    @Test
+    public void testDecodeLargeSeriesOfMaps() throws IOException {
+        doTestDecodeMapSeries(LARGE_SIZE);
+    }
+
+    /**
+     * Writes a map with one entry per boxed primitive type plus a {@link Binary} and a
+     * {@link String} value {@code size} times, then reads {@code size} values back and
+     * checks that each is a non-null {@link Map} with the same number of entries.
+     *
+     * @param size number of times the map is encoded and then decoded
+     */
+    private void doTestDecodeMapSeries(int size) throws IOException {
+
+        // Explicit charset: the no-arg getBytes() depends on the platform default.
+        byte[] myBytes = "myBytes".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+        // The casts keep the boxed value types (Byte, Short) the same as before.
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("myBool", true);
+        map.put("myByte", (byte) 4);
+        map.put("myBytes", new Binary(myBytes));
+        map.put("myChar", 'd');
+        map.put("myDouble", 1234567890123456789.1234);
+        map.put("myFloat", 1.1F);
+        map.put("myInt", Integer.MAX_VALUE);
+        map.put("myLong", Long.MAX_VALUE);
+        map.put("myShort", (short) 25);
+        map.put("myString", "myString");
+
+        for (int i = 0; i < size; ++i) {
+            encoder.writeObject(map);
+        }
+
+        // NOTE(review): presumably repositions the shared buffer so the decoder starts
+        // at the first encoded byte - confirm against CodecTestSupport.
+        buffer.clear();
+
+        for (int i = 0; i < size; ++i) {
+            final Object result = decoder.readObject();
+
+            assertNotNull(result);
+            assertTrue(result instanceof Map);
+
+            // Only the entry count is inspected, so a wildcard cast is sufficient
+            // and no unchecked-cast suppression is needed.
+            assertEquals(map.size(), ((Map<?, ?>) result).size());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/MessageAnnotationsTypeCodecTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/MessageAnnotationsTypeCodecTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/MessageAnnotationsTypeCodecTest.java
new file mode 100644
index 0000000..d1f6ba7
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/MessageAnnotationsTypeCodecTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link MessageAnnotations}: the same section is written to the
+ * encoder a number of times and then read back through the decoder.
+ */
+public class MessageAnnotationsTypeCodecTest extends CodecTestSupport {
+
+    private static final int LARGE_SIZE = 1024;
+    private static final int SMALL_SIZE = 32;
+
+    private static final Symbol SYMBOL_1 = Symbol.valueOf("test1");
+    private static final Symbol SYMBOL_2 = Symbol.valueOf("test2");
+    private static final Symbol SYMBOL_3 = Symbol.valueOf("test3");
+
+    @Test
+    public void testDecodeSmallSeriesOfMessageAnnotations() throws IOException {
+        doTestDecodeMessageAnnotationsSeries(SMALL_SIZE);
+    }
+
+    @Test
+    public void testDecodeLargeSeriesOfMessageAnnotations() throws IOException {
+        doTestDecodeMessageAnnotationsSeries(LARGE_SIZE);
+    }
+
+    @Test
+    public void testDecodeLMessageAnnotations() throws IOException {
+        doTestDecodeMessageAnnotationsSeries(1);
+    }
+
+    /**
+     * Writes a {@link MessageAnnotations} section with three symbol-keyed unsigned
+     * values {@code size} times, then reads {@code size} values back and checks that
+     * each is a {@link MessageAnnotations} holding the same three entries.
+     *
+     * @param size number of times the section is encoded and then decoded
+     */
+    private void doTestDecodeMessageAnnotationsSeries(int size) throws IOException {
+
+        // Typed map instead of a raw HashMap, so no warning suppression is required.
+        Map<Symbol, Object> expected = new HashMap<Symbol, Object>();
+        expected.put(SYMBOL_1, UnsignedByte.valueOf((byte) 128));
+        expected.put(SYMBOL_2, UnsignedShort.valueOf((short) 128));
+        expected.put(SYMBOL_3, UnsignedInteger.valueOf(128));
+
+        MessageAnnotations annotations = new MessageAnnotations(expected);
+
+        for (int i = 0; i < size; ++i) {
+            encoder.writeObject(annotations);
+        }
+
+        // NOTE(review): presumably repositions the shared buffer so the decoder starts
+        // at the first encoded byte - confirm against CodecTestSupport.
+        buffer.clear();
+
+        for (int i = 0; i < size; ++i) {
+            final Object result = decoder.readObject();
+
+            assertNotNull(result);
+            assertTrue(result instanceof MessageAnnotations);
+
+            MessageAnnotations readAnnotations = (MessageAnnotations) result;
+
+            Map<Symbol, Object> resultMap = readAnnotations.getValue();
+
+            // JUnit's assertEquals takes (expected, actual); keeping that order makes
+            // failure messages report the right side as the unexpected value.
+            assertEquals(annotations.getValue().size(), resultMap.size());
+            assertEquals(UnsignedByte.valueOf((byte) 128), resultMap.get(SYMBOL_1));
+            assertEquals(UnsignedShort.valueOf((short) 128), resultMap.get(SYMBOL_2));
+            assertEquals(UnsignedInteger.valueOf(128), resultMap.get(SYMBOL_3));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2a2d3ff2/proton-j/src/test/java/org/apache/qpid/proton/codec/NoLocalType.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/NoLocalType.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/NoLocalType.java
new file mode 100644
index 0000000..4ec6724
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/NoLocalType.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.codec;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+/**
+ * Described type standing in for the JMS "no local" option of a MessageConsumer.
+ * <p>
+ * The descriptor is the fixed code {@link #DESCRIPTOR_CODE} and the described value
+ * is the constant string {@code "NoLocalFilter{}"}.
+ */
+public class NoLocalType implements DescribedType {
+
+    /** Shared instance of this type. */
+    public static final NoLocalType NO_LOCAL = new NoLocalType();
+
+    /** Numeric descriptor returned by {@link #getDescriptor()}. */
+    public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
+
+    /** Value returned by {@link #getDescribed()}; identical for every instance. */
+    private final String described = "NoLocalFilter{}";
+
+    /**
+     * Creates a new instance; all instances expose the same descriptor and value.
+     */
+    public NoLocalType() {
+    }
+
+    /**
+     * @return the {@link #DESCRIPTOR_CODE} constant
+     */
+    @Override
+    public Object getDescriptor() {
+        return DESCRIPTOR_CODE;
+    }
+
+    /**
+     * @return the string {@code "NoLocalFilter{}"}
+     */
+    @Override
+    public String getDescribed() {
+        return described;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org