You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/04/03 11:59:18 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1056 Removing PartialPooledByteBufAllocator

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 200088778 -> 7929fff89


ARTEMIS-1056 Removing PartialPooledByteBufAllocator


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c2989e17
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c2989e17
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c2989e17

Branch: refs/heads/1.x
Commit: c2989e1742f499b1a46237071ffdcbfc0946b029
Parents: 2000887
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Mar 23 10:31:24 2017 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Apr 3 12:56:21 2017 +0100

----------------------------------------------------------------------
 .../impl/netty/ActiveMQChannelHandler.java      |   6 +-
 .../remoting/impl/netty/NettyConnection.java    |   3 +-
 .../remoting/impl/netty/NettyConnector.java     |   1 -
 .../netty/PartialPooledByteBufAllocator.java    | 138 -------------------
 .../artemis/core/protocol/ProtocolHandler.java  |   5 -
 .../core/remoting/impl/netty/NettyAcceptor.java |   1 -
 .../client/transport/NettyTcpTransport.java     |   1 -
 .../amqp/client/transport/NettyWSTransport.java |   1 -
 .../PartialPooledByteBufAllocator.java          | 133 ------------------
 .../NettyConnectorWithHTTPUpgradeTest.java      |   3 -
 10 files changed, 7 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index 93be281..cc4407c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
       ByteBuf buffer = (ByteBuf) msg;
 
-      handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      try {
+         handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      } finally {
+         buffer.release();
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index 33dbf4b..c3a71c5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -215,7 +216,7 @@ public class NettyConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
-      return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
+      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 3f226ae..6211e8f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -407,7 +407,6 @@ public class NettyConnector extends AbstractConnector {
       }
       bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
       bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
       channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
 
       final SSLContext context;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
deleted file mode 100644
index 5e67952..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.remoting.impl.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-
-/**
- * A {@link ByteBufAllocator} which is partial pooled. Which means only direct {@link ByteBuf}s are pooled. The rest
- * is unpooled.
- */
-public class PartialPooledByteBufAllocator implements ByteBufAllocator {
-
-   private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT;
-   private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
-
-   public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
-
-   private PartialPooledByteBufAllocator() {
-   }
-
-   @Override
-   public ByteBuf buffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf buffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf buffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf ioBuffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf ioBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf heapBuffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf heapBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf directBuffer() {
-      return POOLED.directBuffer();
-   }
-
-   @Override
-   public ByteBuf directBuffer(int initialCapacity) {
-      return POOLED.directBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
-      return POOLED.directBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public CompositeByteBuf compositeBuffer() {
-      return UNPOOLED.compositeHeapBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeBuffer(int maxNumComponents) {
-      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
-   }
-
-   @Override
-   public CompositeByteBuf compositeHeapBuffer() {
-      return UNPOOLED.compositeHeapBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
-      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
-   }
-
-   @Override
-   public CompositeByteBuf compositeDirectBuffer() {
-      return POOLED.compositeDirectBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
-      return POOLED.compositeDirectBuffer();
-   }
-
-   @Override
-   public boolean isDirectBufferPooled() {
-      return true;
-   }
-
-   @Override
-   public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
-      return UNPOOLED.calculateNewCapacity(minNewCapacity, maxCapacity);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 340861b..ca78f29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnabl
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -185,10 +184,6 @@ public class ProtocolHandler {
          protocolManagerToUse.handshake(connection, new ChannelBufferWrapper(in));
          pipeline.remove(this);
 
-         // https://issues.apache.org/jira/browse/ARTEMIS-392
-         // Application servers or other components may upgrade a regular socket to Netty
-         // We need to be able to work normally as with anything else on Artemis
-         ctx.channel().config().setAllocator(PartialPooledByteBufAllocator.INSTANCE);
          ctx.flush();
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 9088e57..df08108 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -319,7 +319,6 @@ public class NettyAcceptor extends AbstractAcceptor {
       bootstrap.option(ChannelOption.SO_REUSEADDR, true);
       bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
       bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
-      bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
       channelGroup = new DefaultChannelGroup("activemq-accepted-channels", GlobalEventExecutor.INSTANCE);
 
       serverChannelGroup = new DefaultChannelGroup("activemq-acceptor-channels", GlobalEventExecutor.INSTANCE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
index d4b9f54..29963a0 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -268,7 +268,6 @@ public class NettyTcpTransport implements NettyTransport {
       bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
       bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
       bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
 
       if (options.getSendBufferSize() != -1) {
          bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
index 8a34a4b..f75a52e 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -285,7 +285,6 @@ public class NettyWSTransport implements NettyTransport {
       bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
       bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
       bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
 
       if (options.getSendBufferSize() != -1) {
          bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
deleted file mode 100644
index 12f5568..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.CompositeByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-
-/**
- * A {@link ByteBufAllocator} which is partial pooled. Which means only direct
- * {@link ByteBuf}s are pooled. The rest is unpooled.
- */
-public class PartialPooledByteBufAllocator implements ByteBufAllocator {
-
-   private static final ByteBufAllocator POOLED = PooledByteBufAllocator.DEFAULT;
-   private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
-
-   public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
-
-   private PartialPooledByteBufAllocator() {
-   }
-
-   @Override
-   public ByteBuf buffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf buffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf buffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf ioBuffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf ioBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf heapBuffer() {
-      return UNPOOLED.heapBuffer();
-   }
-
-   @Override
-   public ByteBuf heapBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public ByteBuf directBuffer() {
-      return POOLED.directBuffer();
-   }
-
-   @Override
-   public ByteBuf directBuffer(int initialCapacity) {
-      return POOLED.directBuffer(initialCapacity);
-   }
-
-   @Override
-   public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
-      return POOLED.directBuffer(initialCapacity, maxCapacity);
-   }
-
-   @Override
-   public CompositeByteBuf compositeBuffer() {
-      return UNPOOLED.compositeHeapBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeBuffer(int maxNumComponents) {
-      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
-   }
-
-   @Override
-   public CompositeByteBuf compositeHeapBuffer() {
-      return UNPOOLED.compositeHeapBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
-      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
-   }
-
-   @Override
-   public CompositeByteBuf compositeDirectBuffer() {
-      return POOLED.compositeDirectBuffer();
-   }
-
-   @Override
-   public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
-      return POOLED.compositeDirectBuffer();
-   }
-
-   @Override
-   public boolean isDirectBufferPooled() {
-      return true;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c2989e17/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
index 0afd30c..0f08ecd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -52,7 +51,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
-import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -216,7 +214,6 @@ public class NettyConnectorWithHTTPUpgradeTest extends ActiveMQTestBase {
       } else {
          context = null;
       }
-      b.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
       b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {


[2/3] activemq-artemis git commit: ARTEMIS-1056 Improving allocations on InVM Transport

Posted by ma...@apache.org.
ARTEMIS-1056 Improving allocations on InVM Transport


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b819026d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b819026d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b819026d

Branch: refs/heads/1.x
Commit: b819026dfcdcd4a1228774ba87dcae2e94b13a9b
Parents: c2989e1
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Mar 27 09:40:11 2017 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Apr 3 12:58:13 2017 +0100

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |  9 -------
 .../core/impl/ActiveMQSessionContext.java       |  2 --
 .../core/protocol/core/impl/ChannelImpl.java    |  6 -----
 .../core/protocol/core/impl/PacketImpl.java     |  8 +-----
 .../wireformat/SessionContinuationMessage.java  | 10 ++++----
 .../impl/wireformat/SessionReceiveMessage.java  |  2 +-
 .../impl/wireformat/SessionSendMessage.java     |  2 +-
 .../remoting/impl/netty/NettyConnection.java    |  5 ----
 .../protocol/AbstractRemotingConnection.java    |  5 ----
 .../spi/core/protocol/RemotingConnection.java   |  2 --
 .../artemis/spi/core/remoting/Connection.java   |  2 --
 .../core/protocol/mqtt/MQTTConnection.java      |  5 ----
 .../core/protocol/stomp/StompConnection.java    |  5 ----
 .../core/impl/wireformat/QuorumVoteMessage.java | 10 +++-----
 .../impl/wireformat/QuorumVoteReplyMessage.java |  4 ++-
 .../core/remoting/impl/invm/InVMConnection.java | 26 ++++----------------
 16 files changed, 19 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index d7ae5b3..ddb734e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -64,15 +64,6 @@ public interface Packet {
    ActiveMQBuffer encode(RemotingConnection connection);
 
    /**
-    * Encodes the packet and returns a {@link ActiveMQBuffer} containing the data
-    *
-    * @param connection the connection
-    * @param usePooled if the returned buffer should be pooled or unpooled
-    * @return the buffer to encode to
-    */
-   ActiveMQBuffer encode(RemotingConnection connection, boolean usePooled);
-
-   /**
     * decodes the buffer into this packet
     *
     * @param buffer the buffer to decode from

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index b6c0793..56c7135 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -877,8 +877,6 @@ public class ActiveMQSessionContext extends SessionContext {
       ActiveMQBuffer buffer = packet.encode(this.getCoreConnection());
 
       conn.write(buffer, false, false);
-
-      buffer.release();
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index bf4dd18..6e5f027 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -300,9 +300,6 @@ public final class ChannelImpl implements Channel {
          // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
          // buffer is full, preventing any incoming buffers being handled and blocking failover
          connection.getTransportConnection().write(buffer, flush, batch);
-
-         buffer.release();
-
          return true;
       }
    }
@@ -410,7 +407,6 @@ public final class ChannelImpl implements Channel {
             }
          } finally {
             lock.unlock();
-            buffer.release();
          }
 
          return response;
@@ -634,8 +630,6 @@ public final class ChannelImpl implements Channel {
 
       connection.getTransportConnection().write(buffer, false, false);
 
-      buffer.release();
-
    }
 
    private void addResendPacket(Packet packet) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index d373176..6dddf3b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -274,13 +274,7 @@ public class PacketImpl implements Packet {
 
    @Override
    public ActiveMQBuffer encode(final RemotingConnection connection) {
-      return encode(connection,true);
-   }
-
-
-   @Override
-   public ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
-      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE, usePooled);
+      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
 
       // The standard header fields
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
index faeed08..40d3622 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
@@ -79,8 +79,8 @@ public abstract class SessionContinuationMessage extends PacketImpl {
    }
 
    @Override
-   public final ActiveMQBuffer encode(final RemotingConnection connection, boolean usePooled) {
-      final ActiveMQBuffer buffer = createPacket(connection, usePooled);
+   public final ActiveMQBuffer encode(final RemotingConnection connection) {
+      final ActiveMQBuffer buffer = createPacket(connection);
 
       // The standard header fields
 
@@ -100,12 +100,12 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return buffer;
    }
 
-   protected final ActiveMQBuffer createPacket(RemotingConnection connection, boolean usePooled) {
+   protected final ActiveMQBuffer createPacket(RemotingConnection connection) {
       final int expectedEncodedSize = expectedEncodedSize();
       if (connection == null) {
          return new ChannelBufferWrapper(Unpooled.buffer(expectedEncodedSize));
       } else {
-         return connection.createTransportBuffer(expectedEncodedSize, usePooled);
+         return connection.createTransportBuffer(expectedEncodedSize);
       }
    }
 
@@ -158,4 +158,4 @@ public abstract class SessionContinuationMessage extends PacketImpl {
       return true;
    }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
index c21ebda..2129b49 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java
@@ -56,7 +56,7 @@ public class SessionReceiveMessage extends MessagePacket {
    public ActiveMQBuffer encode(final RemotingConnection connection) {
       ActiveMQBuffer buffer = message.getEncodedBuffer();
 
-      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT, true);
+      ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + DataConstants.SIZE_LONG + DataConstants.SIZE_INT);
       bufferWrite.writeBytes(buffer, 0, buffer.capacity());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
index c7bb30e..91d43a5 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java
@@ -68,7 +68,7 @@ public class SessionSendMessage extends MessagePacket {
          // this is for unit tests only
          bufferWrite = buffer.copy(0, buffer.capacity());
       } else {
-         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1, true); // 1 for the requireResponse
+         bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
       }
       bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
       bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index c3a71c5..679844a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -211,11 +211,6 @@ public class NettyConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
-      return createTransportBuffer(size, false);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
       return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size), true);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index 6884243..a9e12aa 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -179,11 +179,6 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
    }
 
    @Override
-   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
-      return transportConnection.createTransportBuffer(size, pooled);
-   }
-
-   @Override
    public Connection getTransportConnection() {
       return transportConnection;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index a68999b..39ecdf6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -120,8 +120,6 @@ public interface RemotingConnection extends BufferHandler {
     */
    ActiveMQBuffer createTransportBuffer(int size);
 
-   ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
-
    /**
     * called when the underlying connection fails.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index a5fcf87..7ab0c40 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -35,8 +35,6 @@ public interface Connection {
     */
    ActiveMQBuffer createTransportBuffer(int size);
 
-   ActiveMQBuffer createTransportBuffer(int size, boolean pooled);
-
    RemotingConnection getProtocolConnection();
 
    void setProtocolConnection(RemotingConnection connection);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index 6143cf7..446e362 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -132,11 +132,6 @@ public class MQTTConnection implements RemotingConnection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(int size) {
-      return createTransportBuffer(size, false);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
       return transportConnection.createTransportBuffer(size);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 899ffde..c046b26 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -269,11 +269,6 @@ public final class StompConnection implements RemotingConnection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(int size) {
-      return createTransportBuffer(size, false);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(int size, boolean pooled) {
       return ActiveMQBuffers.dynamicBuffer(size);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
index 78ebcb9..5c030ae 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
@@ -17,11 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
 import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 
 public class QuorumVoteMessage extends PacketImpl {
 
@@ -42,11 +42,6 @@ public class QuorumVoteMessage extends PacketImpl {
    }
 
    @Override
-   public ActiveMQBuffer encode(final RemotingConnection connection) {
-      return encode(connection,false);
-   }
-
-   @Override
    public void encodeRest(ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeSimpleString(handler);
@@ -57,7 +52,8 @@ public class QuorumVoteMessage extends PacketImpl {
    public void decodeRest(ActiveMQBuffer buffer) {
       super.decodeRest(buffer);
       handler = buffer.readSimpleString();
-      voteBuffer = buffer.readSlice(buffer.readableBytes());
+      voteBuffer = ActiveMQBuffers.fixedBuffer(buffer.readableBytes());
+      buffer.readBytes(voteBuffer);
    }
 
    public SimpleString getHandler() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteReplyMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteReplyMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteReplyMessage.java
index ff0609c..8a4f091 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteReplyMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteReplyMessage.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
@@ -62,7 +63,8 @@ public class QuorumVoteReplyMessage extends PacketImpl {
    public void decodeRest(ActiveMQBuffer buffer) {
       super.decodeRest(buffer);
       handler = buffer.readSimpleString();
-      voteBuffer = buffer.readSlice(buffer.readableBytes());
+      voteBuffer = ActiveMQBuffers.fixedBuffer(buffer.readableBytes());
+      buffer.readBytes(voteBuffer);
    }
 
    public void decodeRest(QuorumVoteHandler voteHandler) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b819026d/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 1bd1bac..33da5f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -152,15 +152,10 @@ public class InVMConnection implements Connection {
 
    @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
-      return createTransportBuffer(size, false);
-   }
-
-   @Override
-   public ActiveMQBuffer createTransportBuffer(final int size, boolean pooled) {
-      if (bufferPoolingEnabled && pooled) {
+      if (bufferPoolingEnabled) {
          return ActiveMQBuffers.pooledBuffer( size );
       }
-      return ActiveMQBuffers.dynamicBuffer( size );
+      return ActiveMQBuffers.dynamicBuffer(size);
    }
 
    @Override
@@ -188,26 +183,18 @@ public class InVMConnection implements Connection {
                      final boolean batch,
                      final ChannelFutureListener futureListener) {
 
-      final ActiveMQBuffer copied = ActiveMQBuffers.pooledBuffer(buffer.capacity());
-      int read = buffer.readerIndex();
-      int writ = buffer.writerIndex();
-      copied.writeBytes(buffer,read,writ - read);
-      copied.setIndex(read,writ);
-      buffer.setIndex(read,writ);
-
       try {
          executor.execute(new Runnable() {
             @Override
             public void run() {
                try {
                   if (!closed) {
-                     copied.readInt(); // read and discard
+                     buffer.readInt(); // read and discard
                      if (logger.isTraceEnabled()) {
                         logger.trace(InVMConnection.this + "::Sending inVM packet");
                      }
-                     handler.bufferReceived(id, copied);
+                     handler.bufferReceived(id, buffer);
                      if (futureListener != null) {
-                        // TODO BEFORE MERGE: (is null a good option here?)
                         futureListener.operationComplete(null);
                      }
                   }
@@ -216,13 +203,10 @@ public class InVMConnection implements Connection {
                   ActiveMQServerLogger.LOGGER.errorWritingToInvmConnector(e, this);
                   throw new IllegalStateException(msg, e);
                } finally {
+                  buffer.release();
                   if (logger.isTraceEnabled()) {
                      logger.trace(InVMConnection.this + "::packet sent done");
                   }
-                  copied.release();
-//                  if ( copied.byteBuf().refCnt() > 0 ) {
-//                     copied.release();
-//                  }
                }
             }
          });


[3/3] activemq-artemis git commit: ARTEMIS-1089 Fixing Replication catchup slow

Posted by ma...@apache.org.
ARTEMIS-1089 Fixing Replication catchup slow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7929fff8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7929fff8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7929fff8

Branch: refs/heads/1.x
Commit: 7929fff893e1510390ae8e6a7924aa0e4f3864c0
Parents: b819026
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Apr 2 19:20:42 2017 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Apr 3 12:58:13 2017 +0100

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQBuffer.java        | 13 +++
 .../core/buffers/impl/ChannelBufferWrapper.java |  5 +
 .../impl/ResetLimitWrappedActiveMQBuffer.java   |  8 ++
 .../CompressedLargeMessageControllerImpl.java   |  6 ++
 .../client/impl/LargeMessageControllerImpl.java | 15 +++
 .../amqp/converter/TestConversions.java         |  5 +
 .../cursor/impl/PageCursorProviderImpl.java     |  1 +
 .../core/paging/impl/PagingStoreImpl.java       | 31 ++++---
 .../impl/journal/JournalStorageManager.java     |  2 +-
 .../wireformat/ReplicationSyncFileMessage.java  | 13 ++-
 .../core/replication/ReplicationEndpoint.java   |  2 +-
 .../core/replication/ReplicationManager.java    | 97 ++++++++++++--------
 .../core/server/ActiveMQServerLogger.java       |  5 +-
 13 files changed, 139 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
index f30ef35..d753b8f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQBuffer.java
@@ -1065,6 +1065,19 @@ public interface ActiveMQBuffer extends DataInput {
     */
    void writeBytes(ByteBuffer src);
 
+
+   /**
+    * Transfers the specified source buffer's data to this buffer starting at
+    * the current {@code writerIndex} until the source buffer's position
+    * reaches its limit, and increases the {@code writerIndex} by the
+    * number of the transferred bytes.
+    *
+    * @param src The source buffer
+    * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+    *                                   {@code this.writableBytes}
+    */
+   void writeBytes(ByteBuf src, int srcIndex, int length);
+
    /**
     * Returns a copy of this buffer's readable bytes.  Modifying the content
     * of the returned buffer or this buffer does not affect each other at all.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
index c75be21..496c146 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ChannelBufferWrapper.java
@@ -576,6 +576,11 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      buffer.writeBytes(src, srcIndex, length);
+   }
+
+   @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       buffer.writeBytes(src.byteBuf(), srcIndex, length);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index ec6cf09..d6cba00 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -263,6 +263,14 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
       super.writeBytes(src);
    }
 
+
+   @Override
+   public void writeBytes(final ByteBuf src, final int srcIndex, final int length) {
+      changed();
+
+      super.writeBytes(src, srcIndex, length);
+   }
+
    @Override
    public void writeBytes(final ActiveMQBuffer src, final int srcIndex, final int length) {
       changed();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 55f9129..ce652d2 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -513,6 +513,12 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
    }
 
    @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
+   }
+
+
+   @Override
    public ByteBuffer toByteBuffer() {
       throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 951aea2..0bb5690 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -863,6 +863,21 @@ public class LargeMessageControllerImpl implements LargeMessageController {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
+   /**
+    * Transfers the specified source buffer's data to this buffer starting at
+    * the current {@code writerIndex} until the source buffer's position
+    * reaches its limit, and increases the {@code writerIndex} by the
+    * number of the transferred bytes.
+    *
+    * @param src The source buffer
+    * @throws IndexOutOfBoundsException if {@code src.remaining()} is greater than
+    *                                   {@code this.writableBytes}
+    */
+   @Override
+   public void writeBytes(ByteBuf src, int srcIndex, int length) {
+      throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
+   }
+
    public int writeBytes(final InputStream in, final int length) throws IOException {
       throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index cbe2699..e154cd2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -719,6 +719,11 @@ public class TestConversions extends Assert {
       }
 
       @Override
+      public void writeBytes(ByteBuf src, int srcIndex, int length) {
+
+      }
+
+      @Override
       public void readFully(byte[] b) throws IOException {
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 76ad26b..701f86c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -332,6 +332,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public void resumeCleanup() {
       this.cleanupEnabled = true;
+      scheduleCleanup();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 4e57c85..8cba9fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1093,28 +1093,29 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public Collection<Integer> getCurrentIds() throws Exception {
-      List<Integer> ids = new ArrayList<>();
-      if (fileFactory != null) {
-         for (String fileName : fileFactory.listFiles("page")) {
-            ids.add(getPageIdFromFileName(fileName));
+      lock.writeLock().lock();
+      try {
+         List<Integer> ids = new ArrayList<>();
+         if (fileFactory != null) {
+            for (String fileName : fileFactory.listFiles("page")) {
+               ids.add(getPageIdFromFileName(fileName));
+            }
          }
+         return ids;
+      } finally {
+         lock.writeLock().unlock();
       }
-      return ids;
    }
 
    @Override
    public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
-      lock.writeLock().lock();
-      try {
-         for (Integer id : pageIds) {
-            SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
-            if (!sFile.exists()) {
-               continue;
-            }
-            replicator.syncPages(sFile, id, getAddress());
+      for (Integer id : pageIds) {
+         SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id));
+         if (!sFile.exists()) {
+            continue;
          }
-      } finally {
-         lock.writeLock().unlock();
+         ActiveMQServerLogger.LOGGER.replicaSyncFile(sFile, sFile.size());
+         replicator.syncPages(sFile, id, getAddress());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 7c0a651..9c122b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -587,10 +587,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
          stopReplication();
          throw e;
       } finally {
-         pagingManager.resumeCleanup();
          // Re-enable compact and reclaim of journal files
          originalBindingsJournal.replicationSyncFinished();
          originalMessageJournal.replicationSyncFinished();
+         pagingManager.resumeCleanup();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index de7f73e..90d2ca0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -16,11 +16,11 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
@@ -42,7 +42,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
     */
    private long fileId;
    private int dataSize;
-   private ByteBuffer byteBuffer;
+   private ByteBuf byteBuffer;
    private byte[] byteArray;
    private SimpleString pageStoreName;
    private FileType fileType;
@@ -78,7 +78,7 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
                                      SimpleString storeName,
                                      long id,
                                      int size,
-                                     ByteBuffer buffer) {
+                                     ByteBuf buffer) {
       this();
       this.byteBuffer = buffer;
       this.pageStoreName = storeName;
@@ -124,7 +124,12 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
        * (which might receive appends)
        */
       if (dataSize > 0) {
-         buffer.writeBytes(byteBuffer);
+         buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
+      }
+
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 1a07adc..e1879da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -410,7 +410,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       if (!channel1.isOpen()) {
          channel1.open();
       }
-      channel1.writeDirect(ByteBuffer.wrap(data), true);
+      channel1.writeDirect(ByteBuffer.wrap(data), false);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index d0468d1..7e0881c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -25,8 +25,11 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -121,6 +124,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
 
    private final ExecutorFactory executorFactory;
 
+   private final Executor replicationStream;
+
    private SessionFailureListener failureListener;
 
    private CoreRemotingConnection remotingConnection;
@@ -140,6 +145,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       this.executorFactory = executorFactory;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
+      this.replicationStream = executorFactory.getExecutor();
       this.timeout = timeout;
    }
 
@@ -175,7 +181,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
       }
    }
 
@@ -340,15 +346,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
    }
 
    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true);
+      return sendReplicatePacket(packet, true, true);
    }
 
-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
       if (!enabled)
          return null;
       boolean runItNow = false;
 
-      OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
+      final OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
       if (lineUp) {
          repliToken.replicationLineUp();
       }
@@ -356,10 +362,17 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       synchronized (replicationLock) {
          if (enabled) {
             pendingTokens.add(repliToken);
-            if (!flowControl()) {
-               return repliToken;
+            if (useExecutor) {
+               replicationStream.execute(() -> {
+                  if (enabled) {
+                     flowControl();
+                     replicatingChannel.send(packet);
+                  }
+               });
+            } else {
+               flowControl();
+               replicatingChannel.send(packet);
             }
-            replicatingChannel.send(packet);
          } else {
             // Already replicating channel failed, so just play the action now
             runItNow = true;
@@ -380,33 +393,35 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
     * In case you refactor this in any way, this method must hold a lock on replication lock. .
     */
    private boolean flowControl() {
-      // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
-      // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
-      if (!replicatingChannel.getConnection().isWritable(this)) {
-         try {
-            logger.trace("flowControl waiting on writable");
-            writable.set(false);
-            //don't wait for ever as this may hang tests etc, we've probably been closed anyway
-            long now = System.currentTimeMillis();
-            long deadline = now + timeout;
-            while (!writable.get() && now < deadline) {
-               replicationLock.wait(deadline - now);
-               now = System.currentTimeMillis();
-            }
-            logger.trace("flow control done");
-
-            if (!writable.get()) {
-               ActiveMQServerLogger.LOGGER.slowReplicationResponse();
-               logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
-               try {
-                  stop();
-               } catch (Exception e) {
-                  logger.warn(e.getMessage(), e);
+      synchronized (replicationLock) {
+         // synchronized (replicationLock) { -- I'm not adding this because the caller already has it
+         // future maintainers of this code please be aware that the intention here is hold the lock on replication lock
+         if (!replicatingChannel.getConnection().isWritable(this)) {
+            try {
+               logger.trace("flowControl waiting on writable replication");
+               writable.set(false);
+               //don't wait for ever as this may hang tests etc, we've probably been closed anyway
+               long now = System.currentTimeMillis();
+               long deadline = now + timeout;
+               while (!writable.get() && now < deadline) {
+                  replicationLock.wait(deadline - now);
+                  now = System.currentTimeMillis();
+               }
+               logger.trace("flow control done on replication");
+
+               if (!writable.get()) {
+                  ActiveMQServerLogger.LOGGER.slowReplicationResponse();
+                  logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now);
+                  try {
+                     stop();
+                  } catch (Exception e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+                  return false;
                }
-               return false;
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
             }
-         } catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
          }
       }
       return true;
@@ -512,7 +527,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
       }
       SequentialFile file = jf.getFile().cloneFile();
       try {
-         ActiveMQServerLogger.LOGGER.journalSynch(jf, file.size(), file);
+         ActiveMQServerLogger.LOGGER.replicaSyncFile(file, file.size());
          sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
       } finally {
          if (file.isOpen())
@@ -557,10 +572,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
-            final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024
+            int size = 1 << 17;
             while (true) {
-               buffer.clear();
-               final int bytesRead = channel.read(buffer);
+               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
+               ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
+               final int bytesRead = channel.read(byteBuffer);
                int toSend = bytesRead;
                if (bytesRead > 0) {
                   if (bytesRead >= maxBytesToSend) {
@@ -569,12 +585,13 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
                   } else {
                      maxBytesToSend = maxBytesToSend - bytesRead;
                   }
-                  buffer.limit(toSend);
                }
-               buffer.rewind();
-
+               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
                // sending -1 or 0 bytes will close the file at the backup
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
+               // We cannot simply send everything of a file through the executor,
+               // otherwise we would run out of memory.
+               // so we don't use the executor here
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
                if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                   break;
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7929fff8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index cf904e1..a25a7f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
-import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -189,8 +188,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    void backupServerSynched(ActiveMQServerImpl server);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 221025, value = "Replication: sending {0} (size={1}) to backup. {2}", format = Message.Format.MESSAGE_FORMAT)
-   void journalSynch(JournalFile jf, Long size, SequentialFile file);
+   @Message(id = 221025, value = "Replication: sending {0} (size={1}) to replica.", format = Message.Format.MESSAGE_FORMAT)
+   void replicaSyncFile(SequentialFile jf, Long size);
 
    @LogMessage(level = Logger.Level.INFO)
    @Message(