You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/19 16:40:38 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1353 Initial replication of large messages out of executor

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 507176c6c -> 8966b599c


ARTEMIS-1353 Initial replication of large messages out of executor

This is based on the work @jbertram did in GitHub PR #1466 and the discussions we had there.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce6942a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce6942a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce6942a9

Branch: refs/heads/master
Commit: ce6942a9aa9375efaa449424fe89de2db3f22e36
Parents: 507176c
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 15:01:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 18 16:19:25 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |  7 ++
 .../core/protocol/core/impl/PacketImpl.java     |  1 +
 .../wireformat/ReplicationSyncFileMessage.java  | 10 +++
 .../core/replication/ReplicationManager.java    | 82 ++++++++++++++------
 .../impl/SharedNothingLiveActivation.java       |  2 +-
 .../replication/ReplicationTest.java            |  2 +-
 6 files changed, 78 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index a86c5c1..efb9aa6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -93,4 +93,11 @@ public interface Packet {
     * @return true if confirmation is required
     */
    boolean isRequiresConfirmations();
+
+
+
+   /** The packet wasn't used because the stream is closed;
+    * this gives subclasses a chance to clean up anything that won't be used. */
+   default void release() {
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 97a7973..186a703 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -402,6 +402,7 @@ public class PacketImpl implements Packet {
       return result;
    }
 
+
    @Override
    public boolean equals(Object obj) {
       if (this == obj) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 4d3c32f..b81782b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
       if (dataSize > 0) {
          buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
       }
+
+      release();
+   }
+
+   @Override
+   public void release() {
+      if (byteBuffer != null) {
+         byteBuffer.release();
+         byteBuffer = null;
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 3b6f9d6..73ad201 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
@@ -93,8 +94,7 @@ public final class ReplicationManager implements ActiveMQComponent {
          public boolean toBoolean() {
             return true;
          }
-      },
-      ADD {
+      }, ADD {
          @Override
          public boolean toBoolean() {
             return false;
@@ -130,6 +130,8 @@ public final class ReplicationManager implements ActiveMQComponent {
 
    private final long timeout;
 
+   private final long initialReplicationSyncTimeout;
+
    private volatile boolean inSync = true;
 
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -139,8 +141,10 @@ public final class ReplicationManager implements ActiveMQComponent {
     */
    public ReplicationManager(CoreRemotingConnection remotingConnection,
                              final long timeout,
+                             final long initialReplicationSyncTimeout,
                              final ExecutorFactory executorFactory) {
       this.executorFactory = executorFactory;
+      this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
       this.replicationStream = executorFactory.getExecutor();
@@ -181,7 +185,7 @@ public final class ReplicationManager implements ActiveMQComponent {
                                   boolean sync,
                                   final boolean lineUp) throws Exception {
       if (enabled) {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
       }
    }
 
@@ -342,10 +346,10 @@ public final class ReplicationManager implements ActiveMQComponent {
    }
 
    private OperationContext sendReplicatePacket(final Packet packet) {
-      return sendReplicatePacket(packet, true, true);
+      return sendReplicatePacket(packet, true);
    }
 
-   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
+   private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
       if (!enabled)
          return null;
       boolean runItNow = false;
@@ -356,22 +360,17 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
 
       if (enabled) {
-         if (useExecutor) {
-            replicationStream.execute(() -> {
-               if (enabled) {
-                  pendingTokens.add(repliToken);
-                  flowControl(packet.expectedEncodeSize());
-                  replicatingChannel.send(packet);
-               }
-            });
-         } else {
-            pendingTokens.add(repliToken);
-            flowControl(packet.expectedEncodeSize());
-            replicatingChannel.send(packet);
-         }
+         replicationStream.execute(() -> {
+            if (enabled) {
+               pendingTokens.add(repliToken);
+               flowControl(packet.expectedEncodeSize());
+               replicatingChannel.send(packet);
+            }
+         });
       } else {
          // Already replicating channel failed, so just play the action now
          runItNow = true;
+         packet.release();
       }
 
       // Execute outside lock
@@ -399,7 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent {
          }
       }
 
-
       return flowWorked;
    }
 
@@ -514,6 +512,24 @@ public final class ReplicationManager implements ActiveMQComponent {
          sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
+   private class FlushAction implements Runnable {
+
+      ReusableLatch latch = new ReusableLatch(1);
+
+      public void reset() {
+         latch.setCount(1);
+      }
+
+      public boolean await(long timeout, TimeUnit unit) throws Exception {
+         return latch.await(timeout, unit);
+      }
+
+      @Override
+      public void run() {
+         latch.countDown();
+      }
+   }
+
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
     *
@@ -535,15 +551,20 @@ public final class ReplicationManager implements ActiveMQComponent {
          file.open();
       }
       int size = 32 * 1024;
-      final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
+      int flowControlSize = 10;
+
+      int packetsSent = 0;
+      FlushAction action = new FlushAction();
 
       try {
-         try (FileInputStream fis = new FileInputStream(file.getJavaFile());
-              FileChannel channel = fis.getChannel()) {
+         try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
+
             // We can afford having a single buffer here for this entire loop
             // because sendReplicatePacket will encode the packet as a NettyBuffer
             // through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
             while (true) {
+               final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
                buffer.clear();
                ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                final int bytesRead = channel.read(byteBuffer);
@@ -561,18 +582,31 @@ public final class ReplicationManager implements ActiveMQComponent {
                // We cannot simply send everything of a file through the executor,
                // otherwise we would run out of memory.
                // so we don't use the executor here
-               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+               sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+               packetsSent++;
+
+               if (packetsSent % flowControlSize == 0) {
+                  flushReplicationStream(action);
+               }
                if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                   break;
             }
          }
+         flushReplicationStream(action);
       } finally {
-         buffer.release();
          if (file.isOpen())
             file.close();
       }
    }
 
+   private void flushReplicationStream(FlushAction action) throws Exception {
+      action.reset();
+      replicationStream.execute(action);
+      if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+         throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+      }
+   }
+
    /**
     * Reserve the following fileIDs in the backup server.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 355cefb..920366a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
          ReplicationFailureListener listener = new ReplicationFailureListener();
          rc.addCloseListener(listener);
          rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+         replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
          replicationManager.start();
          Thread t = new Thread(new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 8236702..05f3730 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -190,7 +190,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
       setupServer(false);
       try {
          ClientSessionFactory sf = createSessionFactory(locator);
-         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
+         manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
          addActiveMQComponent(manager);
          manager.start();
          Assert.fail("Exception was expected");


[2/3] activemq-artemis git commit: NO-JIRA fixing MQTT Test

Posted by cl...@apache.org.
NO-JIRA fixing MQTT Test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2033ee8c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2033ee8c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2033ee8c

Branch: refs/heads/master
Commit: 2033ee8c43279358e13530f0d168c7b415465f25
Parents: ce6942a
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 16:36:03 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 18 22:58:52 2017 -0400

----------------------------------------------------------------------
 .../mqtt/imported/MQTTInterceptorPropertiesTest.java  | 14 ++++++++++++--
 .../integration/mqtt/imported/MQTTTestSupport.java    |  9 +++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2033ee8c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 375e2f2..2600952 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
 import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@@ -96,7 +97,12 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
          @Override
          public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
             System.out.println("incoming");
-            return checkMessageProperties(packet, expectedProperties);
+            if (packet.getClass() == MqttPublishMessage.class) {
+               return checkMessageProperties(packet, expectedProperties);
+            } else {
+               return true;
+            }
+
          }
       };
 
@@ -104,7 +110,11 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
          @Override
          public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
             System.out.println("outgoing");
-            return checkMessageProperties(packet, expectedProperties);
+            if (packet.getClass() == MqttPublishMessage.class) {
+               return checkMessageProperties(packet, expectedProperties);
+            } else {
+               return true;
+            }
          }
       };
       server.getRemotingService().addIncomingInterceptor(incomingInterceptor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2033ee8c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index a45f06d..bac2e37 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -369,7 +370,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
       @Override
       public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
-         messageCount++;
+         if (packet.getClass() == MqttPublishMessage.class) {
+            messageCount++;
+         }
          return true;
       }
 
@@ -388,7 +391,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
       @Override
       public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
-         messageCount++;
+         if (packet.getClass() == MqttPublishMessage.class) {
+            messageCount++;
+         }
          return true;
       }
 


[3/3] activemq-artemis git commit: This closes #1472

Posted by cl...@apache.org.
This closes #1472


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8966b599
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8966b599
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8966b599

Branch: refs/heads/master
Commit: 8966b599c4004aceb17db65d4c3094ff0015837f
Parents: 507176c 2033ee8
Author: Clebert Suconic <cl...@apache.org>
Authored: Sat Aug 19 12:40:30 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Aug 19 12:40:30 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/protocol/core/Packet.java      |  7 ++
 .../core/protocol/core/impl/PacketImpl.java     |  1 +
 .../wireformat/ReplicationSyncFileMessage.java  | 10 +++
 .../core/replication/ReplicationManager.java    | 82 ++++++++++++++------
 .../impl/SharedNothingLiveActivation.java       |  2 +-
 .../imported/MQTTInterceptorPropertiesTest.java | 14 +++-
 .../mqtt/imported/MQTTTestSupport.java          |  9 ++-
 .../replication/ReplicationTest.java            |  2 +-
 8 files changed, 97 insertions(+), 30 deletions(-)
----------------------------------------------------------------------