You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:05:14 UTC

[activemq-artemis] 03/04: ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b846f356bba44f22eb2c1206959d8f193a1da62f
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 16 22:27:06 2019 -0400

    ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown
---
 .../apache/activemq/artemis/core/server/Queue.java |   3 +
 .../core/server/cluster/impl/BridgeImpl.java       |  37 +++----
 .../cluster/impl/ClusterConnectionBridge.java      |  14 ++-
 .../artemis/core/server/impl/QueueImpl.java        |   5 +
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   4 +
 .../integration/server/ScaleDown3NodeTest.java     | 106 +++++----------------
 .../integration/server/ScaleDownRemoveSFTest.java  |  12 +--
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 8 files changed, 76 insertions(+), 110 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9d165e7..4d769f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void deleteQueue(boolean removeConsumers) throws Exception;
 
+   /** Asks the server to remove this queue's address. */
+   void removeAddress() throws Exception;
+
    void destroyPaging() throws Exception;
 
    long getMessageCount();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 1024118..c43610e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
 
       if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
-         synchronized (this) {
-            try {
-               logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
-               ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
-
-               // stop the bridge from trying to reconnect and clean up all the bindings
-               fail(true);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-            }
-         }
+         scaleDown(scaleDownTargetNodeID);
       } else if (scaleDownTargetNodeID != null) {
          // the disconnected node is scaling down to me, no need to reconnect to it
          logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
-         fail(true);
+         fail(true, true);
       } else {
          logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
 
-         fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
+         fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
       }
 
       tryScheduleRetryReconnect(me.getType());
    }
 
+   protected void scaleDown(String scaleDownTargetNodeID) {
+      synchronized (this) {
+         try {
+            logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
+            ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
+
+            // stop the bridge from trying to reconnect and clean up all the bindings
+            fail(true, true);
+
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         }
+      }
+   }
+
    protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
       scheduleRetryConnect();
    }
@@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       return transformer;
    }
 
-   protected void fail(final boolean permanently) {
+   protected void fail(final boolean permanently, boolean scaleDown) {
       logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
       //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
       if (targetNodeID != null) {
@@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                } catch (Throwable ignored) {
                }
             }
-            fail(false);
+            fail(false, false);
             scheduleRetryConnect();
          }
       }
@@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
       if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
          ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
-         fail(true);
+         fail(true, false);
          return;
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 96caf46..ade5d0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -397,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl {
    }
 
    @Override
-   protected void fail(final boolean permanently) {
+   protected void fail(final boolean permanently, final boolean scaleDown) {
       logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
-      super.fail(permanently);
+      super.fail(permanently, scaleDown);
 
       if (permanently) {
          logger.debug("cluster node for bridge " + this.getName() + " is permanently down");
          clusterConnection.removeRecord(targetNodeID);
+
+         if (scaleDown) {
+            try {
+               queue.deleteQueue(true);
+               queue.removeAddress();
+
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
       } else {
          clusterConnection.disconnectRecord(targetNodeID);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ced4ec7..f48e430 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2076,6 +2076,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void removeAddress() throws Exception {
+      server.removeAddressInfo(getAddress(), null);
+   }
+
+   @Override
    public void deleteQueue(boolean removeConsumers) throws Exception {
       synchronized (this) {
          if (this.queueDestroyed)
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 7f92981..b3ae240 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -795,6 +795,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void removeAddress() throws Exception {
+      }
+
+      @Override
       public long getAcknowledgeAttempts() {
          return 0;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index c695b83..ebbef7d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
       IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId());
       IntegrationTestLogger.LOGGER.info("===============================");
+
+      servers[0].setIdentity("Node0");
+      servers[1].setIdentity("Node1");
+      servers[2].setIdentity("Node2");
    }
 
    protected boolean isNetty() {
@@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
-      String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
+      final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
       Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
       snfQueue.pause();
 
@@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       // add a consumer to node 0 to trigger redistribution here
       addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to the SnF queue
-      long timeout = 10000;
-      long start = System.currentTimeMillis();
-      long messageCount = 0;
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(snfQueue);
-         if (messageCount < TEST_SIZE) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
+      Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount);
 
       // ensure the message is in the SnF queue
       Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue));
@@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       removeConsumer(0);
       servers[0].stop();
 
-      start = System.currentTimeMillis();
+      Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
 
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         if (messageCount > 0) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
-      Assert.assertEquals(0, messageCount);
+      Wait.assertEquals(0, queueServer2::getMessageCount);
 
       // get the messages from queue 1 on node 1
       addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to node 1
-      start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         if (messageCount < TEST_SIZE) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
       // ensure the message is in queue 1 on node 1 as expected
-      Assert.assertEquals(TEST_SIZE, messageCount);
+      Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
+      Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount);
 
       for (int i = 0; i < TEST_SIZE; i++) {
          ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
@@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
       Assert.assertNull(clientMessage);
       removeConsumer(0);
+
+      Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+      Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+
+      Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists());
+      Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists());
    }
 
    @Test
@@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
       addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to the SnF queue
-      long timeout = 10000;
-      long start = System.currentTimeMillis();
-      long messageCount = 0;
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(snfQueue);
-         if (messageCount < TEST_SIZE * 2) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
       // ensure the message is in the SnF queue
-      Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue));
+      Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount);
 
       // trigger scaleDown from node 0 to node 1
       IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID());
@@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       removeConsumer(1);
       servers[0].stop();
 
-      start = System.currentTimeMillis();
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the messages are not in the queues on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
-         if (messageCount > 0) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
-      Assert.assertEquals(0, messageCount);
+      Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+         getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
 
       Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
 
@@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
       addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to node 1
-      start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
-         if (messageCount < TEST_SIZE * 2) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
+      Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+         getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
       // ensure the message is in queue 1 on node 1 as expected
-      Assert.assertEquals(TEST_SIZE * 2, messageCount);
 
       for (int i = 0; i < TEST_SIZE; i++) {
          ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
index ed9c3e6..145f31d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.AddressQueryResult;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -100,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
       SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
 
       System.out.println("[sf queue on server 1]: " + sfQueueName);
-      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
-      assertTrue(result.isExists());
+
+      Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists());
 
       // trigger scaleDown from node 0 to node 1
       servers[0].stop();
@@ -117,10 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
       removeConsumer(0);
 
       //check
-      result = servers[1].queueQuery(sfQueueName);
-      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
-      assertFalse(result.isExists());
-      assertFalse(result2.isExists());
+      Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists());
+      Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists());
 
    }
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 3137df9..5795343 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -60,6 +60,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void removeAddress() throws Exception {
+
+   }
+
+   @Override
    public long getDelayBeforeDispatch() {
       return 0;
    }