You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/19 16:40:38 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1353 Initial replication
of large messages out of executor
Repository: activemq-artemis
Updated Branches:
refs/heads/master 507176c6c -> 8966b599c
ARTEMIS-1353 Initial replication of large messages out of executor
This is based on the work @jbertram made at the github pr #1466 and the discussions we had there
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce6942a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce6942a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce6942a9
Branch: refs/heads/master
Commit: ce6942a9aa9375efaa449424fe89de2db3f22e36
Parents: 507176c
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 15:01:33 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 18 16:19:25 2017 -0400
----------------------------------------------------------------------
.../artemis/core/protocol/core/Packet.java | 7 ++
.../core/protocol/core/impl/PacketImpl.java | 1 +
.../wireformat/ReplicationSyncFileMessage.java | 10 +++
.../core/replication/ReplicationManager.java | 82 ++++++++++++++------
.../impl/SharedNothingLiveActivation.java | 2 +-
.../replication/ReplicationTest.java | 2 +-
6 files changed, 78 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
index a86c5c1..efb9aa6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Packet.java
@@ -93,4 +93,11 @@ public interface Packet {
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
+
+
+
+ /** The packe wasn't used because the stream is closed,
+ * this gives a chance to sub classes to cleanup anything that won't be used. */
+ default void release() {
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 97a7973..186a703 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -402,6 +402,7 @@ public class PacketImpl implements Packet {
return result;
}
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
index 4d3c32f..b81782b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
@@ -159,6 +159,16 @@ public final class ReplicationSyncFileMessage extends PacketImpl {
if (dataSize > 0) {
buffer.writeBytes(byteBuffer, 0, byteBuffer.writerIndex());
}
+
+ release();
+ }
+
+ @Override
+ public void release() {
+ if (byteBuffer != null) {
+ byteBuffer.release();
+ byteBuffer = null;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 3b6f9d6..73ad201 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -26,6 +26,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
@@ -93,8 +94,7 @@ public final class ReplicationManager implements ActiveMQComponent {
public boolean toBoolean() {
return true;
}
- },
- ADD {
+ }, ADD {
@Override
public boolean toBoolean() {
return false;
@@ -130,6 +130,8 @@ public final class ReplicationManager implements ActiveMQComponent {
private final long timeout;
+ private final long initialReplicationSyncTimeout;
+
private volatile boolean inSync = true;
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@@ -139,8 +141,10 @@ public final class ReplicationManager implements ActiveMQComponent {
*/
public ReplicationManager(CoreRemotingConnection remotingConnection,
final long timeout,
+ final long initialReplicationSyncTimeout,
final ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
+ this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
this.replicationStream = executorFactory.getExecutor();
@@ -181,7 +185,7 @@ public final class ReplicationManager implements ActiveMQComponent {
boolean sync,
final boolean lineUp) throws Exception {
if (enabled) {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp, true);
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
}
}
@@ -342,10 +346,10 @@ public final class ReplicationManager implements ActiveMQComponent {
}
private OperationContext sendReplicatePacket(final Packet packet) {
- return sendReplicatePacket(packet, true, true);
+ return sendReplicatePacket(packet, true);
}
- private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp, boolean useExecutor) {
+ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) {
if (!enabled)
return null;
boolean runItNow = false;
@@ -356,22 +360,17 @@ public final class ReplicationManager implements ActiveMQComponent {
}
if (enabled) {
- if (useExecutor) {
- replicationStream.execute(() -> {
- if (enabled) {
- pendingTokens.add(repliToken);
- flowControl(packet.expectedEncodeSize());
- replicatingChannel.send(packet);
- }
- });
- } else {
- pendingTokens.add(repliToken);
- flowControl(packet.expectedEncodeSize());
- replicatingChannel.send(packet);
- }
+ replicationStream.execute(() -> {
+ if (enabled) {
+ pendingTokens.add(repliToken);
+ flowControl(packet.expectedEncodeSize());
+ replicatingChannel.send(packet);
+ }
+ });
} else {
// Already replicating channel failed, so just play the action now
runItNow = true;
+ packet.release();
}
// Execute outside lock
@@ -399,7 +398,6 @@ public final class ReplicationManager implements ActiveMQComponent {
}
}
-
return flowWorked;
}
@@ -514,6 +512,24 @@ public final class ReplicationManager implements ActiveMQComponent {
sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
+ private class FlushAction implements Runnable {
+
+ ReusableLatch latch = new ReusableLatch(1);
+
+ public void reset() {
+ latch.setCount(1);
+ }
+
+ public boolean await(long timeout, TimeUnit unit) throws Exception {
+ return latch.await(timeout, unit);
+ }
+
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ }
+
/**
* Sends large files in reasonably sized chunks to the backup during replication synchronization.
*
@@ -535,15 +551,20 @@ public final class ReplicationManager implements ActiveMQComponent {
file.open();
}
int size = 32 * 1024;
- final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
+
+ int flowControlSize = 10;
+
+ int packetsSent = 0;
+ FlushAction action = new FlushAction();
try {
- try (FileInputStream fis = new FileInputStream(file.getJavaFile());
- FileChannel channel = fis.getChannel()) {
+ try (FileInputStream fis = new FileInputStream(file.getJavaFile()); FileChannel channel = fis.getChannel()) {
+
// We can afford having a single buffer here for this entire loop
// because sendReplicatePacket will encode the packet as a NettyBuffer
// through ActiveMQBuffer class leaving this buffer free to be reused on the next copy
while (true) {
+ final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
buffer.clear();
ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
final int bytesRead = channel.read(byteBuffer);
@@ -561,18 +582,31 @@ public final class ReplicationManager implements ActiveMQComponent {
// We cannot simply send everything of a file through the executor,
// otherwise we would run out of memory.
// so we don't use the executor here
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true, false);
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
+ packetsSent++;
+
+ if (packetsSent % flowControlSize == 0) {
+ flushReplicationStream(action);
+ }
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
}
+ flushReplicationStream(action);
} finally {
- buffer.release();
if (file.isOpen())
file.close();
}
}
+ private void flushReplicationStream(FlushAction action) throws Exception {
+ action.reset();
+ replicationStream.execute(action);
+ if (!action.await(this.timeout, TimeUnit.MILLISECONDS)) {
+ throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+ }
+ }
+
/**
* Reserve the following fileIDs in the backup server.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 355cefb..920366a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
ReplicationFailureListener listener = new ReplicationFailureListener();
rc.addCloseListener(listener);
rc.addFailureListener(listener);
- replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
+ replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getExecutorFactory());
replicationManager.start();
Thread t = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ce6942a9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 8236702..05f3730 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -190,7 +190,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
setupServer(false);
try {
ClientSessionFactory sf = createSessionFactory(locator);
- manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
+ manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
addActiveMQComponent(manager);
manager.start();
Assert.fail("Exception was expected");
[2/3] activemq-artemis git commit: NO-JIRA fixing MQTT Test
Posted by cl...@apache.org.
NO-JIRA fixing MQTT Test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2033ee8c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2033ee8c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2033ee8c
Branch: refs/heads/master
Commit: 2033ee8c43279358e13530f0d168c7b415465f25
Parents: ce6942a
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 18 16:36:03 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 18 22:58:52 2017 -0400
----------------------------------------------------------------------
.../mqtt/imported/MQTTInterceptorPropertiesTest.java | 14 ++++++++++++--
.../integration/mqtt/imported/MQTTTestSupport.java | 9 +++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2033ee8c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
index 375e2f2..2600952 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@@ -96,7 +97,12 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("incoming");
- return checkMessageProperties(packet, expectedProperties);
+ if (packet.getClass() == MqttPublishMessage.class) {
+ return checkMessageProperties(packet, expectedProperties);
+ } else {
+ return true;
+ }
+
}
};
@@ -104,7 +110,11 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
System.out.println("outgoing");
- return checkMessageProperties(packet, expectedProperties);
+ if (packet.getClass() == MqttPublishMessage.class) {
+ return checkMessageProperties(packet, expectedProperties);
+ } else {
+ return true;
+ }
}
};
server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2033ee8c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index a45f06d..bac2e37 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -369,7 +370,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
- messageCount++;
+ if (packet.getClass() == MqttPublishMessage.class) {
+ messageCount++;
+ }
return true;
}
@@ -388,7 +391,9 @@ public class MQTTTestSupport extends ActiveMQTestBase {
@Override
public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
- messageCount++;
+ if (packet.getClass() == MqttPublishMessage.class) {
+ messageCount++;
+ }
return true;
}
[3/3] activemq-artemis git commit: This closes #1472
Posted by cl...@apache.org.
This closes #1472
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8966b599
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8966b599
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8966b599
Branch: refs/heads/master
Commit: 8966b599c4004aceb17db65d4c3094ff0015837f
Parents: 507176c 2033ee8
Author: Clebert Suconic <cl...@apache.org>
Authored: Sat Aug 19 12:40:30 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Aug 19 12:40:30 2017 -0400
----------------------------------------------------------------------
.../artemis/core/protocol/core/Packet.java | 7 ++
.../core/protocol/core/impl/PacketImpl.java | 1 +
.../wireformat/ReplicationSyncFileMessage.java | 10 +++
.../core/replication/ReplicationManager.java | 82 ++++++++++++++------
.../impl/SharedNothingLiveActivation.java | 2 +-
.../imported/MQTTInterceptorPropertiesTest.java | 14 +++-
.../mqtt/imported/MQTTTestSupport.java | 9 ++-
.../replication/ReplicationTest.java | 2 +-
8 files changed, 97 insertions(+), 30 deletions(-)
----------------------------------------------------------------------