You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:05:14 UTC
[activemq-artemis] 03/04: ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b846f356bba44f22eb2c1206959d8f193a1da62f
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 16 22:27:06 2019 -0400
ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown
---
.../apache/activemq/artemis/core/server/Queue.java | 3 +
.../core/server/cluster/impl/BridgeImpl.java | 37 +++----
.../cluster/impl/ClusterConnectionBridge.java | 14 ++-
.../artemis/core/server/impl/QueueImpl.java | 5 +
.../server/impl/ScheduledDeliveryHandlerTest.java | 4 +
.../integration/server/ScaleDown3NodeTest.java | 106 +++++----------------
.../integration/server/ScaleDownRemoveSFTest.java | 12 +--
.../tests/unit/core/postoffice/impl/FakeQueue.java | 5 +
8 files changed, 76 insertions(+), 110 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9d165e7..4d769f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent {
void deleteQueue(boolean removeConsumers) throws Exception;
+ /** Requests that the server remove the address of this queue. */
+ void removeAddress() throws Exception;
+
void destroyPaging() throws Exception;
long getMessageCount();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 1024118..c43610e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
- synchronized (this) {
- try {
- logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
- ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
-
- // stop the bridge from trying to reconnect and clean up all the bindings
- fail(true);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- }
- }
+ scaleDown(scaleDownTargetNodeID);
} else if (scaleDownTargetNodeID != null) {
// the disconnected node is scaling down to me, no need to reconnect to it
logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
- fail(true);
+ fail(true, true);
} else {
logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
- fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
+ fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
}
tryScheduleRetryReconnect(me.getType());
}
+ protected void scaleDown(String scaleDownTargetNodeID) {
+ synchronized (this) {
+ try {
+ logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
+ ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
+
+ // stop the bridge from trying to reconnect and clean up all the bindings
+ fail(true, true);
+
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ }
+ }
+ }
+
protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
scheduleRetryConnect();
}
@@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return transformer;
}
- protected void fail(final boolean permanently) {
+ protected void fail(final boolean permanently, boolean scaleDown) {
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
if (targetNodeID != null) {
@@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} catch (Throwable ignored) {
}
}
- fail(false);
+ fail(false, false);
scheduleRetryConnect();
}
}
@@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
- fail(true);
+ fail(true, false);
return;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 96caf46..ade5d0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -397,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
@Override
- protected void fail(final boolean permanently) {
+ protected void fail(final boolean permanently, final boolean scaleDown) {
logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
- super.fail(permanently);
+ super.fail(permanently, scaleDown);
if (permanently) {
logger.debug("cluster node for bridge " + this.getName() + " is permanently down");
clusterConnection.removeRecord(targetNodeID);
+
+ if (scaleDown) {
+ try {
+ queue.deleteQueue(true);
+ queue.removeAddress();
+
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
} else {
clusterConnection.disconnectRecord(targetNodeID);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ced4ec7..f48e430 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2076,6 +2076,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
+ public void removeAddress() throws Exception {
+ server.removeAddressInfo(getAddress(), null);
+ }
+
+ @Override
public void deleteQueue(boolean removeConsumers) throws Exception {
synchronized (this) {
if (this.queueDestroyed)
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 7f92981..b3ae240 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -795,6 +795,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void removeAddress() throws Exception {
+ }
+
+ @Override
public long getAcknowledgeAttempts() {
return 0;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index c695b83..ebbef7d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId());
IntegrationTestLogger.LOGGER.info("===============================");
+
+ servers[0].setIdentity("Node0");
+ servers[1].setIdentity("Node1");
+ servers[2].setIdentity("Node2");
}
protected boolean isNetty() {
@@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
// pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
- String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
+ final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
snfQueue.pause();
@@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
// add a consumer to node 0 to trigger redistribution here
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
- // allow some time for redistribution to move the message to the SnF queue
- long timeout = 10000;
- long start = System.currentTimeMillis();
- long messageCount = 0;
-
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the message is not in the queue on node 2
- messageCount = getMessageCount(snfQueue);
- if (messageCount < TEST_SIZE) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
+ Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount);
// ensure the message is in the SnF queue
Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue));
@@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
removeConsumer(0);
servers[0].stop();
- start = System.currentTimeMillis();
+ Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the message is not in the queue on node 2
- messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
- if (messageCount > 0) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
-
- Assert.assertEquals(0, messageCount);
+ Wait.assertEquals(0, queueServer2::getMessageCount);
// get the messages from queue 1 on node 1
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
- // allow some time for redistribution to move the message to node 1
- start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the message is not in the queue on node 2
- messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
- if (messageCount < TEST_SIZE) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
-
// ensure the message is in queue 1 on node 1 as expected
- Assert.assertEquals(TEST_SIZE, messageCount);
+ Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
+ Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount);
for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
@@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNull(clientMessage);
removeConsumer(0);
+
+ Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+ Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+
+ Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists());
+ Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists());
}
@Test
@@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
- // allow some time for redistribution to move the message to the SnF queue
- long timeout = 10000;
- long start = System.currentTimeMillis();
- long messageCount = 0;
-
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the message is not in the queue on node 2
- messageCount = getMessageCount(snfQueue);
- if (messageCount < TEST_SIZE * 2) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
-
// ensure the message is in the SnF queue
- Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue));
+ Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount);
// trigger scaleDown from node 0 to node 1
IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID());
@@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
removeConsumer(1);
servers[0].stop();
- start = System.currentTimeMillis();
-
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the messages are not in the queues on node 2
- messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
- messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
- if (messageCount > 0) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
-
- Assert.assertEquals(0, messageCount);
+ Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+ getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
@@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
- // allow some time for redistribution to move the message to node 1
- start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < timeout) {
- // ensure the message is not in the queue on node 2
- messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
- messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
- if (messageCount < TEST_SIZE * 2) {
- Thread.sleep(200);
- } else {
- break;
- }
- }
-
+ Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+ getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
// ensure the message is in queue 1 on node 1 as expected
- Assert.assertEquals(TEST_SIZE * 2, messageCount);
for (int i = 0; i < TEST_SIZE; i++) {
ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
index ed9c3e6..145f31d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.AddressQueryResult;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -100,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
System.out.println("[sf queue on server 1]: " + sfQueueName);
- QueueQueryResult result = servers[1].queueQuery(sfQueueName);
- assertTrue(result.isExists());
+
+ Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists());
// trigger scaleDown from node 0 to node 1
servers[0].stop();
@@ -117,10 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
removeConsumer(0);
//check
- result = servers[1].queueQuery(sfQueueName);
- AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
- assertFalse(result.isExists());
- assertFalse(result2.isExists());
+ Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists());
+ Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists());
}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 3137df9..5795343 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -60,6 +60,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void removeAddress() throws Exception {
+
+ }
+
+ @Override
public long getDelayBeforeDispatch() {
return 0;
}