You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/01/18 22:05:54 UTC

[1/2] activemq-artemis git commit: ARTEMIS-928 Changing Netty and InVM to copy buffers, and retain them on the Netty Pools.

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 7a7f33527 -> fbc77b44c


ARTEMIS-928 Changing Netty and InVM to copy buffers, and retain them on the Netty Pools.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3347a4fd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3347a4fd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3347a4fd

Branch: refs/heads/master
Commit: 3347a4fd2716c7c998a0ef68499f4dc3d8106241
Parents: 7a7f335
Author: Will Reichert <wi...@gmail.com>
Authored: Wed Jan 4 10:14:16 2017 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 18 16:59:32 2017 -0500

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQBuffer.java        |  6 ++++
 .../artemis/api/core/ActiveMQBuffers.java       |  9 +++++
 .../core/buffers/impl/ChannelBufferWrapper.java | 29 +++++++++++++--
 .../CompressedLargeMessageControllerImpl.java   |  5 +++
 .../client/impl/LargeMessageControllerImpl.java |  5 +++
 .../artemis/core/message/impl/MessageImpl.java  | 37 +++++++++++++++++---
 .../artemis/core/protocol/core/Packet.java      |  9 +++++
 .../core/impl/ActiveMQSessionContext.java       |  2 ++
 .../core/protocol/core/impl/ChannelImpl.java    |  6 ++++
 .../core/protocol/core/impl/PacketImpl.java     |  8 ++++-
 .../impl/wireformat/SessionReceiveMessage.java  |  2 +-
 .../impl/wireformat/SessionSendMessage.java     |  2 +-
 .../remoting/impl/netty/NettyConnection.java    |  5 +++
 .../protocol/AbstractRemotingConnection.java    |  5 +++
 .../spi/core/protocol/RemotingConnection.java   |  2 ++
 .../artemis/spi/core/remoting/Connection.java   |  2 ++
 .../amqp/converter/TestConversions.java         |  5 +++
 .../core/protocol/mqtt/MQTTConnection.java      |  5 +++
 .../core/protocol/stomp/StompConnection.java    |  5 +++
 .../core/impl/wireformat/QuorumVoteMessage.java |  6 ++++
 .../core/remoting/impl/invm/InVMConnection.java | 23 ++++++++++--
 .../unit/core/message/impl/MessageImplTest.java |  7 +++-
 22 files changed, 171 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index c129e3a..f30ef35 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1149,4 +1149,10 @@ public interface ActiveMQBuffer extends DataInput {
     * @return A converted NIO Buffer
     */
    ByteBuffer toByteBuffer(int index, int length);
+
+   /**
+   * Release any underlying resources held by this buffer
+   */
+   void release();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
index 22f0f9a..849c921 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffers.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
 
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 
@@ -26,6 +27,9 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
  */
 public final class ActiveMQBuffers {
 
+
+   private static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator();
+
    /**
     * Creates a <em>self-expanding</em> ActiveMQBuffer with the given initial size
     *
@@ -36,6 +40,11 @@ public final class ActiveMQBuffers {
       return new ChannelBufferWrapper(Unpooled.buffer(size));
    }
 
+   public static ActiveMQBuffer pooledBuffer(final int size) {
+      return new ChannelBufferWrapper(ALLOCATOR.heapBuffer(size),true, true);
+   }
+
+
    /**
     * Creates a <em>self-expanding</em> ActiveMQBuffer filled with the given byte array
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index 60262f8..c75be21 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -31,7 +31,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    protected ByteBuf buffer; // NO_UCD (use final)
    private final boolean releasable;
-
+   private final boolean isPooled;
    public static ByteBuf unwrap(ByteBuf buffer) {
       ByteBuf parent;
       while ((parent = buffer.unwrap()) != null && parent != buffer) { // this last part is just in case the semantic
@@ -45,14 +45,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
    public ChannelBufferWrapper(final ByteBuf buffer) {
       this(buffer, false);
    }
-
    public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable) {
+      this(buffer, releasable, false);
+   }
+   public ChannelBufferWrapper(final ByteBuf buffer, boolean releasable, boolean pooled) {
       if (!releasable) {
          this.buffer = Unpooled.unreleasableBuffer(buffer);
       } else {
          this.buffer = buffer;
       }
       this.releasable = releasable;
+      this.isPooled = pooled;
+
    }
 
    @Override
@@ -398,7 +402,19 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
 
    @Override
    public ActiveMQBuffer readSlice(final int length) {
-      return new ChannelBufferWrapper(buffer.readSlice(length), releasable);
+      if ( isPooled ) {
+         ByteBuf fromBuffer = buffer.readSlice(length);
+         ByteBuf newNettyBuffer = Unpooled.buffer(fromBuffer.capacity());
+         int read = fromBuffer.readerIndex();
+         int writ = fromBuffer.writerIndex();
+         fromBuffer.readerIndex(0);
+         fromBuffer.readBytes(newNettyBuffer,0,writ);
+         newNettyBuffer.setIndex(read,writ);
+         ActiveMQBuffer returnBuffer = new ChannelBufferWrapper(newNettyBuffer,releasable,false);
+         returnBuffer.setIndex(read,writ);
+         return returnBuffer;
+      }
+      return new ChannelBufferWrapper(buffer.readSlice(length), releasable, isPooled);
    }
 
    @Override
@@ -523,6 +539,13 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
    }
 
    @Override
+   public void release() {
+      if ( this.isPooled ) {
+         buffer.release();
+      }
+   }
+
+   @Override
    public boolean writable() {
       return buffer.isWritable();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 24b7f1e..55f9129 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -203,6 +203,11 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
    }
 
    @Override
+   public void release() {
+      //no-op
+   }
+
+   @Override
    public int readerIndex() {
       return 0;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 5a27f24..951aea2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -525,6 +525,11 @@ public class LargeMessageControllerImpl implements LargeMessageController {
    }
 
    @Override
+   public void release() {
+      //no-op
+   }
+
+   @Override
    public int readerIndex() {
       return (int) readerIndex;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
index 921f97d..1e19817 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -76,9 +77,9 @@ public abstract class MessageImpl implements MessageInternal {
 
    protected byte priority;
 
-   protected ActiveMQBuffer buffer;
+   protected volatile ActiveMQBuffer buffer;
 
-   protected ResetLimitWrappedActiveMQBuffer bodyBuffer;
+   protected volatile ResetLimitWrappedActiveMQBuffer bodyBuffer;
 
    protected volatile boolean bufferValid;
 
@@ -434,12 +435,16 @@ public abstract class MessageImpl implements MessageInternal {
 
    @Override
    public void decodeFromBuffer(final ActiveMQBuffer buffer) {
-      this.buffer = buffer;
+
+      this.buffer = copyMessageBuffer(buffer);
 
       decode();
 
+      //synchronize indexes
+      buffer.setIndex(this.buffer.readerIndex(),this.buffer.writerIndex());
+
       // Setting up the BodyBuffer based on endOfBodyPosition set from decode
-      ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, buffer, null);
+      ResetLimitWrappedActiveMQBuffer tmpbodyBuffer = new ResetLimitWrappedActiveMQBuffer(BODY_OFFSET, this.buffer, null);
       tmpbodyBuffer.readerIndex(BODY_OFFSET);
       tmpbodyBuffer.writerIndex(endOfBodyPosition);
       // only set this after the writer and reader is set,
@@ -449,6 +454,30 @@ public abstract class MessageImpl implements MessageInternal {
 
    }
 
+   private ActiveMQBuffer copyMessageBuffer(ActiveMQBuffer buffer) {
+      ActiveMQBuffer copiedBuffer;
+
+      ByteBuf newNettyBuffer = Unpooled.buffer( buffer.byteBuf().capacity() );
+
+      int read = buffer.byteBuf().readerIndex();
+      int writ = buffer.byteBuf().writerIndex();
+
+      int readArt = buffer.readerIndex();
+      int writArt = buffer.writerIndex();
+      buffer.byteBuf().readerIndex( 0 );
+
+      buffer.byteBuf().readBytes( newNettyBuffer, 0, buffer.byteBuf().writerIndex() );
+      buffer.byteBuf().setIndex( read, writ );
+      newNettyBuffer.setIndex( read, writ );
+
+      copiedBuffer = new ChannelBufferWrapper( newNettyBuffer );
+
+      buffer.setIndex( readArt, writArt );
+      copiedBuffer.setIndex( readArt, writArt );
+
+      return copiedBuffer;
+   }
+
    @Override
    public void bodyChanged() {
       bufferValid = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index ddb734e..d7ae5b3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -64,6 +64,15 @@ public interface Packet {
    ActiveMQBuffer encode(RemotingConnection connection);
 
    /**
+    * Encodes the packet and returns a {@link ActiveMQBuffer} containing the data
+    *
+    * @param connection the connection
+    * @param usePooled if the returned buffer should be pooled or unpooled
+    * @return the buffer to encode to
+    */
+   ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled);
+
+   /**
     * decodes the buffer into this packet
     *
     * @param buffer the buffer to decode from

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 92f291c..0bd1cff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -919,6 +919,8 @@ public class ActiveMQSessionContext extends SessionContext {
       ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
 
       conn.write(buffer, false, false);
+
+      buffer.release();
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 5f20f46..5d1e37a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -305,6 +305,8 @@ public final class ChannelImpl implements Channel {
          // buffer is full, preventing any incoming buffers being handled and blocking failover
          connection.getTransportConnection().write(buffer, flush, batch);
 
+         buffer.release();
+
          return true;
       }
    }
@@ -412,6 +414,7 @@ public final class ChannelImpl implements Channel {
             }
          } finally {
             lock.unlock();
+            buffer.release();
          }
 
          return response;
@@ -634,6 +637,9 @@ public final class ChannelImpl implements Channel {
       final ActiveMQBuffer buffer = packet.encode(connection);
 
       connection.getTransportConnection().write(buffer, false, false);
+
+      buffer.release();
+
    }
 
    private void addResendPacket(Packet packet) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 646eb28..9025210 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -304,7 +304,13 @@ public class PacketImpl implements Packet {
 
    @Override
    public ActiveMQBuffer encode(final RemotingConnection connection) {
-      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+      return encode(connection,true);
+   }
+
+
+   @Override
+   public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
+      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
 
       // The standard header fields
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index 02ed2bf..ce76186 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -56,7 +56,7 @@ public class SessionReceiveMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
-      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex());
+      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex(), true);
       bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index 91d43a5..c7bb30e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -68,7 +68,7 @@ public class SessionSendMessage extends MessagePacket {
          // this is for unit tests only
          bufferWrite = buffer.copy(0, buffer.capacity());
       } else {
-         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
+         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
       }
       bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 0b307ef..33dbf4b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -210,6 +210,11 @@ public class NettyConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
+      return createTransportBuffer(size, false);
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
       return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index a9e12aa..6884243 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -179,6 +179,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
    }
 
    @Override
+   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
+      return transportConnection.createTransportBuffer(size, pooled);
+   }
+
+   @Override
    public Connection getTransportConnection() {
       return transportConnection;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index 39ecdf6..a68999b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -120,6 +120,8 @@ public interface RemotingConnection extends BufferHandler {
     */
    ActiveMQBuffer createTransportBuffer(int size);
 
+   ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
+
    /**
     * called when the underlying connection fails.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 7ab0c40..a5fcf87 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -35,6 +35,8 @@ public interface Connection {
     */
    ActiveMQBuffer createTransportBuffer(int size);
 
+   ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
+
    RemotingConnection getProtocolConnection();
 
    void setProtocolConnection(RemotingConnection connection);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 6beee36..08c46be 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -765,5 +765,10 @@ public class TestConversions extends Assert {
       public ByteBuffer toByteBuffer(int index, int length) {
          return null;
       }
+
+      @Override
+      public void release() {
+         //no-op
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 446e362..6143cf7 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -132,6 +132,11 @@ public class MQTTConnection implements RemotingConnection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(int size) {
+      return createTransportBuffer(size, false);
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
       return transportConnection.createTransportBuffer(size);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 5dafe60..0eb81b9 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -293,6 +293,11 @@ public final class StompConnection implements RemotingConnection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(int size) {
+      return createTransportBuffer(size, false);
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
       return ActiveMQBuffers.dynamicBuffer(size);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
index f808349..78ebcb9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
 import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public class QuorumVoteMessage extends PacketImpl {
 
@@ -41,6 +42,11 @@ public class QuorumVoteMessage extends PacketImpl {
    }
 
    @Override
+   public ActiveMQBuffer encode(final RemotingConnection connection) {
+      return encode(connection,false);
+   }
+
+   @Override
    public void encodeRest(ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeSimpleString(handler);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 27fc544..24931d3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -146,7 +146,16 @@ public class InVMConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
-      return ActiveMQBuffers.dynamicBuffer(size);
+      return createTransportBuffer(size, false);
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
+      if ( pooled ) {
+         return ActiveMQBuffers.pooledBuffer( size );
+      } else {
+         return ActiveMQBuffers.dynamicBuffer( size );
+      }
    }
 
    @Override
@@ -173,9 +182,13 @@ public class InVMConnection implements Connection {
                      final boolean flush,
                      final boolean batch,
                      final ChannelFutureListener futureListener) {
-      final ActiveMQBuffer copied = buffer.copy(0, buffer.capacity());
 
-      copied.setIndex(buffer.readerIndex(), buffer.writerIndex());
+      final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity());
+      int read = buffer.readerIndex();
+      int writ = buffer.writerIndex();
+      copied.writeBytes(buffer,read,writ - read);
+      copied.setIndex(read,writ);
+      buffer.setIndex(read,writ);
 
       try {
          executor.execute(new Runnable() {
@@ -201,6 +214,10 @@ public class InVMConnection implements Connection {
                   if (logger.isTraceEnabled()) {
                      logger.trace(InVMConnection.this + "::packet sent done");
                   }
+                  copied.release();
+//                  if ( copied.byteBuf().refCnt() > 0 ) {
+//                     copied.release();
+//                  }
                }
             }
          });

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3347a4fd/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
index c559bf9..252b0eb 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/message/impl/MessageImplTest.java
@@ -286,13 +286,18 @@ public class MessageImplTest extends ActiveMQTestBase {
             }
 
             for (int i = 0; i < RUNS; i++) {
+               ActiveMQBuffer buf = null;
                try {
                   SessionSendMessage ssm = new SessionSendMessage(msg);
-                  ActiveMQBuffer buf = ssm.encode(null);
+                  buf = ssm.encode(null);
                   simulateRead(buf);
                } catch (Throwable e) {
                   e.printStackTrace();
                   errors.incrementAndGet();
+               } finally {
+                  if ( buf != null ) {
+                     buf.release();
+                  }
                }
             }
          }


[2/2] activemq-artemis git commit: This closes #970

Posted by cl...@apache.org.
This closes #970


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fbc77b44
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fbc77b44
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fbc77b44

Branch: refs/heads/master
Commit: fbc77b44c2bf72cede87b7dd3b74ba2604ccb7b6
Parents: 7a7f335 3347a4f
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Jan 18 17:05:47 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jan 18 17:05:47 2017 -0500

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQBuffer.java        |  6 ++++
 .../artemis/api/core/ActiveMQBuffers.java       |  9 +++++
 .../core/buffers/impl/ChannelBufferWrapper.java | 29 +++++++++++++--
 .../CompressedLargeMessageControllerImpl.java   |  5 +++
 .../client/impl/LargeMessageControllerImpl.java |  5 +++
 .../artemis/core/message/impl/MessageImpl.java  | 37 +++++++++++++++++---
 .../artemis/core/protocol/core/Packet.java      |  9 +++++
 .../core/impl/ActiveMQSessionContext.java       |  2 ++
 .../core/protocol/core/impl/ChannelImpl.java    |  6 ++++
 .../core/protocol/core/impl/PacketImpl.java     |  8 ++++-
 .../impl/wireformat/SessionReceiveMessage.java  |  2 +-
 .../impl/wireformat/SessionSendMessage.java     |  2 +-
 .../remoting/impl/netty/NettyConnection.java    |  5 +++
 .../protocol/AbstractRemotingConnection.java    |  5 +++
 .../spi/core/protocol/RemotingConnection.java   |  2 ++
 .../artemis/spi/core/remoting/Connection.java   |  2 ++
 .../amqp/converter/TestConversions.java         |  5 +++
 .../core/protocol/mqtt/MQTTConnection.java      |  5 +++
 .../core/protocol/stomp/StompConnection.java    |  5 +++
 .../core/impl/wireformat/QuorumVoteMessage.java |  6 ++++
 .../core/remoting/impl/invm/InVMConnection.java | 23 ++++++++++--
 .../unit/core/message/impl/MessageImplTest.java |  7 +++-
 22 files changed, 171 insertions(+), 14 deletions(-)
----------------------------------------------------------------------