You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:05:11 UTC

[activemq-artemis] branch master updated (3cbd5a3 -> 320381a)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 3cbd5a3  This closes #2840
     new d55ec37  Revert "ARTEMIS-2462 Allow store-forward queue to be deleted after scaledown"
     new dd20f89  ARTEMIS-2462 re-applying tests on SNF Delete Queue
     new b846f35  ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown
     new 320381a  This closes #2841

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 --
 .../core/protocol/core/impl/PacketImpl.java        |   1 -
 .../artemis/core/config/ConfigurationUtils.java    |   4 +-
 .../core/config/ScaleDownConfiguration.java        |  11 ---
 .../deployers/impl/FileConfigurationParser.java    |   4 +-
 .../artemis/core/protocol/ServerPacketDecoder.java |   6 --
 .../impl/wireformat/ScaleDownAnnounceMessage.java  |   8 +-
 .../wireformat/ScaleDownAnnounceMessageV2.java     |  32 -------
 .../apache/activemq/artemis/core/server/Queue.java |   4 +-
 .../core/server/cluster/ClusterConnection.java     |   8 --
 .../core/server/cluster/ClusterControl.java        |   7 +-
 .../core/server/cluster/ClusterController.java     |  10 +-
 .../core/server/cluster/ha/ScaleDownPolicy.java    |  12 +--
 .../core/server/cluster/impl/BridgeImpl.java       |  41 ++++----
 .../cluster/impl/ClusterConnectionBridge.java      |  19 ++--
 .../server/cluster/impl/ClusterConnectionImpl.java |  21 ----
 .../server/impl/BackupRecoveryJournalLoader.java   |   9 +-
 .../core/server/impl/LiveOnlyActivation.java       |   3 +-
 .../artemis/core/server/impl/QueueImpl.java        |  31 +-----
 .../artemis/core/server/impl/ScaleDownHandler.java |   6 +-
 .../server/impl/SharedNothingBackupActivation.java |   2 +-
 .../server/impl/SharedStoreBackupActivation.java   |   2 +-
 .../resources/schema/artemis-configuration.xsd     |   7 --
 .../config/impl/FileConfigurationParserTest.java   |   2 -
 .../core/config/impl/FileConfigurationTest.java    |   1 -
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   9 +-
 docs/user-manual/en/ha.md                          |  24 -----
 .../integration/server/ScaleDown3NodeTest.java     | 106 +++++----------------
 .../integration/server/ScaleDownRemoveSFTest.java  |  35 +------
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  10 +-
 30 files changed, 95 insertions(+), 347 deletions(-)
 delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java


[activemq-artemis] 03/04: ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b846f356bba44f22eb2c1206959d8f193a1da62f
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 16 22:27:06 2019 -0400

    ARTEMIS-2462 Applying fix on delete SNF queue after ScaleDown
---
 .../apache/activemq/artemis/core/server/Queue.java |   3 +
 .../core/server/cluster/impl/BridgeImpl.java       |  37 +++----
 .../cluster/impl/ClusterConnectionBridge.java      |  14 ++-
 .../artemis/core/server/impl/QueueImpl.java        |   5 +
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   4 +
 .../integration/server/ScaleDown3NodeTest.java     | 106 +++++----------------
 .../integration/server/ScaleDownRemoveSFTest.java  |  12 +--
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 8 files changed, 76 insertions(+), 110 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9d165e7..4d769f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void deleteQueue(boolean removeConsumers) throws Exception;
 
+   /** Asks the server to remove this queue's address (see the server's removeAddressInfo). */
+   void removeAddress() throws Exception;
+
    void destroyPaging() throws Exception;
 
    long getMessageCount();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 1024118..c43610e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
 
       if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
-         synchronized (this) {
-            try {
-               logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
-               ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
-
-               // stop the bridge from trying to reconnect and clean up all the bindings
-               fail(true);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-            }
-         }
+         scaleDown(scaleDownTargetNodeID);
       } else if (scaleDownTargetNodeID != null) {
          // the disconnected node is scaling down to me, no need to reconnect to it
          logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
-         fail(true);
+         fail(true, true);
       } else {
          logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
 
-         fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
+         fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
       }
 
       tryScheduleRetryReconnect(me.getType());
    }
 
+   protected void scaleDown(String scaleDownTargetNodeID) {
+      synchronized (this) {
+         try {
+            logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
+            ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
+
+            // stop the bridge from trying to reconnect and clean up all the bindings
+            fail(true, true);
+
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+         }
+      }
+   }
+
    protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
       scheduleRetryConnect();
    }
@@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       return transformer;
    }
 
-   protected void fail(final boolean permanently) {
+   protected void fail(final boolean permanently, boolean scaleDown) {
       logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
       //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
       if (targetNodeID != null) {
@@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
                } catch (Throwable ignored) {
                }
             }
-            fail(false);
+            fail(false, false);
             scheduleRetryConnect();
          }
       }
@@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
       if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
          ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
-         fail(true);
+         fail(true, false);
          return;
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 96caf46..ade5d0c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -397,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl {
    }
 
    @Override
-   protected void fail(final boolean permanently) {
+   protected void fail(final boolean permanently, final boolean scaleDown) {
       logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
-      super.fail(permanently);
+      super.fail(permanently, scaleDown);
 
       if (permanently) {
          logger.debug("cluster node for bridge " + this.getName() + " is permanently down");
          clusterConnection.removeRecord(targetNodeID);
+
+         if (scaleDown) {
+            try {
+               queue.deleteQueue(true);
+               queue.removeAddress();
+
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
       } else {
          clusterConnection.disconnectRecord(targetNodeID);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ced4ec7..f48e430 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2076,6 +2076,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void removeAddress() throws Exception {
+      server.removeAddressInfo(getAddress(), null);
+   }
+
+   @Override
    public void deleteQueue(boolean removeConsumers) throws Exception {
       synchronized (this) {
          if (this.queueDestroyed)
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 7f92981..b3ae240 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -795,6 +795,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void removeAddress() throws Exception {
+      }
+
+      @Override
       public long getAcknowledgeAttempts() {
          return 0;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
index c695b83..ebbef7d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
       IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId());
       IntegrationTestLogger.LOGGER.info("===============================");
+
+      servers[0].setIdentity("Node0");
+      servers[1].setIdentity("Node1");
+      servers[2].setIdentity("Node2");
    }
 
    protected boolean isNetty() {
@@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
 
       // pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
-      String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
+      final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
       Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
       snfQueue.pause();
 
@@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       // add a consumer to node 0 to trigger redistribution here
       addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to the SnF queue
-      long timeout = 10000;
-      long start = System.currentTimeMillis();
-      long messageCount = 0;
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(snfQueue);
-         if (messageCount < TEST_SIZE) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
+      Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount);
 
       // ensure the message is in the SnF queue
       Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue));
@@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       removeConsumer(0);
       servers[0].stop();
 
-      start = System.currentTimeMillis();
+      Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
 
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         if (messageCount > 0) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
-      Assert.assertEquals(0, messageCount);
+      Wait.assertEquals(0, queueServer2::getMessageCount);
 
       // get the messages from queue 1 on node 1
       addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to node 1
-      start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         if (messageCount < TEST_SIZE) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
       // ensure the message is in queue 1 on node 1 as expected
-      Assert.assertEquals(TEST_SIZE, messageCount);
+      Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
+      Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount);
 
       for (int i = 0; i < TEST_SIZE; i++) {
          ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
@@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
       Assert.assertNull(clientMessage);
       removeConsumer(0);
+
+      Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+      Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
+
+      Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists());
+      Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists());
    }
 
    @Test
@@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
       addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to the SnF queue
-      long timeout = 10000;
-      long start = System.currentTimeMillis();
-      long messageCount = 0;
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(snfQueue);
-         if (messageCount < TEST_SIZE * 2) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
       // ensure the message is in the SnF queue
-      Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue));
+      Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount);
 
       // trigger scaleDown from node 0 to node 1
       IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID());
@@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       removeConsumer(1);
       servers[0].stop();
 
-      start = System.currentTimeMillis();
-
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the messages are not in the queues on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
-         if (messageCount > 0) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
-      Assert.assertEquals(0, messageCount);
+      Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+         getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
 
       Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
 
@@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
       addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
       addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
 
-      // allow some time for redistribution to move the message to node 1
-      start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < timeout) {
-         // ensure the message is not in the queue on node 2
-         messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
-         messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
-         if (messageCount < TEST_SIZE * 2) {
-            Thread.sleep(200);
-         } else {
-            break;
-         }
-      }
-
+      Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
+         getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
       // ensure the message is in queue 1 on node 1 as expected
-      Assert.assertEquals(TEST_SIZE * 2, messageCount);
 
       for (int i = 0; i < TEST_SIZE; i++) {
          ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
index ed9c3e6..145f31d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.AddressQueryResult;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -100,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
       SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
 
       System.out.println("[sf queue on server 1]: " + sfQueueName);
-      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
-      assertTrue(result.isExists());
+
+      Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists());
 
       // trigger scaleDown from node 0 to node 1
       servers[0].stop();
@@ -117,10 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
       removeConsumer(0);
 
       //check
-      result = servers[1].queueQuery(sfQueueName);
-      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
-      assertFalse(result.isExists());
-      assertFalse(result2.isExists());
+      Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists());
+      Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists());
 
    }
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 3137df9..5795343 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -60,6 +60,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void removeAddress() throws Exception {
+
+   }
+
+   @Override
    public long getDelayBeforeDispatch() {
       return 0;
    }


[activemq-artemis] 02/04: ARTEMIS-2462 re-applying tests on SNF Delete Queue

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit dd20f89bd011d673d4fb6e664e9e68a8ca8348d7
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Mon Sep 16 21:44:25 2019 -0400

    ARTEMIS-2462 re-applying tests on SNF Delete Queue
---
 .../server/cluster/impl/ClusterConnectionImpl.java |   6 +-
 .../config/impl/FileConfigurationParserTest.java   |  31 +++++
 .../integration/server/ScaleDownRemoveSFTest.java  | 127 +++++++++++++++++++++
 3 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 6ee2da4..d6f34c9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
                // New node - create a new flow record
 
-               final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+               final SimpleString queueName = getSfQueueName(nodeID);
 
                Binding queueBinding = postOffice.getBinding(queueName);
 
@@ -741,6 +741,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       }
    }
 
+   public SimpleString getSfQueueName(String nodeID) {
+      return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+   }
+
    @Override
    public synchronized void informClusterOfBackup() {
       String nodeID = server.getNodeID().toString();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index df1ee08..6e41d1c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -28,7 +28,9 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -272,6 +274,35 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
       testParsingOverFlow("<bridges> \n" + "  <bridge name=\"price-forward-bridge\"> \n" + "    <queue-name>priceForwarding</queue-name>  \n" + "    <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + "    <producer-window-size>2147483648</producer-window-size>\n" + "    <static-connectors> \n" + "      <connector-ref>netty</connector-ref> \n" + "    </static-connectors> \n" + "  </bridge> \n" + "</bridges>\n");
    }
 
+   @Test
+   public void testParsingScaleDownConfig() throws Exception {
+      FileConfigurationParser parser = new FileConfigurationParser();
+
+      String configStr = firstPart + "<ha-policy>\n" +
+               "   <live-only>\n" +
+               "      <scale-down>\n" +
+               "         <connectors>\n" +
+               "            <connector-ref>server0-connector</connector-ref>\n" +
+               "         </connectors>\n" +
+               "      </scale-down>\n" +
+               "   </live-only>\n" +
+               "</ha-policy>\n" + lastPart;
+      ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration config = parser.parseMainConfig(input);
+
+      HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
+      assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
+
+      LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
+      ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
+      List<String> connectors = scaledownCfg.getConnectors();
+      assertEquals(1, connectors.size());
+      String connector = connectors.get(0);
+      assertEquals("server0-connector", connector);
+   }
+
+
    private void testParsingOverFlow(String config) throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
       String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
new file mode 100644
index 0000000..ed9c3e6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ScaleDownRemoveSFTest extends ClusterTestBase {
+
+   public ScaleDownRemoveSFTest() {
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+      setupLiveServer(0, isFileStorage(), isNetty(), true);
+      setupLiveServer(1, isFileStorage(), isNetty(), true);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
+
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+      haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
+      servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
+      startServers(0, 1);
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      super.tearDown();
+   }
+
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testScaleDownCheckSF() throws Exception {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+
+      // create one queue on each of the 2 nodes, both mapped to the same address
+      createQueue(0, addressName, queueName1, null, true);
+      createQueue(1, addressName, queueName1, null, true);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, true, null);
+
+      // consume and commit one message from testQueue1
+      addConsumer(1, 0, queueName1, null, false);
+      ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      consumers[1].getSession().commit();
+
+      Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
+
+      //check sf queue on server1 exists
+      ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
+      SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
+
+      System.out.println("[sf queue on server 1]: " + sfQueueName);
+      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
+      assertTrue(result.isExists());
+
+      // trigger scaleDown from node 0 to node 1
+      servers[0].stop();
+
+      addConsumer(0, 1, queueName1, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages on queue 1
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+
+      // check that the sf queue and its address no longer exist on server 1 after the scale down
+      result = servers[1].queueQuery(sfQueueName);
+      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
+      assertFalse(result.isExists());
+      assertFalse(result2.isExists());
+
+   }
+
+}


[activemq-artemis] 01/04: Revert "ARTEMIS-2462 Allow store-forward queue to be deleted afte scaledown"

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d55ec37195dff04377a43957cc0afa9d0de14b89
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 16 21:19:14 2019 -0400

    Revert "ARTEMIS-2462 Allow store-forward queue to be deleted afte scaledown"
    
    This reverts commit 397cef699aea5a59cfc3970b7735aaf2068bf1ff.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 -
 .../core/protocol/core/impl/PacketImpl.java        |   1 -
 .../artemis/core/config/ConfigurationUtils.java    |   4 +-
 .../core/config/ScaleDownConfiguration.java        |  11 --
 .../deployers/impl/FileConfigurationParser.java    |   4 +-
 .../artemis/core/protocol/ServerPacketDecoder.java |   6 -
 .../impl/wireformat/ScaleDownAnnounceMessage.java  |   8 +-
 .../wireformat/ScaleDownAnnounceMessageV2.java     |  32 -----
 .../apache/activemq/artemis/core/server/Queue.java |   1 -
 .../core/server/cluster/ClusterConnection.java     |   8 --
 .../core/server/cluster/ClusterControl.java        |   7 +-
 .../core/server/cluster/ClusterController.java     |  10 +-
 .../core/server/cluster/ha/ScaleDownPolicy.java    |  12 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   4 -
 .../cluster/impl/ClusterConnectionBridge.java      |   5 -
 .../server/cluster/impl/ClusterConnectionImpl.java |  27 +---
 .../server/impl/BackupRecoveryJournalLoader.java   |   9 +-
 .../core/server/impl/LiveOnlyActivation.java       |   3 +-
 .../artemis/core/server/impl/QueueImpl.java        |  26 ----
 .../artemis/core/server/impl/ScaleDownHandler.java |   6 +-
 .../server/impl/SharedNothingBackupActivation.java |   2 +-
 .../server/impl/SharedStoreBackupActivation.java   |   2 +-
 .../resources/schema/artemis-configuration.xsd     |   7 -
 .../config/impl/FileConfigurationParserTest.java   |  33 -----
 .../core/config/impl/FileConfigurationTest.java    |   1 -
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 -
 docs/user-manual/en/ha.md                          |  24 ----
 .../integration/server/ScaleDownRemoveSFTest.java  | 148 ---------------------
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 -
 29 files changed, 19 insertions(+), 399 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index a327084..d41e8d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -430,9 +430,6 @@ public final class ActiveMQDefaultConfiguration {
    // its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
    private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
 
-   // will the target node delete the store-and-forward queue for the scaled down node.
-   private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;
-
    // How long to wait for a decision
    private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
 
@@ -1534,8 +1531,4 @@ public final class ActiveMQDefaultConfiguration {
    public static long getDefaultRetryReplicationWait() {
       return DEFAULT_RETRY_REPLICATION_WAIT;
    }
-
-   public static boolean isDefaultCleanupSfQueue() {
-      return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
-   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index ef2fae8..a7a3253 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -277,7 +277,6 @@ public class PacketImpl implements Packet {
 
    public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
 
-   public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;
 
    // Static --------------------------------------------------------
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index 5697460..a314947 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -125,9 +125,9 @@ public final class ConfigurationUtils {
    public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
       if (scaleDownConfiguration != null) {
          if (scaleDownConfiguration.getDiscoveryGroup() != null) {
-            return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
+            return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
          } else {
-            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
+            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
          }
       }
       return null;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
index d0ea7d6..5f58e36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
@@ -34,8 +34,6 @@ public class ScaleDownConfiguration implements Serializable {
 
    private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
 
-   private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();
-
    public List<String> getConnectors() {
       return connectors;
    }
@@ -85,13 +83,4 @@ public class ScaleDownConfiguration implements Serializable {
       this.enabled = enabled;
       return this;
    }
-
-   public Boolean isCleanupSfQueue() {
-      return this.cleanupSfQueue;
-   }
-
-   public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
-      this.cleanupSfQueue = cleanupSfQueue;
-      return this;
-   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index bd095e0..2b4b481 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1579,8 +1579,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
          Element scaleDownElement = (Element) scaleDownNode.item(0);
 
-         scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));
-
          scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));
 
          NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");
@@ -1794,6 +1792,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
 
+      String scaleDownConnector = e.getAttribute("scale-down-connector");
+
       String discoveryGroupName = null;
 
       List<String> staticConnectorNames = new ArrayList<>();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 853b3ae..0428abe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@@ -77,7 +76,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
@@ -254,10 +252,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
             packet = new ScaleDownAnnounceMessage();
             break;
          }
-         case SCALEDOWN_ANNOUNCEMENT_V2: {
-            packet = new ScaleDownAnnounceMessageV2();
-            break;
-         }
          default: {
             packet = super.decode(packetType, connection);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
index 3c18adb..7a6f147 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
@@ -22,17 +22,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class ScaleDownAnnounceMessage extends PacketImpl {
 
-   protected SimpleString targetNodeId;
-   protected SimpleString scaledDownNodeId;
+   private SimpleString targetNodeId;
+   private SimpleString scaledDownNodeId;
 
    public ScaleDownAnnounceMessage() {
       super(SCALEDOWN_ANNOUNCEMENT);
    }
 
-   public ScaleDownAnnounceMessage(byte type) {
-      super(type);
-   }
-
    public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
       super(SCALEDOWN_ANNOUNCEMENT);
       this.targetNodeId = targetNodeId;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java
deleted file mode 100644
index a5c09cd..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-
-public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {
-
-   public ScaleDownAnnounceMessageV2() {
-      super(SCALEDOWN_ANNOUNCEMENT_V2);
-   }
-
-   public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
-      this();
-      this.targetNodeId = targetNodeId;
-      this.scaledDownNodeId = scaledDownNodeId;
-   }
-}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index adcb72e..9d165e7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -454,5 +454,4 @@ public interface Queue extends Bindable,CriticalComponent {
 
    }
 
-   boolean internalDelete();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
index 9c4ea18..6171476 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
@@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 
@@ -97,11 +96,4 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
     * @return
     */
    BridgeMetrics getBridgeMetrics(String nodeId);
-
-   /**
-    * Remove the store-and-forward queue after scale down
-    */
-   void removeSfQueue(SimpleString scaledDownNodeId);
-
-   void removeSfQueue(Queue queue);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
index 991b8b3..07f0fc2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
@@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -196,10 +195,8 @@ public class ClusterControl implements AutoCloseable {
       return requestBackup(backupRequestMessage);
    }
 
-   public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) {
-
-      ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
-
+   public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
+      ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
       clusterChannel.send(announceMessage);
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 572a919..86cd0df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.cluster;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -401,19 +400,12 @@ public class ClusterController implements ActiveMQComponent {
                Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
                ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
                clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
-            } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
+            } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
                ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
                //we don't really need to check as it should always be true
                if (server.getNodeID().equals(message.getTargetNodeId())) {
                   server.addScaledDownNode(message.getScaledDownNodeId());
                }
-               if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
-                  ClusterManager clusterManager = ClusterController.this.server.getClusterManager();
-                  Set<ClusterConnection> ccs = clusterManager.getClusterConnections();
-                  for (ClusterConnection cc : ccs) {
-                     cc.removeSfQueue(message.getScaledDownNodeId());
-                  }
-               }
             } else if (channelHandler != null) {
                channelHandler.handlePacket(packet);
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
index a7db3e6..0ef96d5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
@@ -41,25 +41,21 @@ public class ScaleDownPolicy {
 
    private boolean enabled;
 
-   private boolean isCleanupSfQueue;
-
    public ScaleDownPolicy() {
    }
 
-   public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
+   public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
       this.connectors = connectors;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
-      this.isCleanupSfQueue = isCleanupSfQueue;
    }
 
-   public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
+   public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
       this.discoveryGroup = discoveryGroup;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
-      this.isCleanupSfQueue = isCleanupSfQueue;
    }
 
    public List<String> getConnectors() {
@@ -128,8 +124,4 @@ public class ScaleDownPolicy {
                                                                     ActiveMQServer activeMQServer) {
       return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
    }
-
-   public boolean isCleanupSfQueue() {
-      return this.isCleanupSfQueue;
-   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 6c692e7..1024118 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -1113,9 +1113,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    }
 
-   protected void postStop() {
-   }
-
 
    // Inner classes -------------------------------------------------
 
@@ -1232,7 +1229,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
             logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
          }
          ActiveMQServerLogger.LOGGER.bridgeStopped(name);
-         postStop();
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 5603dfd..96caf46 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -382,11 +382,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
       super.nodeUP(member, last);
    }
 
-   @Override
-   protected void postStop() {
-      clusterConnection.removeSfQueue(queue);
-   }
-
 
    @Override
    protected void afterConnect() throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index a8bf90e..6ee2da4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
                // New node - create a new flow record
 
-               final SimpleString queueName = getSfQueueName(nodeID);
+               final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
 
                Binding queueBinding = postOffice.getBinding(queueName);
 
@@ -741,10 +741,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       }
    }
 
-   public SimpleString getSfQueueName(String nodeID) {
-      return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
-   }
-
    @Override
    public synchronized void informClusterOfBackup() {
       String nodeID = server.getNodeID().toString();
@@ -774,27 +770,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
    }
 
-   @Override
-   public void removeSfQueue(SimpleString scaledDownNodeId) {
-      SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString());
-      Binding binding = server.getPostOffice().getBinding(sfQName);
-
-      if (binding != null) {
-         removeSfQueue((Queue) binding.getBindable());
-      }
-   }
-
-   @Override
-   public void removeSfQueue(Queue queue) {
-      if (queue.internalDelete()) {
-         try {
-            server.removeAddressInfo(queue.getAddress(), null);
-         } catch (Exception e) {
-            logger.debug("Failed to remove sf address: " + queue.getAddress(), e);
-         }
-      }
-   }
-
    private void createNewRecord(final long eventUID,
                                 final String targetNodeID,
                                 final TransportConfiguration connector,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 8d700ed..77b9f3f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
-import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -50,7 +49,6 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
    private ActiveMQServer parentServer;
    private ServerLocator locator;
    private final ClusterController clusterController;
-   private ScaleDownPolicy scaleDownPolicy;
 
    public BackupRecoveryJournalLoader(PostOffice postOffice,
                                       PagingManager pagingManager,
@@ -62,14 +60,12 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
                                       Configuration configuration,
                                       ActiveMQServer parentServer,
                                       ServerLocatorInternal locator,
-                                      ClusterController clusterController,
-                                      ScaleDownPolicy scaleDownPolicy) {
+                                      ClusterController clusterController) {
 
       super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
       this.parentServer = parentServer;
       this.locator = locator;
       this.clusterController = clusterController;
-      this.scaleDownPolicy = scaleDownPolicy;
    }
 
    @Override
@@ -91,12 +87,11 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
    public void postLoad(Journal messageJournal,
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
-
       ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
       locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
 
       try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
-         scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy);
+         scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index bea060c..8dd160d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -179,7 +179,6 @@ public class LiveOnlyActivation extends Activation {
          DuplicateIDCache duplicateIDCache = activeMQServer.getPostOffice().getDuplicateIDCache(address);
          duplicateIDMap.put(address, duplicateIDCache.getMap());
       }
-
-      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null, this.liveOnlyPolicy.getScaleDownPolicy());
+      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null);
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2f6e93b..ced4ec7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -315,8 +315,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile long ringSize;
 
-   private Boolean removeSf;
-
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -2557,7 +2555,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public void setInternalQueue(boolean internalQueue) {
       this.internalQueue = internalQueue;
-      this.removeSf = null;
    }
 
    // Public
@@ -3487,29 +3484,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   /**
-    * Delete the store and forward queue
-    * Only the second caller (if there is one) of this method does the actual deletion.
-    * The logic makes sure the sf queue is deleted only after bridge is stopped.
-    */
-   @Override
-   public synchronized boolean internalDelete() {
-      if (this.isInternalQueue()) {
-         if (removeSf == null) {
-            removeSf = false;
-         } else if (removeSf == false) {
-            try {
-               deleteQueue();
-               removeSf = true;
-               return true;
-            } catch (Exception e) {
-               logger.debug("Error removing sf queue " + getName(), e);
-            }
-         }
-      }
-      return false;
-   }
-
    private boolean checkExpired(final MessageReference reference) {
       try {
          if (reference.getMessage().isExpired()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index d84d199..db51dcc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
-import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -92,15 +91,14 @@ public class ScaleDownHandler {
                          ResourceManager resourceManager,
                          Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
                          SimpleString managementAddress,
-                         SimpleString targetNodeId,
-                         ScaleDownPolicy scaleDownPolicy) throws Exception {
+                         SimpleString targetNodeId) throws Exception {
       ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
       clusterControl.authorize();
       long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num);
       scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
-      clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue());
+      clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
       return num;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 7adc190..587b8f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends Activation {
                                             Configuration configuration,
                                             ActiveMQServer parentServer) throws ActiveMQException {
       if (replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled()) {
-         return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), replicaPolicy.getScaleDownPolicy());
+         return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
       } else {
          return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
index 20ad7b3..c978ff6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
@@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends Activation {
                                             Configuration configuration,
                                             ActiveMQServer parentServer) throws ActiveMQException {
       if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) {
-         return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), sharedStoreSlavePolicy.getScaleDownPolicy());
+         return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
       } else {
          return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
       }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index fc76b6f..fea5cc6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2881,13 +2881,6 @@
                </xsd:complexType>
             </xsd:element>
          </xsd:choice>
-         <xsd:element name="cleanup-sf-queue" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
-            <xsd:annotation>
-               <xsd:documentation>
-                  Tells the target node whether delete the store and forward queue after scale down.
-               </xsd:documentation>
-            </xsd:annotation>
-         </xsd:element>
       </xsd:sequence>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 4129ee6..df1ee08 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -28,9 +28,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
-import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
-import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -274,37 +272,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
       testParsingOverFlow("<bridges> \n" + "  <bridge name=\"price-forward-bridge\"> \n" + "    <queue-name>priceForwarding</queue-name>  \n" + "    <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + "    <producer-window-size>2147483648</producer-window-size>\n" + "    <static-connectors> \n" + "      <connector-ref>netty</connector-ref> \n" + "    </static-connectors> \n" + "  </bridge> \n" + "</bridges>\n");
    }
 
-   @Test
-   public void testParsingScaleDownConfig() throws Exception {
-      FileConfigurationParser parser = new FileConfigurationParser();
-
-      String configStr = firstPart + "<ha-policy>\n" +
-               "   <live-only>\n" +
-               "      <scale-down>\n" +
-               "         <connectors>\n" +
-               "            <connector-ref>server0-connector</connector-ref>\n" +
-               "         </connectors>\n" +
-               "         <cleanup-sf-queue>true</cleanup-sf-queue>\n" +
-               "      </scale-down>\n" +
-               "   </live-only>\n" +
-               "</ha-policy>\n" + lastPart;
-      ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
-
-      Configuration config = parser.parseMainConfig(input);
-
-      HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
-      assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
-
-      LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
-      ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
-      assertTrue(scaledownCfg.isCleanupSfQueue());
-      List<String> connectors = scaledownCfg.getConnectors();
-      assertEquals(1, connectors.size());
-      String connector = connectors.get(0);
-      assertEquals("server0-connector", connector);
-   }
-
-
    private void testParsingOverFlow(String config) throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
       String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 352688d..e0f6372 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -299,7 +299,6 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertNotNull(lopc.getScaleDownConfiguration());
       assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!");
       assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1");
-      assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue());
 
       for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
          if (ccc.getName().equals("cluster-connection1")) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index e266ef5..7f92981 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -889,11 +889,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public boolean internalDelete() {
-         return false;
-      }
-
-      @Override
       public void unproposed(SimpleString groupID) {
 
       }
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index 6df4648..1b82426 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -695,30 +695,6 @@ transactions are there for the client when it reconnects. The normal
 reconnect settings apply when the client is reconnecting so these should
 be high enough to deal with the time needed to scale down.
 
-#### Automatic Deleting Store-and-Forward Queue after Scale Down
-
-By default after the node is scaled down to a target node the internal
-SF queue is not deleted. There is a boolean configuration parameter called 
-"cleanup-sf-queue" that can be used in case you want to delete it.
-
-To do so you need to add this parameter to the scale-down policy and
-set it to "true". For example:
-
-```xml
-<ha-policy>
-   <live-only>
-      <scale-down>
-         ...
-         <cleanup-sf-queue>true</cleanup-sf-queue>
-      </scale-down>
-   </live-only>
-</ha-policy>
-```
-
-With the above config in place when the scale down node is
-stopped, it will send a message to the target node once the scale down
-is complete. The target node will then properly delete the SF queue and its address.
-
 ## Failover Modes
 
 Apache ActiveMQ Artemis defines two types of client failover:
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
deleted file mode 100644
index 92d2635..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.server;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
-import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
-import org.apache.activemq.artemis.core.server.AddressQueryResult;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
-import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-@RunWith(value = Parameterized.class)
-public class ScaleDownRemoveSFTest extends ClusterTestBase {
-
-   @Parameterized.Parameters(name = "RemoveOption={0}")
-   public static Collection getParameters() {
-      return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}});
-   }
-
-   public ScaleDownRemoveSFTest(String option) {
-      this.option = option;
-   }
-
-   private String option;
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-
-      ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
-      if (!"default".equals(option)) {
-         scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option));
-      }
-      setupLiveServer(0, isFileStorage(), isNetty(), true);
-      setupLiveServer(1, isFileStorage(), isNetty(), true);
-      LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
-      haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
-      LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
-      haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
-
-      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
-      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
-      haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
-      haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
-      servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
-      servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
-      startServers(0, 1);
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-   }
-
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-
-   protected boolean isNetty() {
-      return true;
-   }
-
-   @Test
-   public void testScaleDownCheckSF() throws Exception {
-      final int TEST_SIZE = 2;
-      final String addressName = "testAddress";
-      final String queueName1 = "testQueue1";
-
-      // create 2 queues on each node mapped to the same address
-      createQueue(0, addressName, queueName1, null, true);
-      createQueue(1, addressName, queueName1, null, true);
-
-      // send messages to node 0
-      send(0, addressName, TEST_SIZE, true, null);
-
-      // consume a message from queue 1
-      addConsumer(1, 0, queueName1, null, false);
-      ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
-      Assert.assertNotNull(clientMessage);
-      clientMessage.acknowledge();
-      consumers[1].getSession().commit();
-
-      Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
-
-      //check sf queue on server1 exists
-      ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
-      SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
-
-      System.out.println("[sf queue on server 1]: " + sfQueueName);
-      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
-      assertTrue(result.isExists());
-
-      // trigger scaleDown from node 0 to node 1
-      servers[0].stop();
-
-      addConsumer(0, 1, queueName1, null);
-      clientMessage = consumers[0].getConsumer().receive(250);
-      Assert.assertNotNull(clientMessage);
-      clientMessage.acknowledge();
-
-      // ensure there are no more messages on queue 1
-      clientMessage = consumers[0].getConsumer().receive(250);
-      Assert.assertNull(clientMessage);
-      removeConsumer(0);
-
-      //check
-      result = servers[1].queueQuery(sfQueueName);
-      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
-      if ("true".equals(option)) {
-         assertFalse(result.isExists());
-         assertFalse(result2.isExists());
-      } else {
-         assertTrue(result.isExists());
-         assertTrue(result2.isExists());
-      }
-
-   }
-
-}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 7b7890f..3137df9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -197,11 +197,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public boolean internalDelete() {
-      return false;
-   }
-
-   @Override
    public boolean isPersistedPause() {
       return false;
    }


[activemq-artemis] 04/04: This closes #2841

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 320381a2c61b167a9e438c7ccb2540012afc51c7
Merge: 3cbd5a3 b846f35
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Sep 17 14:05:00 2019 -0400

    This closes #2841

 .../api/config/ActiveMQDefaultConfiguration.java   |   7 --
 .../core/protocol/core/impl/PacketImpl.java        |   1 -
 .../artemis/core/config/ConfigurationUtils.java    |   4 +-
 .../core/config/ScaleDownConfiguration.java        |  11 ---
 .../deployers/impl/FileConfigurationParser.java    |   4 +-
 .../artemis/core/protocol/ServerPacketDecoder.java |   6 --
 .../impl/wireformat/ScaleDownAnnounceMessage.java  |   8 +-
 .../wireformat/ScaleDownAnnounceMessageV2.java     |  32 -------
 .../apache/activemq/artemis/core/server/Queue.java |   4 +-
 .../core/server/cluster/ClusterConnection.java     |   8 --
 .../core/server/cluster/ClusterControl.java        |   7 +-
 .../core/server/cluster/ClusterController.java     |  10 +-
 .../core/server/cluster/ha/ScaleDownPolicy.java    |  12 +--
 .../core/server/cluster/impl/BridgeImpl.java       |  41 ++++----
 .../cluster/impl/ClusterConnectionBridge.java      |  19 ++--
 .../server/cluster/impl/ClusterConnectionImpl.java |  21 ----
 .../server/impl/BackupRecoveryJournalLoader.java   |   9 +-
 .../core/server/impl/LiveOnlyActivation.java       |   3 +-
 .../artemis/core/server/impl/QueueImpl.java        |  31 +-----
 .../artemis/core/server/impl/ScaleDownHandler.java |   6 +-
 .../server/impl/SharedNothingBackupActivation.java |   2 +-
 .../server/impl/SharedStoreBackupActivation.java   |   2 +-
 .../resources/schema/artemis-configuration.xsd     |   7 --
 .../config/impl/FileConfigurationParserTest.java   |   2 -
 .../core/config/impl/FileConfigurationTest.java    |   1 -
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   9 +-
 docs/user-manual/en/ha.md                          |  24 -----
 .../integration/server/ScaleDown3NodeTest.java     | 106 +++++----------------
 .../integration/server/ScaleDownRemoveSFTest.java  |  35 +------
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  10 +-
 30 files changed, 95 insertions(+), 347 deletions(-)