You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/09 15:11:38 UTC
[1/3] activemq-artemis git commit: [ARTEMIS-1791] Large message files are not removed after redistribution across a cluster
Repository: activemq-artemis
Updated Branches:
refs/heads/master b05306dc0 -> ce4670f29
[ARTEMIS-1791] Large message files are not removed after redistribution across a cluster
Issue: https://issues.apache.org/jira/browse/ARTEMIS-1791
Adding test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c69d6b04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c69d6b04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c69d6b04
Branch: refs/heads/master
Commit: c69d6b047665e7a1b1d05727cb00fa384d5cb456
Parents: b05306d
Author: Ingo Weiss <in...@redhat.com>
Authored: Mon Apr 9 07:46:15 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:06:09 2018 -0400
----------------------------------------------------------------------
.../LargeMessageRedistributionTest.java | 45 ++++++++++++++++++++
1 file changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c69d6b04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
index 7e444a0..cba1b82 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
@@ -16,11 +16,56 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import java.io.File;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.junit.Test;
+
public class LargeMessageRedistributionTest extends MessageRedistributionTest {
+ private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
@Override
public boolean isLargeMessage() {
return true;
}
+ @Test
+ public void testRedistributionLargeMessageDirCleanup() throws Exception {
+ final long delay = 1000;
+ final int numMessages = 5;
+
+ setRedistributionDelay(delay);
+ setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ send(0, "queues.testaddress", numMessages, false, null);
+ addConsumer(0, 0, "queue0", null);
+
+ verifyReceiveAll(numMessages, 0);
+ removeConsumer(0);
+
+ addConsumer(1, 1, "queue0", null);
+ verifyReceiveAll(numMessages, 1);
+ removeConsumer(1);
+
+ Wait.assertEquals(0, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
+ Wait.assertEquals(numMessages, () -> getServer(1).getConfiguration().getLargeMessagesLocation().listFiles().length);
+ }
}
[3/3] activemq-artemis git commit: This closes #2000
Posted by cl...@apache.org.
This closes #2000
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce4670f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce4670f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce4670f2
Branch: refs/heads/master
Commit: ce4670f294e7317dcfee802ae011464c7f41ec78
Parents: b05306d de5c0d5
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 9 11:11:45 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:11:45 2018 -0400
----------------------------------------------------------------------
.../artemis/core/postoffice/Bindings.java | 2 +
.../core/postoffice/impl/BindingsImpl.java | 5 +++
.../core/postoffice/impl/PostOfficeImpl.java | 25 ++++++++---
.../core/server/cluster/impl/Redistributor.java | 1 +
.../LargeMessageRedistributionTest.java | 45 ++++++++++++++++++++
.../impl/WildcardAddressManagerUnitTest.java | 5 +++
6 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-1791 Large message files are not removed after redistribution across a cluster
Posted by cl...@apache.org.
ARTEMIS-1791 Large message files are not removed after redistribution across a cluster
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/de5c0d51
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/de5c0d51
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/de5c0d51
Branch: refs/heads/master
Commit: de5c0d51b976a2a3c60235da56cfd854418007a7
Parents: c69d6b0
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Apr 9 11:07:49 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:06:27 2018 -0400
----------------------------------------------------------------------
.../artemis/core/postoffice/Bindings.java | 2 ++
.../core/postoffice/impl/BindingsImpl.java | 5 ++++
.../core/postoffice/impl/PostOfficeImpl.java | 25 ++++++++++++++++----
.../core/server/cluster/impl/Redistributor.java | 1 +
.../impl/WildcardAddressManagerUnitTest.java | 5 ++++
5 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index f3592c4..30a2680 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener {
boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
void route(Message message, RoutingContext context) throws Exception;
+
+ boolean allowRedistribute();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2e2b31c..478c700 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -155,6 +155,11 @@ public final class BindingsImpl implements Bindings {
}
@Override
+ public boolean allowRedistribute() {
+ return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
+ }
+
+ @Override
public boolean redistribute(final Message message,
final Queue originatingQueue,
final RoutingContext context) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b2bfe37..f1f7a38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
- // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
- // arrived the target node
- // as described on https://issues.jboss.org/browse/JBPAPP-6130
- Message copyRedistribute = message.copy(storageManager.generateID());
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
- if (bindings != null) {
+ if (bindings != null && bindings.allowRedistribute()) {
+ // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
+ // arrived the target node
+ // as described on https://issues.jboss.org/browse/JBPAPP-6130
+ Message copyRedistribute = message.copy(storageManager.generateID());
+ if (tx != null) {
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterRollback(Transaction tx) {
+ try {
+ //this will cause large message file to be
+ //cleaned up
+ copyRedistribute.decrementRefCount();
+ } catch (Exception e) {
+ logger.warn("Failed to clean up message: " + copyRedistribute);
+ }
+ }
+ });
+ }
+
RoutingContext context = new RoutingContextImpl(tx);
boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index cfb9eee..7982018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -150,6 +150,7 @@ public class Redistributor implements Consumer {
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
+ tx.rollback();
return HandleStatus.BUSY;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 1c13cbd..40fadf9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
public void route(Message message, RoutingContext context) throws Exception {
System.out.println("routing message: " + message);
}
+
+ @Override
+ public boolean allowRedistribute() {
+ return false;
+ }
}
}