You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/09 15:11:38 UTC

[1/3] activemq-artemis git commit: [ARTEMIS-1791] Large message files are not removed after redistribution across a cluster

Repository: activemq-artemis
Updated Branches:
  refs/heads/master b05306dc0 -> ce4670f29


[ARTEMIS-1791] Large message files are not removed after redistribution across a cluster

Issue: https://issues.apache.org/jira/browse/ARTEMIS-1791

Adding test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c69d6b04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c69d6b04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c69d6b04

Branch: refs/heads/master
Commit: c69d6b047665e7a1b1d05727cb00fa384d5cb456
Parents: b05306d
Author: Ingo Weiss <in...@redhat.com>
Authored: Mon Apr 9 07:46:15 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:06:09 2018 -0400

----------------------------------------------------------------------
 .../LargeMessageRedistributionTest.java         | 45 ++++++++++++++++++++
 1 file changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c69d6b04/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
index 7e444a0..cba1b82 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/LargeMessageRedistributionTest.java
@@ -16,11 +16,56 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import java.io.File;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.junit.Test;
+
 public class LargeMessageRedistributionTest extends MessageRedistributionTest {
 
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
    @Override
    public boolean isLargeMessage() {
       return true;
    }
 
+   @Test
+   public void testRedistributionLargeMessageDirCleanup() throws Exception {
+      final long delay = 1000;
+      final int numMessages = 5;
+
+      setRedistributionDelay(delay);
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, false);
+      waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+      send(0, "queues.testaddress", numMessages, false, null);
+      addConsumer(0, 0, "queue0", null);
+
+      verifyReceiveAll(numMessages, 0);
+      removeConsumer(0);
+
+      addConsumer(1, 1, "queue0", null);
+      verifyReceiveAll(numMessages, 1);
+      removeConsumer(1);
+
+      Wait.assertEquals(0, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
+      Wait.assertEquals(numMessages, () -> getServer(1).getConfiguration().getLargeMessagesLocation().listFiles().length);
+   }
 }


[3/3] activemq-artemis git commit: This closes #2000

Posted by cl...@apache.org.
This closes #2000


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ce4670f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ce4670f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ce4670f2

Branch: refs/heads/master
Commit: ce4670f294e7317dcfee802ae011464c7f41ec78
Parents: b05306d de5c0d5
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 9 11:11:45 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:11:45 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/postoffice/Bindings.java       |  2 +
 .../core/postoffice/impl/BindingsImpl.java      |  5 +++
 .../core/postoffice/impl/PostOfficeImpl.java    | 25 ++++++++---
 .../core/server/cluster/impl/Redistributor.java |  1 +
 .../LargeMessageRedistributionTest.java         | 45 ++++++++++++++++++++
 .../impl/WildcardAddressManagerUnitTest.java    |  5 +++
 6 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-1791 Large message files are not removed after redistribution across a cluster

Posted by cl...@apache.org.
ARTEMIS-1791 Large message files are not removed after redistribution across a cluster


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/de5c0d51
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/de5c0d51
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/de5c0d51

Branch: refs/heads/master
Commit: de5c0d51b976a2a3c60235da56cfd854418007a7
Parents: c69d6b0
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Apr 9 11:07:49 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 9 11:06:27 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/postoffice/Bindings.java       |  2 ++
 .../core/postoffice/impl/BindingsImpl.java      |  5 ++++
 .../core/postoffice/impl/PostOfficeImpl.java    | 25 ++++++++++++++++----
 .../core/server/cluster/impl/Redistributor.java |  1 +
 .../impl/WildcardAddressManagerUnitTest.java    |  5 ++++
 5 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index f3592c4..30a2680 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener {
    boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
 
    void route(Message message, RoutingContext context) throws Exception;
+
+   boolean allowRedistribute();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2e2b31c..478c700 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -155,6 +155,11 @@ public final class BindingsImpl implements Bindings {
    }
 
    @Override
+   public boolean allowRedistribute() {
+      return messageLoadBalancingType == MessageLoadBalancingType.ON_DEMAND; // only ON_DEMAND allows redistribution; == is null-safe for enums
+   }
+
+   @Override
    public boolean redistribute(final Message message,
                                final Queue originatingQueue,
                                final RoutingContext context) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b2bfe37..f1f7a38 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    public Pair<RoutingContext, Message> redistribute(final Message message,
                                                      final Queue originatingQueue,
                                                      final Transaction tx) throws Exception {
-      // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
-      // arrived the target node
-      // as described on https://issues.jboss.org/browse/JBPAPP-6130
-      Message copyRedistribute = message.copy(storageManager.generateID());
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
 
-      if (bindings != null) {
+      if (bindings != null && bindings.allowRedistribute()) {
+         // We have to copy the message and store it separately; otherwise we may lose remote bindings in case of a restart before the message
+         // has arrived at the target node,
+         // as described in https://issues.jboss.org/browse/JBPAPP-6130
+         Message copyRedistribute = message.copy(storageManager.generateID());
+         if (tx != null) {
+            tx.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterRollback(Transaction tx) {
+                  try {
+                     // Drop the copy's reference so its large message file is cleaned up;
+                     // a failure is logged together with its cause rather than rethrown.
+                     copyRedistribute.decrementRefCount();
+                  } catch (Exception e) {
+                     logger.warn("Failed to clean up message: " + copyRedistribute, e);
+                  }
+               }
+            });
+         }
+
          RoutingContext context = new RoutingContextImpl(tx);
 
          boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index cfb9eee..7982018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -150,6 +150,7 @@ public class Redistributor implements Consumer {
       final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
 
       if (routingInfo == null) {
+         tx.rollback();
          return HandleStatus.BUSY;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 1c13cbd..40fadf9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
       public void route(Message message, RoutingContext context) throws Exception {
          System.out.println("routing message: " + message);
       }
+
+      @Override
+      public boolean allowRedistribute() {
+         return false;
+      }
    }
 
 }