You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/19 20:24:44 UTC

[activemq-artemis] branch master updated: ARTEMIS-3045 ReplicationManager can batch sent replicated packets

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6126d92  ARTEMIS-3045 ReplicationManager can batch sent replicated packets
     new 898b406  This closes #3392
6126d92 is described below

commit 6126d926ddcdf34b740603e506878059ea663680
Author: franz1981 <ni...@gmail.com>
AuthorDate: Thu Dec 24 13:02:43 2020 +0100

    ARTEMIS-3045 ReplicationManager can batch sent replicated packets
---
 .../artemis/core/protocol/core/Channel.java        |   7 +
 .../core/protocol/core/CoreRemotingConnection.java |   3 +-
 .../protocol/core/impl/ActiveMQSessionContext.java |   8 +-
 .../core/protocol/core/impl/ChannelImpl.java       |   7 +-
 .../protocol/core/impl/RemotingConnectionImpl.java |   4 +-
 .../core/remoting/impl/netty/NettyConnection.java  | 104 +++------
 .../artemis/spi/core/remoting/Connection.java      |  14 +-
 .../core/protocol/core/impl/ChannelImplTest.java   |   2 +-
 .../core/replication/ReplicationEndpoint.java      |  21 +-
 .../core/replication/ReplicationManager.java       | 260 ++++++++++++++++-----
 .../integration/cluster/util/BackupSyncDelay.java  |   5 +
 .../remoting/impl/netty/NettyConnectionTest.java   |   2 +-
 12 files changed, 280 insertions(+), 157 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 355e502..372cad4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -85,6 +85,13 @@ public interface Channel {
    boolean sendBatched(Packet packet);
 
    /**
+    * Similar to {@code flushConnection} on {@link #send(Packet, boolean)}, it requests
+    * any un-flushed previously sent packets to be flushed to the underlying connection.<br>
+    * It can be a no-op in case of InVM transports, because they likely already flush on each send.
+    */
+   void flushConnection();
+
+   /**
     * Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
     * {@code flushConnection} is {@code true}.
     *
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index 377b1b5..76f87cf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -135,10 +135,9 @@ public interface CoreRemotingConnection extends RemotingConnection {
 
    /**
     *
-    * @param size size we are trying to write
     * @param timeout
     * @return
     * @throws IllegalStateException if the connection is closed
     */
-   boolean blockUntilWritable(int size, long timeout);
+   boolean blockUntilWritable(long timeout);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index b4c1dcd..899fc2b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -1039,19 +1039,21 @@ public class ActiveMQSessionContext extends SessionContext {
       } else {
          chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
       }
-      final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
       //perform a weak form of flow control to avoid OOM on tight loops
       final CoreRemotingConnection connection = channel.getConnection();
       final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
       final long startFlowControl = System.nanoTime();
       try {
-         final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
+         final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis);
          if (!isWritable) {
             final long endFlowControl = System.nanoTime();
             final long elapsedFlowControl = endFlowControl - startFlowControl;
             final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
             ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
-            logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
+            if (logger.isDebugEnabled()) {
+               logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]",
+                             chunkPacket.expectedEncodeSize(), elapsedMillis, connection.getID());
+            }
          }
          if (requiresResponse) {
             // When sending it blocking, only the last chunk will be blocking.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index a97f381..4ecd2c6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -232,6 +232,11 @@ public final class ChannelImpl implements Channel {
    }
 
    @Override
+   public void flushConnection() {
+      connection.getTransportConnection().flush();
+   }
+
+   @Override
    public boolean send(Packet packet, boolean flushConnection) {
       if (invokeInterceptors(packet, interceptors, connection) != null) {
          return false;
@@ -557,7 +562,7 @@ public final class ChannelImpl implements Channel {
    public static String invokeInterceptors(final Packet packet,
                                            final List<Interceptor> interceptors,
                                            final RemotingConnection connection) {
-      if (interceptors != null) {
+      if (interceptors != null && !interceptors.isEmpty()) {
          for (final Interceptor interceptor : interceptors) {
             try {
                boolean callNext = interceptor.intercept(packet, connection);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index dcc8ecb..8f4e1b7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -244,8 +244,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
    }
 
    @Override
-   public boolean blockUntilWritable(int size, long timeout) {
-      return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
+   public boolean blockUntilWritable(long timeout) {
+      return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
    }
 
    @Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index cde37ae..431834e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -29,6 +29,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -47,7 +48,6 @@ public class NettyConnection implements Connection {
 
    private static final Logger logger = Logger.getLogger(NettyConnection.class);
 
-   private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192);
    private static final int DEFAULT_WAIT_MILLIS = 10_000;
 
    protected final Channel channel;
@@ -59,11 +59,9 @@ public class NettyConnection implements Connection {
     * here for when the connection (or Netty Channel) becomes available again.
     */
    private final List<ReadyListener> readyListeners = new ArrayList<>();
-   private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
+   private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
 
    private final boolean batchingEnabled;
-   private final int writeBufferHighWaterMark;
-   private final int batchLimit;
 
    private boolean closed;
    private RemotingConnection protocolConnection;
@@ -84,10 +82,6 @@ public class NettyConnection implements Connection {
       this.directDeliver = directDeliver;
 
       this.batchingEnabled = batchingEnabled;
-
-      this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();
-
-      this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
    }
 
    private static void waitFor(ChannelPromise promise, long millis) {
@@ -103,22 +97,9 @@ public class NettyConnection implements Connection {
 
    /**
     * Returns an estimation of the current size of the write buffer in the channel.
-    * To obtain a more precise value is necessary to use the unsafe API of the channel to
-    * call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
-    * Anyway, both these values are subject to concurrent modifications.
     */
-   private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
-      //Channel::bytesBeforeUnwritable is performing a volatile load
-      //this is the reason why writeBufferHighWaterMark is passed as an argument
-      final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
-      assert bytesBeforeUnwritable >= 0;
-      final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
-      assert writtenBytes >= 0;
-      return writtenBytes;
-   }
-
-   public final int pendingWritesOnChannel() {
-      return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+   private static long batchBufferSize(Channel channel) {
+      return channel.unsafe().outboundBuffer().totalPendingWriteBytes();
    }
 
    public final Channel getNettyChannel() {
@@ -252,7 +233,7 @@ public class NettyConnection implements Connection {
       try {
          return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
       } catch (OutOfMemoryError oom) {
-         final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
+         final long totalPendingWriteBytes = batchBufferSize(this.channel);
          // I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
          logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
          throw oom;
@@ -268,9 +249,8 @@ public class NettyConnection implements Connection {
    @Override
    public final void checkFlushBatchBuffer() {
       if (this.batchingEnabled) {
-         //perform the flush only if necessary
-         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
-         if (batchBufferSize > 0) {
+         // perform the flush only if necessary
+         if (batchBufferSize(this.channel) > 0 && !channel.isWritable()) {
             this.channel.flush();
          }
       }
@@ -293,6 +273,12 @@ public class NettyConnection implements Connection {
    }
 
    @Override
+   public void flush() {
+      checkConnectionState();
+      this.channel.flush();
+   }
+
+   @Override
    public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
       write(buffer, flush, batched, null);
    }
@@ -304,22 +290,22 @@ public class NettyConnection implements Connection {
    }
 
    @Override
-   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+   public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
       checkConnectionState();
       final boolean isAllowedToBlock = isAllowedToBlock();
       if (!isAllowedToBlock) {
+         if (timeout > 0) {
+            if (Env.isTestEnv()) {
+               // this will only show when inside the testsuite.
+               // so we can grep the log for FAILURE
+               logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " + "The code will probably need fixing!", new Exception("trace"));
+            }
 
-         if (Env.isTestEnv()) {
-            // this will only show when inside the testsuite.
-            // we may great the log for FAILURE
-            logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " +
-                           "The code will probably need fixing!", new Exception("trace"));
-         }
-
-         if (logger.isDebugEnabled()) {
-            logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
+            if (logger.isDebugEnabled()) {
+               logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
+            }
          }
-         return canWrite(requiredCapacity);
+         return channel.isWritable();
       } else {
          final long timeoutNanos = timeUnit.toNanos(timeout);
          final long deadline = System.nanoTime() + timeoutNanos;
@@ -333,7 +319,7 @@ public class NettyConnection implements Connection {
             parkNanos = 1000L;
          }
          boolean canWrite;
-         while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) {
+         while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) {
             //periodically check the connection state
             checkConnectionState();
             LockSupport.parkNanos(parkNanos);
@@ -348,31 +334,12 @@ public class NettyConnection implements Connection {
       return !inEventLoop;
    }
 
-   private boolean canWrite(final int requiredCapacity) {
-      //evaluate if the write request could be taken:
-      //there is enough space in the write buffer?
-      final long totalPendingWrites = this.pendingWritesOnChannel();
-      final boolean canWrite;
-      if (requiredCapacity > this.writeBufferHighWaterMark) {
-         canWrite = totalPendingWrites == 0;
-      } else {
-         canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark;
-      }
-      return canWrite;
-   }
-
    @Override
    public final void write(ActiveMQBuffer buffer,
                            final boolean flush,
                            final boolean batched,
                            final ChannelFutureListener futureListener) {
       final int readableBytes = buffer.readableBytes();
-      if (logger.isDebugEnabled()) {
-         final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
-         if (remainingBytes < 0) {
-            logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
-         }
-      }
       //no need to lock because the Netty's channel is thread-safe
       //and the order of write is ensured by the order of the write calls
       final Channel channel = this.channel;
@@ -385,10 +352,9 @@ public class NettyConnection implements Connection {
       final ChannelFuture future;
       final ByteBuf bytes = buffer.byteBuf();
       assert readableBytes >= 0;
-      final int writeBatchSize = this.batchLimit;
       final boolean batchingEnabled = this.batchingEnabled;
-      if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
-         future = writeBatch(bytes, readableBytes, promise);
+      if (batchingEnabled && batched && !flush && channel.isWritable()) {
+         future = channel.write(bytes, promise);
       } else {
          future = channel.writeAndFlush(bytes, promise);
       }
@@ -411,22 +377,6 @@ public class NettyConnection implements Connection {
       }
    }
 
-   private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) {
-      final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark);
-      final int nextBatchSize = batchBufferSize + readableBytes;
-      if (nextBatchSize > batchLimit) {
-         //request to flush before writing, to create the chance to make the channel writable again
-         channel.flush();
-         //let netty use its write batching ability
-         return channel.write(bytes, promise);
-      } else if (nextBatchSize == batchLimit) {
-         return channel.writeAndFlush(bytes, promise);
-      } else {
-         //let netty use its write batching ability
-         return channel.write(bytes, promise);
-      }
-   }
-
    @Override
    public final String getRemoteAddress() {
       SocketAddress address = channel.remoteAddress();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
index 0f76354..28584ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java
@@ -46,18 +46,17 @@ public interface Connection {
    boolean isOpen();
 
    /**
-    * Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
+    * Causes the current thread to wait until the connection is writable unless the specified waiting time elapses.
     * The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
     * only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
     * If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
     *
-    * @param requiredCapacity the capacity in bytes to be enqueued
     * @param timeout          the maximum time to wait
     * @param timeUnit         the time unit of the timeout argument
-    * @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
+    * @return {@code true} if the connection is writable, {@code false} otherwise
     * @throws IllegalStateException if the connection is closed
     */
-   default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
+   default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
       return true;
    }
 
@@ -86,6 +85,13 @@ public interface Connection {
    void write(ActiveMQBuffer buffer, boolean requestFlush);
 
    /**
+    * Requests that any previously written buffers be flushed to the wire.
+    */
+   default void flush() {
+
+   }
+
+   /**
     * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
     *
     * @param buffer  the buffer to write
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
index d2eccf3..73fa591 100644
--- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImplTest.java
@@ -237,7 +237,7 @@ public class ChannelImplTest {
       }
 
       @Override
-      public boolean blockUntilWritable(int size, long timeout) {
+      public boolean blockUntilWritable(long timeout) {
          return false;
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index a5613fb..da26fa0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -21,7 +21,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -80,7 +80,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-
 import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
@@ -134,7 +133,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
 
    private List<Interceptor> outgoingInterceptors = null;
 
-   private final ArrayList<Packet> pendingPackets;
+   private final ArrayDeque<Packet> pendingPackets;
 
 
    // Constructors --------------------------------------------------
@@ -146,7 +145,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       this.criticalErrorListener = criticalErrorListener;
       this.wantedFailBack = wantedFailBack;
       this.activation = activation;
-      this.pendingPackets = new ArrayList<>();
+      this.pendingPackets = new ArrayDeque<>();
       this.supportResponseBatching = false;
    }
 
@@ -262,18 +261,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
 
    @Override
    public void endOfBatch() {
-      final ArrayList<Packet> pendingPackets = this.pendingPackets;
+      final ArrayDeque<Packet> pendingPackets = this.pendingPackets;
       if (pendingPackets.isEmpty()) {
          return;
       }
-      try {
-         for (int i = 0, size = pendingPackets.size(); i < size; i++) {
-            final Packet packet = pendingPackets.get(i);
-            final boolean isLast = i == (size - 1);
-            channel.send(packet, isLast);
-         }
-      } finally {
-         pendingPackets.clear();
+      for (int i = 0, size = pendingPackets.size(); i < size; i++) {
+         final Packet packet = pendingPackets.poll();
+         final boolean isLast = i == (size - 1);
+         channel.send(packet, isLast);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 9a2d629..fa82ff6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,10 +26,15 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.EventLoop;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -69,6 +74,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -76,6 +82,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.jboss.logging.Logger;
@@ -126,13 +134,11 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    private final ExecutorFactory ioExecutorFactory;
 
-   private final Executor replicationStream;
-
    private SessionFailureListener failureListener;
 
    private CoreRemotingConnection remotingConnection;
 
-   private final long timeout;
+   private final long maxAllowedSlownessNanos;
 
    private final long initialReplicationSyncTimeout;
 
@@ -140,6 +146,32 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
 
+   private static final class ReplicatePacketRequest {
+
+      final Packet packet;
+      final OperationContext context;
+      // Although this field is needed just during the initial sync,
+      // the JVM field layout would likely leave 4 bytes of wasted space without it
+      // so it makes sense to use it instead.
+      final ReusableLatch done;
+
+      ReplicatePacketRequest(Packet packet, OperationContext context, ReusableLatch done) {
+         this.packet = packet;
+         this.context = context;
+         this.done = done;
+      }
+   }
+
+   private final Queue<ReplicatePacketRequest> replicatePacketRequests;
+   private final Executor replicationStream;
+   private final ScheduledExecutorService scheduledExecutorService;
+   private ScheduledFuture<?> slowReplicationChecker;
+   private long notWritableFrom;
+   private boolean checkSlowReplication;
+   private final ReadyListener onResume;
+   private boolean isFlushing;
+   private boolean awaitingResume;
+
    /**
     * @param remotingConnection
     */
@@ -153,8 +185,23 @@ public final class ReplicationManager implements ActiveMQComponent {
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
-      this.replicationStream = ioExecutorFactory.getExecutor();
-      this.timeout = timeout;
+      final Connection transportConnection = this.remotingConnection.getTransportConnection();
+      if (transportConnection instanceof NettyConnection) {
+         final EventLoop eventLoop = ((NettyConnection) transportConnection).getNettyChannel().eventLoop();
+         this.replicationStream = eventLoop;
+         this.scheduledExecutorService = eventLoop;
+      } else {
+         this.replicationStream = ioExecutorFactory.getExecutor();
+         this.scheduledExecutorService = null;
+      }
+      this.maxAllowedSlownessNanos = timeout > 0 ? TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
+      this.replicatePacketRequests = PlatformDependent.newMpscQueue();
+      this.slowReplicationChecker = null;
+      this.notWritableFrom = Long.MAX_VALUE;
+      this.awaitingResume = false;
+      this.onResume = this::resume;
+      this.isFlushing = false;
+      this.checkSlowReplication = false;
    }
 
    public void appendUpdateRecord(final byte journalID,
@@ -286,6 +333,25 @@ public final class ReplicationManager implements ActiveMQComponent {
       replicatingChannel.setHandler(responseHandler);
       failureListener = new ReplicatedSessionFailureListener();
       remotingConnection.addFailureListener(failureListener);
+      // only Netty connections can enable slow replication checker
+      if (scheduledExecutorService != null && maxAllowedSlownessNanos >= 0) {
+         long periodNanos = maxAllowedSlownessNanos / 10;
+         if (periodNanos > TimeUnit.SECONDS.toNanos(1)) {
+            periodNanos = TimeUnit.SECONDS.toNanos(1);
+         } else if (periodNanos < TimeUnit.MILLISECONDS.toNanos(100)) {
+            logger.warnf("The cluster call timeout is too low ie %d ms: consider raising it to save CPU",
+                         TimeUnit.NANOSECONDS.toMillis(maxAllowedSlownessNanos));
+            periodNanos = TimeUnit.MILLISECONDS.toNanos(100);
+         }
+         logger.debugf("Slow replication checker is running with a period of %d ms", TimeUnit.NANOSECONDS.toMillis(periodNanos));
+         // The slow detection has been implemented by using an always-on timer task
+         // instead of triggering one each time we detect an un-writable channel because:
+         // - temporarily getting an un-writable channel is rather common under load and scheduling/cancelling a
+         //   timed task is a CPU and GC intensive operation
+         // - choosing a period of 100-1000 ms leads to a reasonable and constant CPU utilization, even while idle
+         slowReplicationChecker = scheduledExecutorService.scheduleAtFixedRate(this::checkSlowReplication,
+                                                                               periodNanos, periodNanos, TimeUnit.NANOSECONDS);
+      }
 
       started = true;
 
@@ -317,6 +383,11 @@ public final class ReplicationManager implements ActiveMQComponent {
          replicatingChannel.getConnection().getTransportConnection().fireReady(true);
       }
 
+      if (slowReplicationChecker != null) {
+         slowReplicationChecker.cancel(false);
+         slowReplicationChecker = null;
+      }
+
       enabled = false;
 
       if (clearTokens) {
@@ -374,6 +445,10 @@ public final class ReplicationManager implements ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
+      return sendReplicatePacket(packet, lineUp, null);
+   }
+
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, ReusableLatch done) {
       if (!enabled) {
          packet.release();
          return null;
@@ -383,29 +458,48 @@ public final class ReplicationManager implements ActiveMQComponent {
       if (lineUp) {
          repliToken.replicationLineUp();
       }
-
+      final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, repliToken, done);
+      replicatePacketRequests.add(request);
       replicationStream.execute(() -> {
          if (enabled) {
-            pendingTokens.add(repliToken);
-            flowControl(packet.expectedEncodeSize());
-            replicatingChannel.send(packet);
+            sendReplicatedPackets(false);
          } else {
-            packet.release();
-            repliToken.replicationDone();
+            releaseReplicatedPackets(replicatePacketRequests);
          }
       });
 
       return repliToken;
    }
 
-   /**
-    * This was written as a refactoring of sendReplicatePacket.
-    * In case you refactor this in any way, this method must hold a lock on replication lock. .
-    */
-   private boolean flowControl(int size) {
-      boolean flowWorked = replicatingChannel.getConnection().blockUntilWritable(size, timeout);
+   private void releaseReplicatedPackets(Queue<ReplicatePacketRequest> requests) {
+      assert checkEventLoop();
+      ReplicatePacketRequest req;
+      while ((req = requests.poll()) != null) {
+         req.packet.release();
+         req.context.replicationDone();
+         if (req.done != null) {
+            req.done.countDown();
+         }
+      }
+   }
 
-      if (!flowWorked) {
+   private void checkSlowReplication() {
+      if (!enabled) {
+         return;
+      }
+      assert checkEventLoop();
+      if (!checkSlowReplication) {
+         return;
+      }
+      final boolean isWritable = replicatingChannel.getConnection().blockUntilWritable(0);
+      if (isWritable) {
+         checkSlowReplication = false;
+         return;
+      }
+      final long elapsedNanosNotWritable = System.nanoTime() - notWritableFrom;
+      if (elapsedNanosNotWritable >= maxAllowedSlownessNanos) {
+         checkSlowReplication = false;
+         releaseReplicatedPackets(replicatePacketRequests);
          try {
             ActiveMQServerLogger.LOGGER.slowReplicationResponse();
             stop();
@@ -413,8 +507,84 @@ public final class ReplicationManager implements ActiveMQComponent {
             logger.warn(e.getMessage(), e);
          }
       }
+   }
+
+   private void resume() {
+      sendReplicatedPackets(true);
+   }
+
+   private void sendReplicatedPackets(boolean resume) {
+      assert checkEventLoop();
+      if (resume) {
+         awaitingResume = false;
+      }
+      // Bail out early (also when replication is disabled) to:
+      // - avoid re-entrant draining when flushConnection triggers resume while already flushing (isFlushing)
+      // - avoid flushing pending writes until the channel has reported it is writable again (awaitingResume)
+      if (awaitingResume || isFlushing || !enabled) {
+         return;
+      }
+      if (replicatePacketRequests.isEmpty()) {
+         return;
+      }
+      isFlushing = true;
+      final CoreRemotingConnection connection = replicatingChannel.getConnection();
+      try {
+         while (connection.blockUntilWritable(0)) {
+            checkSlowReplication = false;
+            final ReplicatePacketRequest request = replicatePacketRequests.poll();
+            if (request == null) {
+               replicatingChannel.flushConnection();
+               // no work left: no need to check writability, nor to arm
+               // the slow replication check
+               return;
+            }
+            pendingTokens.add(request.context);
+            final Packet pack = request.packet;
+            final ReusableLatch done = request.done;
+            if (done != null) {
+               done.countDown();
+            }
+            replicatingChannel.send(pack, false);
+         }
+         replicatingChannel.flushConnection();
+         assert !awaitingResume;
+         // writability only matters if there is still work left to do
+         if (!replicatePacketRequests.isEmpty()) {
+            if (!connection.isWritable(onResume)) {
+               checkSlowReplication = true;
+               notWritableFrom = System.nanoTime();
+               awaitingResume = true;
+            } else {
+               // re-submit itself to continue draining instead of looping here,
+               // so that reads are not starved
+               // NOTE: may be redundant if other requests are already in flight
+               replicationStream.execute(() -> sendReplicatedPackets(false));
+            }
+         }
+      } catch (Throwable t) {
+         assert !(t instanceof AssertionError) : t.getMessage();
+         if (!connection.getTransportConnection().isOpen()) {
+            // this is an expected state: after this cleanup the manager is expected to be
+            // stopped/closed, or the failure listener to be called
+            logger.trace("Transport connection closed: cleaning up replicate tokens", t);
+            releaseReplicatedPackets(replicatePacketRequests);
+            // clean up the ReadyListener without triggering any further write/flush
+            connection.getTransportConnection().fireReady(true);
+         } else {
+            logger.warn("Unexpected error while flushing replicate packets", t);
+         }
+      } finally {
+         isFlushing = false;
+      }
+   }
 
-      return flowWorked;
+   private boolean checkEventLoop() {
+      if (!(replicationStream instanceof SingleThreadEventLoop)) {
+         return true;
+      }
+      final SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) replicationStream;
+      return eventLoop.inEventLoop();
    }
 
    /**
@@ -423,6 +593,7 @@ public final class ReplicationManager implements ActiveMQComponent {
     *                               packets were not sent with {@link #sendReplicatePacket(Packet)}.
     */
    private void replicated() {
+      assert checkEventLoop();
       OperationContext ctx = pendingTokens.poll();
 
       if (ctx == null) {
@@ -528,24 +699,6 @@ public final class ReplicationManager implements ActiveMQComponent {
          sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
-   private class FlushAction implements Runnable {
-
-      ReusableLatch latch = new ReusableLatch(1);
-
-      public void reset() {
-         latch.setCount(1);
-      }
-
-      public boolean await(long timeout, TimeUnit unit) throws Exception {
-         return latch.await(timeout, unit);
-      }
-
-      @Override
-      public void run() {
-         latch.countDown();
-      }
-   }
-
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
     *
@@ -566,12 +719,12 @@ public final class ReplicationManager implements ActiveMQComponent {
       if (!file.isOpen()) {
          file.open();
       }
-      int size = 32 * 1024;
+      final int size = 32 * 1024;
 
       int flowControlSize = 10;
 
       int packetsSent = 0;
-      FlushAction action = new FlushAction();
+      final ReusableLatch flushed = new ReusableLatch(1);
 
       try {
          try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
@@ -593,32 +746,33 @@ public final class ReplicationManager implements ActiveMQComponent {
                      maxBytesToSend = maxBytesToSend - bytesRead;
                   }
                }
-               logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
+               if (logger.isDebugEnabled()) {
+                  logger.debugf("sending %d bytes on file %s", buffer.writerIndex(), file.getFileName());
+               }
                // sending -1 or 0 bytes will close the file at the backup
-               // We cannot simply send everything of a file through the executor,
-               // otherwise we would run out of memory.
-               // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               final boolean lastPacket = bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0;
+               final boolean flowControlCheck = (packetsSent % flowControlSize == 0) || lastPacket;
+               if (flowControlCheck) {
+                  flushed.setCount(1);
+                  sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, flushed);
+                  awaitFlushOfReplicationStream(flushed);
+               } else {
+                  sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               }
                packetsSent++;
 
-               if (packetsSent % flowControlSize == 0) {
-                  flushReplicationStream(action);
-               }
-               if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
+               if (lastPacket)
                   break;
             }
          }
-         flushReplicationStream(action);
       } finally {
          if (file.isOpen())
             file.close();
       }
    }
 
-   private void flushReplicationStream(FlushAction action) throws Exception {
-      action.reset();
-      replicationStream.execute(action);
-      if (!action.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
+   private void awaitFlushOfReplicationStream(ReusableLatch flushed) throws Exception {
+      if (!flushed.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS)) {
          throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
       }
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index c88cb16..9ddffd3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -223,6 +223,11 @@ public class BackupSyncDelay implements Interceptor {
       }
 
       @Override
+      public void flushConnection() {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
       public boolean sendAndFlush(Packet packet) {
          throw new UnsupportedOperationException();
       }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index c9c975c..5b58cb9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -84,7 +84,7 @@ public class NettyConnectionTest extends ActiveMQTestBase {
       conn.close();
       //to make sure the channel is closed it needs to run the pending tasks
       channel.runPendingTasks();
-      conn.blockUntilWritable(0, 0, TimeUnit.NANOSECONDS);
+      conn.blockUntilWritable(0, TimeUnit.NANOSECONDS);
    }
 
    @Test