You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/04/23 19:09:22 UTC
qpid-jms git commit: QPIDJMS-383 Use new APIs to reduce buffer copying
Repository: qpid-jms
Updated Branches:
refs/heads/master 8f876f926 -> 436d406ff
QPIDJMS-383 Use new APIs to reduce buffer copying
Use the new send and receive methods in proton-j 0.27.0 to reduce buffer copies
when sending and receiving.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/436d406f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/436d406f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/436d406f
Branch: refs/heads/master
Commit: 436d406ffce8d8ee0252bcfbf4aa35d1e36668a4
Parents: 8f876f9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Mar 29 18:44:08 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 23 15:08:30 2018 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 28 +-
.../jms/provider/amqp/AmqpFixedProducer.java | 3 +-
.../jms/provider/amqp/message/AmqpCodec.java | 22 +-
.../amqp/message/AmqpMessageSupport.java | 7 +-
.../amqp/message/AmqpReadableBuffer.java | 219 +++++++++
.../amqp/message/AmqpReadableBufferTest.java | 444 +++++++++++++++++++
6 files changed, 680 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index be2438e..c39b55d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -47,9 +47,6 @@ import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
/**
* AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
*/
@@ -57,12 +54,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
- private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
-
protected final AmqpSession session;
protected AsyncResult stopRequest;
protected AsyncResult pullRequest;
- protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
protected long incomingSequence;
protected long deliveredCount;
protected boolean deferredClose;
@@ -485,7 +479,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
JmsMessage message = null;
try {
- message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
+ message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
} catch (Exception e) {
LOG.warn("Error on transform: {}", e.getMessage());
// TODO - We could signal provider error but not sure we want to fail
@@ -495,8 +489,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// a bytes messages as a fall back.
settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
return false;
- } finally {
- incomingBuffer.clear();
}
try {
@@ -585,24 +577,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
- protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
- int count;
-
- // Attempt to preemptively size the buffer for the incoming delivery.
- if (incomingBuffer.capacity() < incoming.available()) {
- incomingBuffer.capacity(incoming.available());
- }
-
- while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
- incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
- if (!incomingBuffer.isWritable()) {
- incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
- }
- }
-
- return incomingBuffer;
- }
-
public void preCommit() {
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index e93d74e..b2036fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -32,6 +32,7 @@ import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -138,7 +139,7 @@ public class AmqpFixedProducer extends AmqpProducer {
// Write the already encoded AMQP message into the Sender
ByteBuf encoded = (ByteBuf) envelope.getPayload();
- getEndpoint().send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes());
+ getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate()));
AmqpProvider provider = getParent().getProvider();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
index 85d8d06..733294f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
@@ -28,7 +28,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIA
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -50,6 +49,7 @@ import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
@@ -196,11 +196,10 @@ public final class AmqpCodec {
*
* @throws IOException if an error occurs while creating the message objects.
*/
- public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException {
+ public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ReadableBuffer messageBytes) throws IOException {
DecoderImpl decoder = getDecoder();
- ByteBuffer buffer = messageBytes.nioBuffer();
- decoder.setByteBuffer(buffer);
+ decoder.setBuffer(messageBytes);
Header header = null;
DeliveryAnnotations deliveryAnnotations = null;
@@ -211,13 +210,13 @@ public final class AmqpCodec {
Footer footer = null;
Section section = null;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
}
if (section instanceof Header) {
header = (Header) section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -227,7 +226,7 @@ public final class AmqpCodec {
if (section instanceof DeliveryAnnotations) {
deliveryAnnotations = (DeliveryAnnotations) section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -237,7 +236,7 @@ public final class AmqpCodec {
if (section instanceof MessageAnnotations) {
messageAnnotations = (MessageAnnotations) section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -247,7 +246,7 @@ public final class AmqpCodec {
if (section instanceof Properties) {
properties = (Properties) section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -257,7 +256,7 @@ public final class AmqpCodec {
if (section instanceof ApplicationProperties) {
applicationProperties = (ApplicationProperties) section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -267,7 +266,7 @@ public final class AmqpCodec {
if (section != null && !(section instanceof Footer)) {
body = section;
- if (buffer.hasRemaining()) {
+ if (messageBytes.hasRemaining()) {
section = (Section) decoder.readObject();
} else {
section = null;
@@ -279,7 +278,6 @@ public final class AmqpCodec {
}
decoder.setByteBuffer(null);
- messageBytes.resetReaderIndex();
// First we try the easy way, if the annotation is there we don't have to work hard.
AmqpJmsMessageFacade result = createFromMsgAnnotation(messageAnnotations);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 702870b..3303628 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -16,14 +16,15 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
+import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.Message;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
/**
* Support class containing constant values and static methods that are
@@ -179,7 +180,7 @@ public final class AmqpMessageSupport {
*
* @return a buffer containing the wire level representation of the input Message.
*/
- public static ByteBuf encodeMessage(Message message) {
+ public static ReadableBuffer encodeMessage(Message message) {
final int BUFFER_SIZE = 4096;
byte[] encodedMessage = new byte[BUFFER_SIZE];
int encodedSize = 0;
@@ -192,6 +193,6 @@ public final class AmqpMessageSupport {
}
}
- return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize);
+ return ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(encodedMessage, 0, encodedSize));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
new file mode 100644
index 0000000..7f14bf4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf}.
+ * <p>
+ * The reader index of the wrapped buffer is exposed as the position and the
+ * writer index as the limit. The wrapper does not copy the buffer, so index
+ * changes made through this class are applied to the wrapped instance.
+ */
+public class AmqpReadableBuffer implements ReadableBuffer {
+
+    private final ByteBuf buffer;
+
+    /**
+     * Creates a new readable view over the given Netty buffer.
+     *
+     * @param buffer
+     *      the buffer to read from; it is used directly and not copied.
+     */
+    public AmqpReadableBuffer(ByteBuf buffer) {
+        this.buffer = buffer;
+    }
+
+    /**
+     * @return the Netty buffer instance that this object wraps.
+     */
+    public ByteBuf getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buffer.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buffer.array();
+    }
+
+    /**
+     * @return the index in the backing array of the current read position,
+     *         i.e. the array offset of the wrapped buffer plus its reader index.
+     */
+    @Override
+    public int arrayOffset() {
+        // NOTE(review): java.nio.ByteBuffer#arrayOffset() does not include the
+        // position; confirm that callers do not add position() to this value again.
+        return buffer.arrayOffset() + buffer.readerIndex();
+    }
+
+    /**
+     * No-op for this implementation, already read bytes are not released.
+     *
+     * @return this buffer instance.
+     */
+    @Override
+    public ReadableBuffer reclaimRead() {
+        return this;
+    }
+
+    @Override
+    public byte get() {
+        return buffer.readByte();
+    }
+
+    /**
+     * Reads the byte at the given absolute index without moving the position.
+     */
+    @Override
+    public byte get(int index) {
+        return buffer.getByte(index);
+    }
+
+    @Override
+    public int getInt() {
+        return buffer.readInt();
+    }
+
+    @Override
+    public long getLong() {
+        return buffer.readLong();
+    }
+
+    @Override
+    public short getShort() {
+        return buffer.readShort();
+    }
+
+    @Override
+    public float getFloat() {
+        return buffer.readFloat();
+    }
+
+    @Override
+    public double getDouble() {
+        return buffer.readDouble();
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target, int offset, int length) {
+        buffer.readBytes(target, offset, length);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target) {
+        buffer.readBytes(target);
+        return this;
+    }
+
+    /**
+     * Copies the readable bytes of this buffer into the given target and then
+     * advances the position by the number of bytes the target reports as written.
+     *
+     * @param target
+     *      the buffer that receives the bytes.
+     *
+     * @return this buffer instance.
+     */
+    @Override
+    public ReadableBuffer get(WritableBuffer target) {
+        final int start = target.position();
+
+        if (buffer.hasArray()) {
+            target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+        } else {
+            target.put(buffer.nioBuffer());
+        }
+
+        // Neither put variant above moves our reader index, so consume
+        // exactly what was transferred into the target.
+        final int written = target.position() - start;
+        buffer.readerIndex(buffer.readerIndex() + written);
+
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer slice() {
+        return new AmqpReadableBuffer(buffer.slice());
+    }
+
+    /**
+     * Sets the limit to the current position and the position to zero.
+     */
+    @Override
+    public ReadableBuffer flip() {
+        buffer.setIndex(0, buffer.readerIndex());
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer limit(int limit) {
+        buffer.writerIndex(limit);
+        return this;
+    }
+
+    @Override
+    public int limit() {
+        return buffer.writerIndex();
+    }
+
+    @Override
+    public ReadableBuffer position(int position) {
+        buffer.readerIndex(position);
+        return this;
+    }
+
+    @Override
+    public int position() {
+        return buffer.readerIndex();
+    }
+
+    @Override
+    public ReadableBuffer mark() {
+        buffer.markReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer reset() {
+        buffer.resetReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer rewind() {
+        buffer.readerIndex(0);
+        return this;
+    }
+
+    /**
+     * Sets the position to zero and the limit to the capacity of the wrapped buffer.
+     */
+    @Override
+    public ReadableBuffer clear() {
+        buffer.setIndex(0, buffer.capacity());
+        return this;
+    }
+
+    @Override
+    public int remaining() {
+        return buffer.readableBytes();
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return buffer.isReadable();
+    }
+
+    @Override
+    public ReadableBuffer duplicate() {
+        return new AmqpReadableBuffer(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        return buffer.nioBuffer();
+    }
+
+    /**
+     * Decodes all remaining bytes as a UTF-8 String and advances the position
+     * to the limit.
+     *
+     * @return the decoded String.
+     */
+    @Override
+    public String readUTF8() throws CharacterCodingException {
+        final String result = buffer.toString(StandardCharsets.UTF_8);
+        // toString(Charset) leaves the reader index untouched, consume the bytes here.
+        buffer.readerIndex(buffer.writerIndex());
+        return result;
+    }
+
+    /**
+     * Decodes all remaining bytes using the supplied decoder and advances the
+     * position to the limit.
+     *
+     * @param decoder
+     *      the decoder used to convert the remaining bytes into characters.
+     *
+     * @return the decoded String.
+     *
+     * @throws CharacterCodingException if the decoder rejects the remaining bytes.
+     */
+    @Override
+    public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+        // Decoding runs on an NIO view of the readable bytes, which does not move
+        // our reader index, so the position is advanced explicitly afterwards.
+        final String result = decoder.decode(buffer.nioBuffer()).toString();
+        buffer.readerIndex(buffer.writerIndex());
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/436d406f/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
new file mode 100644
index 0000000..59bd371
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBufferTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Tests for the {@link AmqpReadableBuffer} wrapper that uses a Netty ByteBuf underneath.
+ */
+public class AmqpReadableBufferTest {
+
+    /**
+     * Wraps the given bytes in a heap ByteBuf and returns a readable buffer over it.
+     */
+    private static AmqpReadableBuffer wrap(byte[] data) {
+        return new AmqpReadableBuffer(Unpooled.wrappedBuffer(data));
+    }
+
+    @Test
+    public void testWrapBuffer() {
+        ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertEquals(100, buffer.capacity());
+        assertSame(byteBuffer, buffer.getBuffer());
+        assertSame(buffer, buffer.reclaimRead());
+    }
+
+    @Test
+    public void testArrayAccess() {
+        ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+        AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+        assertTrue(buffer.hasArray());
+        assertSame(byteBuffer.array(), buffer.array());
+        assertEquals(byteBuffer.arrayOffset(), buffer.arrayOffset());
+    }
+
+    @Test
+    public void testArrayAccessWhenNoArray() {
+        ByteBuf byteBuffer = Unpooled.directBuffer();
+        try {
+            AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+
+            assertFalse(buffer.hasArray());
+        } finally {
+            // Direct buffers are reference counted, free the memory once done.
+            byteBuffer.release();
+        }
+    }
+
+    @Test
+    public void testByteBuffer() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        ByteBuffer nioBuffer = buffer.byteBuffer();
+        assertEquals(data.length, nioBuffer.remaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], nioBuffer.get());
+        }
+    }
+
+    @Test
+    public void testGet() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.get();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetIndex() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get(i));
+        }
+
+        // Absolute reads must not consume any bytes.
+        assertTrue(buffer.hasRemaining());
+    }
+
+    @Test
+    public void testGetShort() {
+        byte[] data = new byte[] { 0, 1 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(1, buffer.getShort());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getShort();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetInt() {
+        byte[] data = new byte[] { 0, 0, 0, 1 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(1, buffer.getInt());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getInt();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetLong() {
+        byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 1 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(1, buffer.getLong());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getLong();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetFloat() {
+        byte[] data = new byte[] { 0, 0, 0, 0 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(0.0f, buffer.getFloat(), 0.0f);
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getFloat();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetDouble() {
+        byte[] data = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(0.0, buffer.getDouble(), 0.0);
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.getDouble();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetBytes() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetBytesIntInt() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target, 0, target.length);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target, 0, target.length);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+    }
+
+    @Test
+    public void testGetBytesToWritableBuffer() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+        ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+        AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer);
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, targetBuffer.array());
+    }
+
+    @Test
+    public void testGetBytesToWritableBufferThatIsDirect() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
+        try {
+            byteBuffer.writeBytes(data);
+            AmqpReadableBuffer buffer = new AmqpReadableBuffer(byteBuffer);
+            ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+            AmqpWritableBuffer target = new AmqpWritableBuffer(targetBuffer);
+
+            buffer.get(target);
+            assertFalse(buffer.hasRemaining());
+
+            for (int i = 0; i < data.length; i++) {
+                assertEquals(data[i], target.getBuffer().readByte());
+            }
+        } finally {
+            // Direct buffers are reference counted, free the memory once done.
+            byteBuffer.release();
+        }
+    }
+
+    @Test
+    public void testDuplicate() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        ReadableBuffer duplicate = buffer.duplicate();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], duplicate.get());
+        }
+
+        assertFalse(duplicate.hasRemaining());
+    }
+
+    @Test
+    public void testSlice() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        ReadableBuffer slice = buffer.slice();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], slice.get());
+        }
+
+        assertFalse(slice.hasRemaining());
+    }
+
+    @Test
+    public void testLimit() {
+        byte[] data = new byte[] { 1, 2 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(data.length, buffer.limit());
+        buffer.limit(1);
+        assertEquals(1, buffer.limit());
+        assertEquals(1, buffer.get());
+        assertFalse(buffer.hasRemaining());
+
+        try {
+            buffer.get();
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the reduced limit must fail.
+        }
+    }
+
+    @Test
+    public void testClear() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        byte[] target = new byte[data.length];
+
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+
+        try {
+            buffer.get(target);
+            fail("Should throw an IndexOutOfBoundsException");
+        } catch (IndexOutOfBoundsException expected) {
+            // Reading past the limit must fail.
+        }
+
+        buffer.clear();
+        assertTrue(buffer.hasRemaining());
+        assertEquals(data.length, buffer.remaining());
+        buffer.get(target);
+        assertFalse(buffer.hasRemaining());
+        assertArrayEquals(data, target);
+    }
+
+    @Test
+    public void testRewind() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.rewind();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testReset() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        buffer.mark();
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.reset();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testGetPosition() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        assertEquals(0, buffer.position());
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(i, buffer.position());
+            assertEquals(data[i], buffer.get());
+            assertEquals(i + 1, buffer.position());
+        }
+    }
+
+    @Test
+    public void testSetPosition() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.position(0);
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testFlip() {
+        byte[] data = new byte[] { 0, 1, 2, 3, 4 };
+        AmqpReadableBuffer buffer = wrap(data);
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+
+        assertFalse(buffer.hasRemaining());
+        buffer.flip();
+        assertTrue(buffer.hasRemaining());
+
+        for (int i = 0; i < data.length; i++) {
+            assertEquals(data[i], buffer.get());
+        }
+    }
+
+    @Test
+    public void testReadUTF8() throws CharacterCodingException {
+        String testString = "test-string-1";
+        AmqpReadableBuffer buffer = wrap(testString.getBytes(StandardCharsets.UTF_8));
+
+        assertEquals(testString, buffer.readUTF8());
+    }
+
+    @Test
+    public void testReadString() throws CharacterCodingException {
+        String testString = "test-string-1";
+        AmqpReadableBuffer buffer = wrap(testString.getBytes(StandardCharsets.UTF_8));
+
+        assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org