You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/09/26 18:49:02 UTC
[14/17] activemq-artemis git commit: ARTEMIS-1353 Initial replication
of large messages out of executor
ARTEMIS-1353 Initial replication of large messages out of executor
This is based on the work @jbertram made at the github pr #1466 and the discussions we had there
(cherry picked from commit ce6942a9aa9375efaa449424fe89de2db3f22e36)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/492b55e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/492b55e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/492b55e0
Branch: refs/heads/1.x
Commit: 492b55e09affb03a943c3516a5a3bf513024ca8b
Parents: 5db0c87
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 15:01:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Sep 26 14:28:07 2017 -0400
----------------------------------------------------------------------
.../artemis/core/protocol/core/Packet.java | 7 ++
.../core/protocol/core/impl/PacketImpl.java | 1 +
.../wireformat/ReplicationSyncFileMessage.java | 10 +++
.../core/replication/ReplicationManager.java | 81 ++++++++++++++------
.../impl/SharedNothingLiveActivation.java | 2 +-
.../replication/ReplicationTest.java | 2 +-
6 files changed, 77 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index a86c5c1..efb9aa6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -93,4 +93,11 @@ public interface Packet {
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
+
+
+
+ /** The packet wasn't used because the stream is closed;
+ * this gives subclasses a chance to clean up anything that won't be used. */
+ default void release() {
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 99c052b..afbaf53 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -354,6 +354,7 @@ public class PacketImpl implements Packet {
return result;
}
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 4d3c32f..b81782b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
+
+ release();
+ }
+
+ @Override
+ public void release() {
+ if (byteBuffer != null) {
+ byteBuffer.release();
+ byteBuffer = null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index e1027d4..d298a24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,6 +26,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
@@ -92,8 +93,7 @@ public final class ReplicationManager implements ActiveMQComponent {
public boolean toBoolean() {
return true;
}
- },
- ADD {
+ }, ADD {
@Override
public boolean toBoolean() {
return false;
@@ -129,6 +129,8 @@ public final class ReplicationManager implements ActiveMQComponent {
private final long timeout;
+ private final long initialReplicationSyncTimeout;
+
private volatile boolean inSync = true;
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -138,8 +140,10 @@ public final class ReplicationManager implements ActiveMQComponent {
*/
public ReplicationManager(CoreRemotingConnection remotingConnection,
final long timeout,
+ final long initialReplicationSyncTimeout,
final ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
+ this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
this.replicationStream = executorFactory.getExecutor();
@@ -178,7 +182,7 @@ public final class ReplicationManager implements ActiveMQComponent {
boolean sync,
final boolean lineUp) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
}
}
@@ -339,10 +343,10 @@ public final class ReplicationManager implements ActiveMQComponent {
}
private OperationContext sendReplicatePacket(final Packet packet) {
- return sendReplicatePacket(packet, true, true);
+ return sendReplicatePacket(packet, true);
}
- private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
+ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled)
return null;
boolean runItNow = false;
@@ -353,22 +357,17 @@ public final class ReplicationManager implements ActiveMQComponent {
}
if (enabled) {
- if (useExecutor) {
- replicationStream.execute(() -> {
- if (enabled) {
- pendingTokens.add(repliToken);
- flowControl(packet.expectedEncodeSize());
- replicatingChannel.send(packet);
- }
- });
- } else {
- pendingTokens.add(repliToken);
- flowControl(packet.expectedEncodeSize());
- replicatingChannel.send(packet);
- }
+ replicationStream.execute(() -> {
+ if (enabled) {
+ pendingTokens.add(repliToken);
+ flowControl(packet.expectedEncodeSize());
+ replicatingChannel.send(packet);
+ }
+ });
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
+ packet.release();
}
// Execute outside lock
@@ -396,7 +395,6 @@ public final class ReplicationManager implements ActiveMQComponent {
}
}
-
return flowWorked;
}
@@ -511,6 +509,24 @@ public final class ReplicationManager implements ActiveMQComponent {
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
+ private class FlushAction implements Runnable {
+
+ ReusableLatch latch = new ReusableLatch(1);
+
+ public void reset() {
+ latch.setCount(1);
+ }
+
+ public boolean await(long timeout, TimeUnit unit) throws Exception {
+ return latch.await(timeout, unit);
+ }
+
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ }
+
/**
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
*
@@ -532,15 +548,19 @@ public final class ReplicationManager implements ActiveMQComponent {
file.open();
}
int size = 32 * 1024;
- final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
+ int flowControlSize = 10;
+
+ int packetsSent = 0;
+ FlushAction action = new FlushAction();
try {
- try (final FileInputStream fis = new FileInputStream(file.getJavaFile());
- final FileChannel channel = fis.getChannel()) {
+ try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
// A fresh pooled buffer is allocated for every chunk read in this loop:
// each ReplicationSyncFileMessage keeps a reference to its buffer and releases it
// after encoding it, or when the packet is released without having been sent
while (true) {
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
@@ -558,18 +578,31 @@ public final class ReplicationManager implements ActiveMQComponent {
// We cannot simply queue everything of a file on the executor at once,
// otherwise we would run out of memory.
// so we flush the replication stream every flowControlSize packets
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+ packetsSent++;
+
+ if (packetsSent % flowControlSize == 0) {
+ flushReplicationStream(action);
+ }
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
}
+ flushReplicationStream(action);
} finally {
- buffer.release();
if (file.isOpen())
file.close();
}
}
+ private void flushReplicationStream(FlushAction action) throws Exception {
+ action.reset();
+ replicationStream.execute(action);
+ if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+ throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+ }
+ }
+
/**
* Reserve the following fileIDs in the backup server.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index c984ae2..b532e57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -157,7 +157,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
ReplicationFailureListener listener = new ReplicationFailureListener();
rc.addCloseListener(listener);
rc.addFailureListener(listener);
- replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+ replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
replicationManager.start();
Thread t = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/492b55e0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 398e895..46cb085 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -189,7 +189,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
setupServer(false);
try {
ClientSessionFactory sf = createSessionFactory(locator);
- manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
+ manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
addActiveMQComponent(manager);
manager.start();
Assert.fail("Exception was expected");