You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/26 18:49:02 UTC

[14/17] activemq-artemis git commit: ARTEMIS-1353 Initial replication of large messages out of executor

ARTEMIS-1353 Initial replication of large messages out of executor

This is based on the work @jbertram did in GitHub PR #1466 and the discussions we had there.

(cherry picked from commit ce6942a9aa9375efaa449424fe89de2db3f22e36)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/492b55e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/492b55e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/492b55e0

Branch: refs/heads/1.x
Commit: 492b55e09affb03a943c3516a5a3bf513024ca8b
Parents: 5db0c87
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 15:01:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |  7 ++
 .../core/protocol/core/impl/PacketImpl.java     |  1 +
 .../wireformat/ReplicationSyncFileMessage.java  | 10 +++
 .../core/replication/ReplicationManager.java    | 81 ++++++++++++++------
 .../impl/SharedNothingLiveActivation.java       |  2 +-
 .../replication/ReplicationTest.java            |  2 +-
 6 files changed, 77 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index a86c5c1..efb9aa6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -93,4 +93,11 @@ public interface Packet {
     * @return true if confirmation is required
     */
    boolean isRequiresConfirmations();
+
+
+
+   /** The packet wasn't used because the stream is closed;
+    * this gives subclasses a chance to clean up anything that won't be used. */
+   default void release() {
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 99c052b..afbaf53 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -354,6 +354,7 @@ public class PacketImpl implements Packet {
       return result;
    }
 
+
    @Override
    public boolean equals(Object obj) {
       if (this == obj) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 4d3c32f..b81782b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       if (dataSize > 0) {
          buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
+
+      release();
+   }
+
+   @Override
+   public void release() {
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index e1027d4..d298a24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
@@ -92,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent {
          public boolean toBoolean() {
             return true;
          }
-      },
-      ADD {
+      }, ADD {
          @Override
          public boolean toBoolean() {
             return false;
@@ -129,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    private final long timeout;
 
+   private final long initialReplicationSyncTimeout;
+
    private volatile boolean inSync = true;
 
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -138,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent {
     */
    public ReplicationManager(CoreRemotingConnection remotingConnection,
                              final long timeout,
+                             final long initialReplicationSyncTimeout,
                              final ExecutorFactory executorFactory) {
       this.executorFactory = executorFactory;
+      this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
       this.replicationStream = executorFactory.getExecutor();
@@ -178,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent {
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
 
@@ -339,10 +343,10 @@ public final class ReplicationManager implements ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true, true);
+      return sendReplicatePacket(packet, true);
    }
 
-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
       if (!enabled)
          return null;
       boolean runItNow = false;
@@ -353,22 +357,17 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
 
       if (enabled) {
-         if (useExecutor) {
-            replicationStream.execute(() -> {
-               if (enabled) {
-                  pendingTokens.add(repliToken);
-                  flowControl(packet.expectedEncodeSize());
-                  replicatingChannel.send(packet);
-               }
-            });
-         } else {
-            pendingTokens.add(repliToken);
-            flowControl(packet.expectedEncodeSize());
-            replicatingChannel.send(packet);
-         }
+         replicationStream.execute(() -> {
+            if (enabled) {
+               pendingTokens.add(repliToken);
+               flowControl(packet.expectedEncodeSize());
+               replicatingChannel.send(packet);
+            }
+         });
       } else {
          // Already replicating channel failed, so just play the action now
          runItNow = true;
+         packet.release();
       }
 
       // Execute outside lock
@@ -396,7 +395,6 @@ public final class ReplicationManager implements ActiveMQComponent {
          }
       }
 
-
       return flowWorked;
    }
 
@@ -511,6 +509,24 @@ public final class ReplicationManager implements ActiveMQComponent {
          sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
+   private class FlushAction implements Runnable {
+
+      ReusableLatch latch = new ReusableLatch(1);
+
+      public void reset() {
+         latch.setCount(1);
+      }
+
+      public boolean await(long timeout, TimeUnit unit) throws Exception {
+         return latch.await(timeout, unit);
+      }
+
+      @Override
+      public void run() {
+         latch.countDown();
+      }
+   }
+
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
     *
@@ -532,15 +548,19 @@ public final class ReplicationManager implements ActiveMQComponent {
          file.open();
       }
       int size = 32 * 1024;
-      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
+      int flowControlSize = 10;
+
+      int packetsSent = 0;
+      FlushAction action = new FlushAction();
 
       try {
-         try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
-              final FileChannel channel = fis.getChannel()) {
+         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
             while (true) {
+               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
                buffer.clear();
                ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                final int bytesRead = channel.read(byteBuffer);
@@ -558,18 +578,31 @@ public final class ReplicationManager implements ActiveMQComponent {
                // We cannot simply send everything of a file through the executor,
                // otherwise we would run out of memory.
                // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               packetsSent++;
+
+               if (packetsSent % flowControlSize == 0) {
+                  flushReplicationStream(action);
+               }
                if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                   break;
             }
          }
+         flushReplicationStream(action);
       } finally {
-         buffer.release();
          if (file.isOpen())
             file.close();
       }
    }
 
+   private void flushReplicationStream(FlushAction action) throws Exception {
+      action.reset();
+      replicationStream.execute(action);
+      if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+         throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+      }
+   }
+
    /**
     * Reserve the following fileIDs in the backup server.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index c984ae2..b532e57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
          ReplicationFailureListener listener = new ReplicationFailureListener();
          rc.addCloseListener(listener);
          rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
          replicationManager.start();
          Thread t = new Thread(new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 398e895..46cb085 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       setupServer(false);
       try {
          ClientSessionFactory sf = createSessionFactory(locator);
-         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
+         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
          addActiveMQComponent(manager);
          manager.start();
          Assert.fail("Exception was expected");