You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/03 16:12:04 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1843 Update Qpid JMS 0.32.0 and Proton-j 0.27.1

Repository: activemq-artemis
Updated Branches:
  refs/heads/master b60f6489f -> 410cb9ee2


ARTEMIS-1843 Update Qpid JMS 0.32.0  and Proton-j 0.27.1

Use new no copy variants for the delivery send and receive and make
use of the ReadableBuffer type that is now used to convery tranfer
payloads without a copy.  Also set max outgoing frame size to match
the configured maxFrameSize for the AMQP protocol head to avoid the
case where an overly large frame can be written instead of chunking
a large message.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c1cf9ef1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c1cf9ef1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c1cf9ef1

Branch: refs/heads/master
Commit: c1cf9ef12d010cec89d6228b29f3459c3d51ee0d
Parents: b60f648
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 27 17:09:48 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu May 3 12:10:25 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       |  97 ++--
 .../amqp/broker/AMQPSessionCallback.java        |   3 +-
 .../amqp/proton/AMQPConnectionContext.java      |  15 +-
 .../proton/ProtonServerReceiverContext.java     |   5 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  18 +-
 .../protocol/amqp/util/NettyReadable.java       | 125 ++++-
 .../protocol/amqp/util/NettyWritable.java       |  29 +-
 .../protocol/amqp/util/NettyReadableTest.java   | 454 +++++++++++++++++++
 .../protocol/amqp/util/NettyWritableTest.java   | 151 ++++++
 pom.xml                                         |   4 +-
 .../integration/amqp/AmqpLargeMessageTest.java  |  95 +++-
 11 files changed, 917 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 5f7dbac..2775f77 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
@@ -54,6 +55,7 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
@@ -69,7 +71,7 @@ public class AMQPMessage extends RefCountMessage {
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
    final long messageFormat;
-   ByteBuf data;
+   ReadableBuffer data;
    boolean bufferValid;
    Boolean durable;
    long messageID;
@@ -106,7 +108,11 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
-      this.data = Unpooled.wrappedBuffer(data);
+      this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
+   }
+
+   public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
+      this.data = data;
       this.messageFormat = messageFormat;
       this.bufferValid = true;
       this.coreMessageObjectPools = coreMessageObjectPools;
@@ -136,8 +142,8 @@ public class AMQPMessage extends RefCountMessage {
          protonMessage = (MessageImpl) Message.Factory.create();
 
          if (data != null) {
-            data.readerIndex(0);
-            protonMessage.decode(data.nioBuffer());
+            data.rewind();
+            protonMessage.decode(data.duplicate());
             this._header = protonMessage.getHeader();
             protonMessage.setHeader(null);
          }
@@ -162,7 +168,6 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-   @SuppressWarnings("unchecked")
    private Map<String, Object> getApplicationPropertiesMap() {
       ApplicationProperties appMap = getApplicationProperties();
       Map<String, Object> map = null;
@@ -183,15 +188,15 @@ public class AMQPMessage extends RefCountMessage {
       parseHeaders();
 
       if (applicationProperties == null && appLocation >= 0) {
-         ByteBuffer buffer = getBuffer().nioBuffer();
+         ReadableBuffer buffer = data.duplicate();
          buffer.position(appLocation);
-         TLSEncode.getDecoder().setByteBuffer(buffer);
+         TLSEncode.getDecoder().setBuffer(buffer);
          Object section = TLSEncode.getDecoder().readObject();
          if (section instanceof ApplicationProperties) {
             this.applicationProperties = (ApplicationProperties) section;
          }
          this.appLocation = -1;
-         TLSEncode.getDecoder().setByteBuffer(null);
+         TLSEncode.getDecoder().setBuffer(null);
       }
 
       return applicationProperties;
@@ -202,7 +207,7 @@ public class AMQPMessage extends RefCountMessage {
          if (data == null) {
             initalizeObjects();
          } else {
-            partialDecode(data.nioBuffer());
+            partialDecode(data);
          }
          parsedHeaders = true;
       }
@@ -367,10 +372,9 @@ public class AMQPMessage extends RefCountMessage {
       rejectedConsumers.add(consumer);
    }
 
-   private synchronized void partialDecode(ByteBuffer buffer) {
+   private synchronized void partialDecode(ReadableBuffer buffer) {
       DecoderImpl decoder = TLSEncode.getDecoder();
-      decoder.setByteBuffer(buffer);
-      buffer.position(0);
+      decoder.setBuffer(buffer.rewind());
 
       _header = null;
       _deliveryAnnotations = null;
@@ -449,6 +453,7 @@ public class AMQPMessage extends RefCountMessage {
          }
       } finally {
          decoder.setByteBuffer(null);
+         data.position(0);
       }
    }
 
@@ -456,14 +461,6 @@ public class AMQPMessage extends RefCountMessage {
       return messageFormat;
    }
 
-   public int getLength() {
-      return data.array().length;
-   }
-
-   public byte[] getArray() {
-      return data.array();
-   }
-
    @Override
    public void messageChanged() {
       bufferValid = false;
@@ -475,7 +472,7 @@ public class AMQPMessage extends RefCountMessage {
       if (data == null) {
          return null;
       } else {
-         return Unpooled.wrappedBuffer(data);
+         return Unpooled.wrappedBuffer(data.byteBuffer());
       }
    }
 
@@ -489,14 +486,15 @@ public class AMQPMessage extends RefCountMessage {
    public org.apache.activemq.artemis.api.core.Message copy() {
       checkBuffer();
 
-      byte[] origin = data.array();
-      byte[] newData = new byte[data.array().length - (messagePaylodStart - headerEnds)];
+      ReadableBuffer view = data.duplicate();
 
-      // Copy the original header
-      System.arraycopy(origin, 0, newData, 0, headerEnds);
+      byte[] newData = new byte[view.remaining() - (messagePaylodStart - headerEnds)];
 
-      // Copy the body following the delivery annotations if present
-      System.arraycopy(origin, messagePaylodStart, newData, headerEnds, data.array().length - messagePaylodStart);
+      view.position(0).limit(headerEnds);
+      view.get(newData, 0, headerEnds);
+      view.clear();
+      view.position(messagePaylodStart);
+      view.get(newData, headerEnds, view.remaining());
 
       AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
       newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
@@ -679,7 +677,7 @@ public class AMQPMessage extends RefCountMessage {
          getProtonMessage().encode(new NettyWritable(buffer));
          byte[] bytes = new byte[buffer.writerIndex()];
          buffer.readBytes(bytes);
-         this.data = Unpooled.wrappedBuffer(bytes);
+         this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(bytes));
       } finally {
          buffer.release();
       }
@@ -689,7 +687,7 @@ public class AMQPMessage extends RefCountMessage {
    public int getEncodeSize() {
       checkBuffer();
       // + 20checkBuffer is an estimate for the Header with the deliveryCount
-      return data.array().length - messagePaylodStart + 20;
+      return data.remaining() - messagePaylodStart + 20;
    }
 
    @Override
@@ -715,10 +713,12 @@ public class AMQPMessage extends RefCountMessage {
             TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
          }
       } else if (headerEnds > 0) {
-         buffer.writeBytes(data, 0, headerEnds);
+         buffer.writeBytes(data.duplicate().limit(headerEnds).byteBuffer());
       }
 
-      buffer.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
+      data.position(messagePaylodStart);
+      buffer.writeBytes(data.byteBuffer());
+      data.position(0);
    }
 
    /**
@@ -734,7 +734,7 @@ public class AMQPMessage extends RefCountMessage {
     *
     * @return a Netty ByteBuf containing the encoded bytes of this Message instance.
     */
-   public ByteBuf getSendBuffer(int deliveryCount) {
+   public ReadableBuffer getSendBuffer(int deliveryCount) {
       checkBuffer();
 
       if (deliveryCount > 1) {
@@ -744,23 +744,28 @@ public class AMQPMessage extends RefCountMessage {
       } else {
          // Common case message has no delivery annotations and this is the first delivery
          // so no re-encoding or section skipping needed.
-         return data.retainedDuplicate();
+         return data.duplicate();
       }
    }
 
-   private ByteBuf createCopyWithoutDeliveryAnnotations() {
+   private ReadableBuffer createCopyWithoutDeliveryAnnotations() {
       assert headerEnds != messagePaylodStart;
 
       // The original message had delivery annotations and so we must copy into a new
       // buffer skipping the delivery annotations section as that is not meant to survive
       // beyond this hop.
+      ReadableBuffer duplicate = data.duplicate();
+
       final ByteBuf result = PooledByteBufAllocator.DEFAULT.heapBuffer(getEncodeSize());
-      result.writeBytes(data, 0, headerEnds);
-      result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
-      return result;
+      result.writeBytes(duplicate.limit(headerEnds).byteBuffer());
+      duplicate.clear();
+      duplicate.position(messagePaylodStart);
+      result.writeBytes(duplicate.byteBuffer());
+
+      return new NettyReadable(result);
    }
 
-   private ByteBuf createCopyWithNewDeliveryCount(int deliveryCount) {
+   private ReadableBuffer createCopyWithNewDeliveryCount(int deliveryCount) {
       assert deliveryCount > 1;
 
       final int amqpDeliveryCount = deliveryCount - 1;
@@ -786,9 +791,11 @@ public class AMQPMessage extends RefCountMessage {
 
       // This will skip any existing delivery annotations that might have been present
       // in the original message.
-      result.writeBytes(data, messagePaylodStart, data.writerIndex() - messagePaylodStart);
+      data.position(messagePaylodStart);
+      result.writeBytes(data.byteBuffer());
+      data.position(0);
 
-      return result;
+      return new NettyReadable(result);
    }
 
    public TypedProperties createExtraProperties() {
@@ -1222,14 +1229,18 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    private int internalPersistSize() {
-      return data.array().length;
+      return data.remaining();
    }
 
    @Override
    public void persist(ActiveMQBuffer targetRecord) {
       checkBuffer();
       targetRecord.writeInt(internalPersistSize());
-      targetRecord.writeBytes(data.array(), 0, data.array().length );
+      if (data.hasArray()) {
+         targetRecord.writeBytes(data.array(), data.arrayOffset(), data.remaining());
+      } else {
+         targetRecord.writeBytes(data.byteBuffer());
+      }
    }
 
    @Override
@@ -1238,7 +1249,7 @@ public class AMQPMessage extends RefCountMessage {
       byte[] recordArray = new byte[size];
       record.readBytes(recordArray);
       this.messagePaylodStart = 0; // whatever was persisted will be sent
-      this.data = Unpooled.wrappedBuffer(recordArray);
+      this.data = ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(recordArray));
       this.bufferValid = true;
       this.durable = true; // it's coming from the journal, so it's durable
       parseHeaders();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7134d3b..105d58a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -71,6 +71,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Receiver;
@@ -441,7 +442,7 @@ public class AMQPSessionCallback implements SessionCallback {
                           final Delivery delivery,
                           SimpleString address,
                           int messageFormat,
-                          byte[] data) throws Exception {
+                          ReadableBuffer data) throws Exception {
       AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
       if (address != null) {
          message.setAddress(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 39f9609..4788d0d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -25,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -52,11 +57,7 @@ import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+import io.netty.buffer.ByteBuf;
 
 public class AMQPConnectionContext extends ProtonInitializable implements EventHandler {
 
@@ -118,12 +119,12 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
       transport.setChannelMax(channelMax);
       transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
       transport.setMaxFrameSize(maxFrameSize);
+      transport.setOutboundFrameSizeLimit(maxFrameSize);
       if (!isIncomingConnection && saslClientFactory != null) {
          handler.createClientSASL();
       }
    }
 
-
    public void scheduledFlush() {
       handler.scheduledFlush();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 3c35d76..0036004 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -40,6 +40,7 @@ import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.jboss.logging.Logger;
@@ -221,10 +222,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          receiver = ((Receiver) delivery.getLink());
 
          Transaction tx = null;
-         byte[] data;
 
-         data = new byte[delivery.available()];
-         receiver.recv(data, 0, data.length);
+         ReadableBuffer data = receiver.recv();
          receiver.advance();
 
          if (delivery.getRemoteState() instanceof TransactionalState) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 990a217..9b4704f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -68,14 +68,13 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-
 /**
  * TODO: Merge {@link ProtonServerSenderContext} and {@link org.apache.activemq.artemis.protocol.amqp.client.ProtonClientSenderContext} once we support 'global' link names. The split is a workaround for outgoing links
  */
@@ -692,10 +691,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
 
       // Let the Message decide how to present the message bytes
-      ByteBuf sendBuffer = message.getSendBuffer(deliveryCount);
+      ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
 
       try {
-         int size = sendBuffer.writerIndex();
+         int size = sendBuffer.remaining();
 
          while (!connection.tryLock(1, TimeUnit.SECONDS)) {
             if (closed || sender.getLocalState() == EndpointState.CLOSED) {
@@ -715,12 +714,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             delivery.setMessageFormat((int) message.getMessageFormat());
             delivery.setContext(messageReference);
 
-            if (sendBuffer.hasArray()) {
-               // this will avoid a copy.. patch provided by Norman using buffer.array()
-               sender.send(sendBuffer.array(), sendBuffer.arrayOffset() + sendBuffer.readerIndex(), sendBuffer.readableBytes());
-            } else {
-               sender.send(new NettyReadable(sendBuffer));
-            }
+            sender.sendNoCopy(sendBuffer);
 
             if (preSettle) {
                // Presettled means the client implicitly accepts any delivery we send it.
@@ -736,7 +730,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
          return size;
       } finally {
-         sendBuffer.release();
+         if (sendBuffer instanceof NettyReadable) {
+            ((NettyReadable) sendBuffer).getByteBuf().release();
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
index e0705b4..096d4a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,15 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.artemis.protocol.amqp.util;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
 
+/**
+ * {@link ReadableBuffer} implementation that wraps a Netty {@link ByteBuf} to
+ * allow use of Netty buffers to be used when decoding AMQP messages.
+ */
 public class NettyReadable implements ReadableBuffer {
 
    private static final Charset Charset_UTF8 = Charset.forName("UTF-8");
@@ -33,9 +40,8 @@ public class NettyReadable implements ReadableBuffer {
       this.buffer = buffer;
    }
 
-   @Override
-   public void put(ReadableBuffer other) {
-      buffer.writeBytes(other.byteBuffer());
+   public ByteBuf getByteBuf() {
+      return this.buffer;
    }
 
    @Override
@@ -93,7 +99,8 @@ public class NettyReadable implements ReadableBuffer {
 
    @Override
    public ReadableBuffer flip() {
-      return new NettyReadable(buffer.duplicate().setIndex(0, buffer.readerIndex()));
+      buffer.setIndex(0, buffer.readerIndex());
+      return this;
    }
 
    @Override
@@ -136,4 +143,108 @@ public class NettyReadable implements ReadableBuffer {
    public String readUTF8() {
       return buffer.toString(Charset_UTF8);
    }
+
+   @Override
+   public byte[] array() {
+      return buffer.array();
+   }
+
+   @Override
+   public int arrayOffset() {
+      return buffer.arrayOffset() + buffer.readerIndex();
+   }
+
+   @Override
+   public int capacity() {
+      return buffer.capacity();
+   }
+
+   @Override
+   public ReadableBuffer clear() {
+      buffer.setIndex(0, buffer.capacity());
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer reclaimRead() {
+      return this;
+   }
+
+   @Override
+   public byte get(int index) {
+      return buffer.getByte(index);
+   }
+
+   @Override
+   public boolean hasArray() {
+      return buffer.hasArray();
+   }
+
+   @Override
+   public ReadableBuffer mark() {
+      buffer.markReaderIndex();
+      return this;
+   }
+
+   @Override
+   public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+      return buffer.toString(decoder.charset());
+   }
+
+   @Override
+   public ReadableBuffer reset() {
+      buffer.resetReaderIndex();
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer rewind() {
+      buffer.setIndex(0, buffer.writerIndex());
+      return this;
+   }
+
+   @Override
+   public ReadableBuffer get(WritableBuffer target) {
+      int start = target.position();
+
+      if (buffer.hasArray()) {
+         target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+      } else {
+         target.put(buffer.nioBuffer());
+      }
+
+      int written = target.position() - start;
+
+      buffer.readerIndex(buffer.readerIndex() + written);
+
+      return this;
+   }
+
+   @Override
+   public String toString() {
+      return buffer.toString();
+   }
+
+   @Override
+   public int hashCode() {
+      return buffer.hashCode();
+   }
+
+   @Override
+   public boolean equals(Object other) {
+      if (this == other) {
+         return true;
+      }
+
+      if (!(other instanceof ReadableBuffer)) {
+         return false;
+      }
+
+      ReadableBuffer readable = (ReadableBuffer) other;
+      if (this.remaining() != readable.remaining()) {
+         return false;
+      }
+
+      return buffer.nioBuffer().equals(readable.byteBuffer());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index 75d39b6..659f35f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -18,13 +18,15 @@ package org.apache.activemq.artemis.protocol.amqp.util;
 
 import java.nio.ByteBuffer;
 
-import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
+import io.netty.buffer.ByteBuf;
+
 /**
- * This is to use NettyBuffer within Proton
+ * {@link WritableBuffer} implementation that wraps a Netty {@link ByteBuf} to
+ * allow use of Netty buffers to be used when encoding AMQP messages.
  */
-
 public class NettyWritable implements WritableBuffer {
 
    final ByteBuf nettyBuffer;
@@ -33,6 +35,10 @@ public class NettyWritable implements WritableBuffer {
       this.nettyBuffer = nettyBuffer;
    }
 
+   public ByteBuf getByteBuf() {
+      return nettyBuffer;
+   }
+
    @Override
    public void put(byte b) {
       nettyBuffer.writeByte(b);
@@ -75,7 +81,7 @@ public class NettyWritable implements WritableBuffer {
 
    @Override
    public int remaining() {
-      return nettyBuffer.capacity() - nettyBuffer.writerIndex();
+      return nettyBuffer.maxCapacity() - nettyBuffer.writerIndex();
    }
 
    @Override
@@ -93,8 +99,23 @@ public class NettyWritable implements WritableBuffer {
       nettyBuffer.writeBytes(payload);
    }
 
+   public void put(ByteBuf payload) {
+      nettyBuffer.writeBytes(payload);
+   }
+
    @Override
    public int limit() {
       return nettyBuffer.capacity();
    }
+
+   @Override
+   public void put(ReadableBuffer buffer) {
+      if (buffer.hasArray()) {
+         nettyBuffer.writeBytes(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+      } else {
+         while (buffer.hasRemaining()) {
+            nettyBuffer.writeByte(buffer.get());
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java
new file mode 100644
index 0000000..437d57b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadableTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Tests for the ReadableBuffer wrapper that uses Netty ByteBuf underneath
+ */
+public class NettyReadableTest {
+
+   @Test
+   public void testWrapBuffer() {
+      ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(100, buffer.capacity());
+      assertSame(byteBuffer, buffer.getByteBuf());
+      assertSame(buffer, buffer.reclaimRead());
+   }
+
+   @Test
+   public void testArrayAccess() {
+      ByteBuf byteBuffer = Unpooled.buffer(100, 100);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertTrue(buffer.hasArray());
+      assertSame(buffer.array(), byteBuffer.array());
+      assertEquals(buffer.arrayOffset(), byteBuffer.arrayOffset());
+   }
+
+   @Test
+   public void testArrayAccessWhenNoArray() {
+      ByteBuf byteBuffer = Unpooled.directBuffer();
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertFalse(buffer.hasArray());
+   }
+
+   @Test
+   public void testByteBuffer() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      ByteBuffer nioBuffer = buffer.byteBuffer();
+      assertEquals(data.length, nioBuffer.remaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], nioBuffer.get());
+      }
+   }
+
+   @Test
+   public void testGet() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.get();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetIndex() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get(i));
+      }
+
+      assertTrue(buffer.hasRemaining());
+   }
+
+   @Test
+   public void testGetShort() {
+      byte[] data = new byte[] {0, 1};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(1, buffer.getShort());
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.getShort();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetInt() {
+      byte[] data = new byte[] {0, 0, 0, 1};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(1, buffer.getInt());
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.getInt();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetLong() {
+      byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 1};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(1, buffer.getLong());
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.getLong();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetFloat() {
+      byte[] data = new byte[] {0, 0, 0, 0};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(0, buffer.getFloat(), 0.0);
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.getFloat();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetDouble() {
+      byte[] data = new byte[] {0, 0, 0, 0, 0, 0, 0, 0};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(0, buffer.getDouble(), 0.0);
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.getDouble();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetBytes() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      byte[] target = new byte[data.length];
+
+      buffer.get(target);
+      assertFalse(buffer.hasRemaining());
+      assertArrayEquals(data, target);
+
+      try {
+         buffer.get(target);
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetBytesIntInt() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      byte[] target = new byte[data.length];
+
+      buffer.get(target, 0, target.length);
+      assertFalse(buffer.hasRemaining());
+      assertArrayEquals(data, target);
+
+      try {
+         buffer.get(target, 0, target.length);
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testGetBytesToWritableBuffer() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+      ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+      NettyWritable target = new NettyWritable(targetBuffer);
+
+      buffer.get(target);
+      assertFalse(buffer.hasRemaining());
+      assertArrayEquals(targetBuffer.array(), data);
+   }
+
+   @Test
+   public void testGetBytesToWritableBufferThatIsDirect() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.directBuffer(data.length, data.length);
+      byteBuffer.writeBytes(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+      ByteBuf targetBuffer = Unpooled.buffer(data.length, data.length);
+      NettyWritable target = new NettyWritable(targetBuffer);
+
+      buffer.get(target);
+      assertFalse(buffer.hasRemaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], target.getByteBuf().readByte());
+      }
+   }
+
+   @Test
+   public void testDuplicate() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      ReadableBuffer duplicate = buffer.duplicate();
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], duplicate.get());
+      }
+
+      assertFalse(duplicate.hasRemaining());
+   }
+
+   @Test
+   public void testSlice() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      ReadableBuffer slice = buffer.slice();
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], slice.get());
+      }
+
+      assertFalse(slice.hasRemaining());
+   }
+
+   @Test
+   public void testLimit() {
+      byte[] data = new byte[] {1, 2};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(data.length, buffer.limit());
+      buffer.limit(1);
+      assertEquals(1, buffer.limit());
+      assertEquals(1, buffer.get());
+      assertFalse(buffer.hasRemaining());
+
+      try {
+         buffer.get();
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+   }
+
+   @Test
+   public void testClear() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      byte[] target = new byte[data.length];
+
+      buffer.get(target);
+      assertFalse(buffer.hasRemaining());
+      assertArrayEquals(data, target);
+
+      try {
+         buffer.get(target);
+         fail("Should throw an IndexOutOfBoundsException");
+      } catch (IndexOutOfBoundsException ioe) {
+      }
+
+      buffer.clear();
+      assertTrue(buffer.hasRemaining());
+      assertEquals(data.length, buffer.remaining());
+      buffer.get(target);
+      assertFalse(buffer.hasRemaining());
+      assertArrayEquals(data, target);
+   }
+
+   @Test
+   public void testRewind() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+
+      assertFalse(buffer.hasRemaining());
+      buffer.rewind();
+      assertTrue(buffer.hasRemaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+   }
+
+   @Test
+   public void testReset() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      buffer.mark();
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+
+      assertFalse(buffer.hasRemaining());
+      buffer.reset();
+      assertTrue(buffer.hasRemaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+   }
+
+   @Test
+   public void testGetPosition() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(buffer.position(), 0);
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(buffer.position(), i);
+         assertEquals(data[i], buffer.get());
+         assertEquals(buffer.position(), i + 1);
+      }
+   }
+
+   @Test
+   public void testSetPosition() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+
+      assertFalse(buffer.hasRemaining());
+      buffer.position(0);
+      assertTrue(buffer.hasRemaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+   }
+
+   @Test
+   public void testFlip() {
+      byte[] data = new byte[] {0, 1, 2, 3, 4};
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(data);
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      buffer.mark();
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+
+      assertFalse(buffer.hasRemaining());
+      buffer.flip();
+      assertTrue(buffer.hasRemaining());
+
+      for (int i = 0; i < data.length; i++) {
+         assertEquals(data[i], buffer.get());
+      }
+   }
+
+   @Test
+   public void testReadUTF8() throws CharacterCodingException {
+      String testString = "test-string-1";
+      byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
+
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(testString, buffer.readUTF8());
+   }
+
+   @Test
+   public void testReadString() throws CharacterCodingException {
+      String testString = "test-string-1";
+      byte[] asUtf8bytes = testString.getBytes(StandardCharsets.UTF_8);
+      ByteBuf byteBuffer = Unpooled.wrappedBuffer(asUtf8bytes);
+
+      NettyReadable buffer = new NettyReadable(byteBuffer);
+
+      assertEquals(testString, buffer.readString(StandardCharsets.UTF_8.newDecoder()));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
new file mode 100644
index 0000000..f0de51a
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritableTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Tests for behavior of NettyWritable
+ */
+public class NettyWritableTest {
+
+   @Test
+   public void testGetBuffer() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertSame(buffer, writable.getByteBuf());
+   }
+
+   @Test
+   public void testLimit() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(buffer.capacity(), writable.limit());
+   }
+
+   @Test
+   public void testRemaining() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(buffer.maxCapacity(), writable.remaining());
+      writable.put((byte) 0);
+      assertEquals(buffer.maxCapacity() - 1, writable.remaining());
+   }
+
+   @Test
+   public void testHasRemaining() {
+      ByteBuf buffer = Unpooled.buffer(100, 100);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertTrue(writable.hasRemaining());
+      writable.put((byte) 0);
+      assertTrue(writable.hasRemaining());
+      buffer.writerIndex(buffer.maxCapacity());
+      assertFalse(writable.hasRemaining());
+   }
+
+   @Test
+   public void testGetPosition() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(0, writable.position());
+      writable.put((byte) 0);
+      assertEquals(1, writable.position());
+   }
+
+   @Test
+   public void testSetPosition() {
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(0, writable.position());
+      writable.position(1);
+      assertEquals(1, writable.position());
+   }
+
+   @Test
+   public void testPutByteBuffer() {
+      ByteBuffer input = ByteBuffer.allocate(1024);
+      input.put((byte) 1);
+      input.flip();
+
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(0, writable.position());
+      writable.put(input);
+      assertEquals(1, writable.position());
+   }
+
+   @Test
+   public void testPutByteBuf() {
+      ByteBuf input = Unpooled.buffer();
+      input.writeByte((byte) 1);
+
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(0, writable.position());
+      writable.put(input);
+      assertEquals(1, writable.position());
+   }
+
+   @Test
+   public void testPutReadableBuffer() {
+      doPutReadableBufferTestImpl(true);
+      doPutReadableBufferTestImpl(false);
+   }
+
+   private void doPutReadableBufferTestImpl(boolean readOnly) {
+      ByteBuffer buf = ByteBuffer.allocate(1024);
+      buf.put((byte) 1);
+      buf.flip();
+      if (readOnly) {
+         buf = buf.asReadOnlyBuffer();
+      }
+
+      ReadableBuffer input = new ReadableBuffer.ByteBufferReader(buf);
+
+      if (readOnly) {
+         assertFalse("Expected buffer not to hasArray()", input.hasArray());
+      } else {
+         assertTrue("Expected buffer to hasArray()", input.hasArray());
+      }
+
+      ByteBuf buffer = Unpooled.buffer(1024);
+      NettyWritable writable = new NettyWritable(buffer);
+
+      assertEquals(0, writable.position());
+      writable.put(input);
+      assertEquals(1, writable.position());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2fa7b0b..b1e133e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,10 +92,10 @@
       <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
       <mockito.version>2.8.47</mockito.version>
       <netty.version>4.1.22.Final</netty.version>
-      <proton.version>0.26.0</proton.version>
+      <proton.version>0.27.1</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
-      <qpid.jms.version>0.30.0</qpid.jms.version>
+      <qpid.jms.version>0.32.0</qpid.jms.version>
       <johnzon.version>0.9.5</johnzon.version>
       <json-p.spec.version>1.0-alpha-1</json-p.spec.version>
       <javax.inject.version>1</javax.inject.version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c1cf9ef1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index d70c700..7e80c8f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -17,12 +17,15 @@
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -46,11 +49,18 @@ import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmqpLargeMessageTest extends AmqpClientTestSupport {
 
-   private static final int FRAME_SIZE = 10024;
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpLargeMessageTest.class);
+
+   private final Random rand = new Random(System.currentTimeMillis());
+
+   private static final int FRAME_SIZE = 32767;
    private static final int PAYLOAD = 110 * 1024;
 
    String testQueueName = "ConnectionFrameSize";
@@ -232,6 +242,89 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
       receiveJMS(nMsgs, factory);
    }
 
+   private byte[] createLargePayload(int sizeInBytes) {
+      byte[] payload = new byte[sizeInBytes];
+      for (int i = 0; i < sizeInBytes; i++) {
+         payload[i] = (byte) rand.nextInt(256);
+      }
+
+      LOG.debug("Created buffer with size : " + sizeInBytes + " bytes");
+      return payload;
+   }
+
+   @Test(timeout = 60000)
+   public void testSendSmallerMessages() throws Exception {
+      for (int i = 512; i <= (8 * 1024); i += 512) {
+         doTestSendLargeMessage(i);
+      }
+   }
+
+   @Test(timeout = 120000)
+   public void testSendFixedSizedMessages() throws Exception {
+      doTestSendLargeMessage(65536);
+      doTestSendLargeMessage(65536 * 2);
+      doTestSendLargeMessage(65536 * 4);
+   }
+
+   @Test(timeout = 120000)
+   public void testSend1MBMessage() throws Exception {
+      doTestSendLargeMessage(1024 * 1024);
+   }
+
+   @Ignore("Useful for performance testing")
+   @Test(timeout = 120000)
+   public void testSend10MBMessage() throws Exception {
+      doTestSendLargeMessage(1024 * 1024 * 10);
+   }
+
+   @Ignore("Useful for performance testing")
+   @Test(timeout = 120000)
+   public void testSend100MBMessage() throws Exception {
+      doTestSendLargeMessage(1024 * 1024 * 100);
+   }
+
+   public void doTestSendLargeMessage(int expectedSize) throws Exception {
+      LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
+      byte[] payload = createLargePayload(expectedSize);
+      assertEquals(expectedSize, payload.length);
+
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+
+         long startTime = System.currentTimeMillis();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(name.getMethodName());
+         MessageProducer producer = session.createProducer(queue);
+         BytesMessage message = session.createBytesMessage();
+         message.writeBytes(payload);
+         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+         // Set this to non-default to get a Header in the encoded message.
+         producer.setPriority(4);
+         producer.send(message);
+         long endTime = System.currentTimeMillis();
+
+         LOG.info("Returned from send after {} ms", endTime - startTime);
+         startTime = System.currentTimeMillis();
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         LOG.info("Calling receive");
+         Message received = consumer.receive();
+         assertNotNull(received);
+         assertTrue(received instanceof BytesMessage);
+         BytesMessage bytesMessage = (BytesMessage) received;
+         assertNotNull(bytesMessage);
+         endTime = System.currentTimeMillis();
+
+         LOG.info("Returned from receive after {} ms", endTime - startTime);
+         byte[] bytesReceived = new byte[expectedSize];
+         assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize));
+         assertTrue(Arrays.equals(payload, bytesReceived));
+         connection.close();
+      }
+   }
+
    private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession();


[2/2] activemq-artemis git commit: This closes #2064

Posted by cl...@apache.org.
This closes #2064


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/410cb9ee
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/410cb9ee
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/410cb9ee

Branch: refs/heads/master
Commit: 410cb9ee23f205ebdfb520ecf0699a7ceefd7119
Parents: b60f648 c1cf9ef
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu May 3 12:10:26 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu May 3 12:10:26 2018 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       |  97 ++--
 .../amqp/broker/AMQPSessionCallback.java        |   3 +-
 .../amqp/proton/AMQPConnectionContext.java      |  15 +-
 .../proton/ProtonServerReceiverContext.java     |   5 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  18 +-
 .../protocol/amqp/util/NettyReadable.java       | 125 ++++-
 .../protocol/amqp/util/NettyWritable.java       |  29 +-
 .../protocol/amqp/util/NettyReadableTest.java   | 454 +++++++++++++++++++
 .../protocol/amqp/util/NettyWritableTest.java   | 151 ++++++
 pom.xml                                         |   4 +-
 .../integration/amqp/AmqpLargeMessageTest.java  |  95 +++-
 11 files changed, 917 insertions(+), 79 deletions(-)
----------------------------------------------------------------------