You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/04/23 19:09:22 UTC

qpid-jms git commit: QPIDJMS-383 Use new APIs to reduce buffer copying

Repository: qpid-jms
Updated Branches:
  refs/heads/master 8f876f926 -> 436d406ff


QPIDJMS-383 Use new APIs to reduce buffer copying

Use the new send and receive methods in proton-j 0.27.0 to reduce buffer copies
when sending and receiving.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/436d406f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/436d406f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/436d406f

Branch: refs/heads/master
Commit: 436d406ffce8d8ee0252bcfbf4aa35d1e36668a4
Parents: 8f876f9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Mar 29 18:44:08 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 23 15:08:30 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  28 +-
 .../jms/provider/amqp/AmqpFixedProducer.java    |   3 +-
 .../jms/provider/amqp/message/AmqpCodec.java    |  22 +-
 .../amqp/message/AmqpMessageSupport.java        |   7 +-
 .../amqp/message/AmqpReadableBuffer.java        | 219 +++++++++
 .../amqp/message/AmqpReadableBufferTest.java    | 444 +++++++++++++++++++
 6 files changed, 680 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index be2438e..c39b55d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -47,9 +47,6 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 /**
  * AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
  */
@@ -57,12 +54,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
-    private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
-
     protected final AmqpSession session;
     protected AsyncResult stopRequest;
     protected AsyncResult pullRequest;
-    protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
     protected long incomingSequence;
     protected long deliveredCount;
     protected boolean deferredClose;
@@ -485,7 +479,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         JmsMessage message = null;
         try {
-            message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
+            message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
         } catch (Exception e) {
             LOG.warn("Error on transform: {}", e.getMessage());
             // TODO - We could signal provider error but not sure we want to fail
@@ -495,8 +489,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        a bytes messages as a fall back.
             settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
             return false;
-        } finally {
-            incomingBuffer.clear();
         }
 
         try {
@@ -585,24 +577,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
     }
 
-    protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
-        int count;
-
-        // Attempt to preemptively size the buffer for the incoming delivery.
-        if (incomingBuffer.capacity() < incoming.available()) {
-            incomingBuffer.capacity(incoming.available());
-        }
-
-        while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
-            incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
-            if (!incomingBuffer.isWritable()) {
-                incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
-            }
-        }
-
-        return incomingBuffer;
-    }
-
     public void preCommit() {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index e93d74e..b2036fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -32,6 +32,7 @@ import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -138,7 +139,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         // Write the already encoded AMQP message into the Sender
         ByteBuf encoded = (ByteBuf) envelope.getPayload();
-        getEndpoint().send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes());
+        getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate()));
 
         AmqpProvider provider = getParent().getProvider();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
index 85d8d06..733294f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
@@ -28,7 +28,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIA
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
@@ -50,6 +49,7 @@ import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 import io.netty.buffer.ByteBuf;
@@ -196,11 +196,10 @@ public final class AmqpCodec {
      *
      * @throws IOException if an error occurs while creating the message objects.
      */
-    public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException {
+    public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ReadableBuffer messageBytes) throws IOException {
 
         DecoderImpl decoder = getDecoder();
-        ByteBuffer buffer = messageBytes.nioBuffer();
-        decoder.setByteBuffer(buffer);
+        decoder.setBuffer(messageBytes);
 
         Header header = null;
         DeliveryAnnotations deliveryAnnotations = null;
@@ -211,13 +210,13 @@ public final class AmqpCodec {
         Footer footer = null;
         Section section = null;
 
-        if (buffer.hasRemaining()) {
+        if (messageBytes.hasRemaining()) {
             section = (Section) decoder.readObject();
         }
 
         if (section instanceof Header) {
             header = (Header) section;
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -227,7 +226,7 @@ public final class AmqpCodec {
         if (section instanceof DeliveryAnnotations) {
             deliveryAnnotations = (DeliveryAnnotations) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -237,7 +236,7 @@ public final class AmqpCodec {
         if (section instanceof MessageAnnotations) {
             messageAnnotations = (MessageAnnotations) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -247,7 +246,7 @@ public final class AmqpCodec {
         if (section instanceof Properties) {
             properties = (Properties) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -257,7 +256,7 @@ public final class AmqpCodec {
         if (section instanceof ApplicationProperties) {
             applicationProperties = (ApplicationProperties) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -267,7 +266,7 @@ public final class AmqpCodec {
         if (section != null && !(section instanceof Footer)) {
             body = section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -279,7 +278,6 @@ public final class AmqpCodec {
         }
 
         decoder.setByteBuffer(null);
-        messageBytes.resetReaderIndex();
 
         // First we try the easy way, if the annotation is there we don't have to work hard.
         AmqpJmsMessageFacade result = createFromMsgAnnotation(messageAnnotations);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 702870b..3303628 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -16,14 +16,15 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.message.Message;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 /**
  * Support class containing constant values and static methods that are
@@ -179,7 +180,7 @@ public final class AmqpMessageSupport {
      *
      * @return a buffer containing the wire level representation of the input Message.
      */
-    public static ByteBuf encodeMessage(Message message) {
+    public static ReadableBuffer encodeMessage(Message message) {
         final int BUFFER_SIZE = 4096;
         byte[] encodedMessage = new byte[BUFFER_SIZE];
         int encodedSize = 0;
@@ -192,6 +193,6 @@ public final class AmqpMessageSupport {
             }
         }
 
-        return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize);
+        return ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(encodedMessage, 0, encodedSize));
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
new file mode 100644
index 0000000..7f14bf4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf}.
+ * <p>
+ * The proton buffer position maps to the ByteBuf reader index and the limit
+ * maps to the ByteBuf writer index, so every relative read advances the reader
+ * index of the wrapped buffer.
+ */
+public class AmqpReadableBuffer implements ReadableBuffer {
+
+    private final ByteBuf buffer;
+
+    /**
+     * Creates a new wrapper around the given buffer; the buffer is not copied.
+     *
+     * @param buffer
+     *      the Netty buffer whose readable bytes are exposed by this instance.
+     */
+    public AmqpReadableBuffer(ByteBuf buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * @return the wrapped Netty buffer.
+     */
+    public ByteBuf getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buffer.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buffer.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+        // Offset of index zero only: position() already reports the reader index,
+        // so folding it in here would make arrayOffset() + position() count it twice.
+        return buffer.arrayOffset();
+    }
+
+    @Override
+    public ReadableBuffer reclaimRead() {
+        // Nothing to reclaim, the wrapped buffer is left untouched.
+        return this;
+    }
+
+    @Override
+    public byte get() {
+        return buffer.readByte();
+    }
+
+    @Override
+    public byte get(int index) {
+        // Absolute read, the reader index is not moved.
+        return buffer.getByte(index);
+    }
+
+    @Override
+    public int getInt() {
+        return buffer.readInt();
+    }
+
+    @Override
+    public long getLong() {
+        return buffer.readLong();
+    }
+
+    @Override
+    public short getShort() {
+        return buffer.readShort();
+    }
+
+    @Override
+    public float getFloat() {
+        return buffer.readFloat();
+    }
+
+    @Override
+    public double getDouble() {
+        return buffer.readDouble();
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target, int offset, int length) {
+        buffer.readBytes(target, offset, length);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target) {
+        buffer.readBytes(target);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer get(WritableBuffer target) {
+        int start = target.position();
+
+        if (buffer.hasArray()) {
+            target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+        } else {
+            target.put(buffer.nioBuffer());
+        }
+
+        // Neither put variant moves our reader index, so advance it by what the target accepted.
+        int written = target.position() - start;
+
+        buffer.readerIndex(buffer.readerIndex() + written);
+
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer slice() {
+        return new AmqpReadableBuffer(buffer.slice());
+    }
+
+    @Override
+    public ReadableBuffer flip() {
+        // The limit becomes the current position and the position returns to zero.
+        buffer.setIndex(0, buffer.readerIndex());
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer limit(int limit) {
+        buffer.writerIndex(limit);
+        return this;
+    }
+
+    @Override
+    public int limit() {
+        return buffer.writerIndex();
+    }
+
+    @Override
+    public ReadableBuffer position(int position) {
+        buffer.readerIndex(position);
+        return this;
+    }
+
+    @Override
+    public int position() {
+        return buffer.readerIndex();
+    }
+
+    @Override
+    public ReadableBuffer mark() {
+        buffer.markReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer reset() {
+        buffer.resetReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer rewind() {
+        buffer.readerIndex(0);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer clear() {
+        // The position returns to zero and the limit is opened up to the full capacity.
+        buffer.setIndex(0, buffer.capacity());
+        return this;
+    }
+
+    @Override
+    public int remaining() {
+        return buffer.readableBytes();
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return buffer.isReadable();
+    }
+
+    @Override
+    public ReadableBuffer duplicate() {
+        return new AmqpReadableBuffer(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        return buffer.nioBuffer();
+    }
+
+    @Override
+    public String readUTF8() throws CharacterCodingException {
+        return readString(StandardCharsets.UTF_8.newDecoder());
+    }
+
+    @Override
+    public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+        // Decode from an NIO view of the readable bytes so the supplied decoder is
+        // honoured, the view has its own position so our indices are not moved by it.
+        String result = decoder.decode(buffer.nioBuffer()).toString();
+
+        // The decoded bytes count as read, only consume them once decoding succeeded.
+        buffer.readerIndex(buffer.writerIndex());
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
new file mode 100644
index 0000000..59bd371
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath
+ */
+public class AmqpReadableBufferTest {
+
+    @Test
+    public void testWrapBuffer() {
+        ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(100, buffer.capacity());
+        assertSame(byteBuffer, buffer.getBuffer());
+        assertSame(buffer, buffer.reclaimRead());
+    }
+
+    @Test
+    public void testArrayAccess() {
+        ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+        // Nothing has been read yet so the wrapper must mirror the backing array and offset.
+        assertTrue(buffer.hasArray());
+        assertSame(byteBuffer.array(), buffer.array());
+        assertEquals(byteBuffer.arrayOffset(), buffer.arrayOffset());
+    }
+
+    @Test
+    public void testArrayAccessWhenNoArray() {
+        ByteBuf byteBuffer = Unpooled.directBuffer();
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertFalse(buffer.hasArray());
+    }
+
+    @Test
+    public void testByteBuffer() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        ByteBuffer nioBuffer = buffer.byteBuffer();
+        assertEquals(data.length, nioBuffer.remaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], nioBuffer.get());
+        }
+    }
+
+    @Test
+    public void testGet() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.get();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetIndex() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get(i));
+        }
+
+        assertTrue(buffer.hasRemaining());
+    }
+
+    @Test
+    public void testGetShort() {
+        byte[] data = new byte[] { 0, 1 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(1, buffer.getShort());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getShort();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetInt() {
+        byte[] data = new byte[] { 0, 0, 0, 1 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(1, buffer.getInt());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getInt();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetLong() {
+        byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 1 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(1, buffer.getLong());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getLong();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetFloat() {
+        byte[] data = new byte[] { 0, 0, 0, 0 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(0, buffer.getFloat(), 0.0);
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getFloat();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetDouble() {
+        byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(0, buffer.getDouble(), 0.0);
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getDouble();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetBytes() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetBytesIntInt() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target, 0, target.length);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target, 0, target.length);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testGetBytesToWritableBuffer() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+        ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+        AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer);
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(targetBuffer.array(), data);
+    }
+
+    @Test
+    public void testGetBytesToWritableBufferThatIsDirect() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
+        byteBuffer.writeBytes(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+        ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+        AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer);
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], target.getBuffer().readByte());
+        }
+    }
+
+    @Test
+    public void testDuplicate() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        ReadableBuffer duplicate = buffer.duplicate();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], duplicate.get());
+        }
+
+        assertFalse(duplicate.hasRemaining());
+    }
+
+    @Test
+    public void testSlice() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        ReadableBuffer slice = buffer.slice();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], slice.get());
+        }
+
+        assertFalse(slice.hasRemaining());
+    }
+
+    @Test
+    public void testLimit() {
+        byte[] data = new byte[] { 1, 2 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(data.length, buffer.limit());
+        buffer.limit(1);
+        assertEquals(1, buffer.limit());
+        assertEquals(1, buffer.get());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.get();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+    }
+
+    @Test
+    public void testClear() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4};
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException ioe) {}
+
+        buffer.clear();
+        assertTrue(buffer.hasRemaining());
+        assertEquals(data.length, buffer.remaining());
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+    }
+
+    @Test
+    public void testRewind() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.rewind();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testReset() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        buffer.mark();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.reset();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testGetPosition() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+        // The position starts at zero and moves forward by one for each byte read.
+        assertEquals(0, buffer.position());
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(i, buffer.position());
+            assertEquals(data[i], buffer.get());
+            assertEquals(i + 1, buffer.position());
+        }
+    }
+
+    @Test
+    public void testSetPosition() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.position(0);
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testFlip() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        buffer.mark();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.flip();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testReadUTF8() throws CharacterCodingException {
+        String testString = "test-string-1";
+        byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
+
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(testString, buffer.readUTF8());
+    }
+
+    @Test
+    public void testReadString() throws CharacterCodingException {
+        String testString = "test-string-1";
+        byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
+        ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
+
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org