You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:05:13 UTC

[activemq-artemis] 02/04: ARTEMIS-2462 re-applying tests on SNF Delete Queue

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit dd20f89bd011d673d4fb6e664e9e68a8ca8348d7
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Mon Sep 16 21:44:25 2019 -0400

    ARTEMIS-2462 re-applying tests on SNF Delete Queue
---
 .../server/cluster/impl/ClusterConnectionImpl.java |   6 +-
 .../config/impl/FileConfigurationParserTest.java   |  31 +++++
 .../integration/server/ScaleDownRemoveSFTest.java  | 127 +++++++++++++++++++++
 3 files changed, 163 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 6ee2da4..d6f34c9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
                // New node - create a new flow record
 
-               final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+               final SimpleString queueName = getSfQueueName(nodeID);
 
                Binding queueBinding = postOffice.getBinding(queueName);
 
@@ -741,6 +741,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       }
    }
 
+   public SimpleString getSfQueueName(String nodeID) {
+      return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+   }
+
    @Override
    public synchronized void informClusterOfBackup() {
       String nodeID = server.getNodeID().toString();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index df1ee08..6e41d1c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -28,7 +28,9 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -272,6 +274,35 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
       testParsingOverFlow("<bridges> \n" + "  <bridge name=\"price-forward-bridge\"> \n" + "    <queue-name>priceForwarding</queue-name>  \n" + "    <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + "    <producer-window-size>2147483648</producer-window-size>\n" + "    <static-connectors> \n" + "      <connector-ref>netty</connector-ref> \n" + "    </static-connectors> \n" + "  </bridge> \n" + "</bridges>\n");
    }
 
+   @Test
+   public void testParsingScaleDownConfig() throws Exception {
+      FileConfigurationParser parser = new FileConfigurationParser();
+
+      String configStr = firstPart + "<ha-policy>\n" +
+               "   <live-only>\n" +
+               "      <scale-down>\n" +
+               "         <connectors>\n" +
+               "            <connector-ref>server0-connector</connector-ref>\n" +
+               "         </connectors>\n" +
+               "      </scale-down>\n" +
+               "   </live-only>\n" +
+               "</ha-policy>\n" + lastPart;
+      ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration config = parser.parseMainConfig(input);
+
+      HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
+      assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
+
+      LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
+      ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
+      List<String> connectors = scaledownCfg.getConnectors();
+      assertEquals(1, connectors.size());
+      String connector = connectors.get(0);
+      assertEquals("server0-connector", connector);
+   }
+
+
    private void testParsingOverFlow(String config) throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
       String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
new file mode 100644
index 0000000..ed9c3e6
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class ScaleDownRemoveSFTest extends ClusterTestBase {
+
+   public ScaleDownRemoveSFTest() {
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
+      setupLiveServer(0, isFileStorage(), isNetty(), true);
+      setupLiveServer(1, isFileStorage(), isNetty(), true);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
+
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+      haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
+      servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
+      startServers(0, 1);
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      super.tearDown();
+   }
+
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testScaleDownCheckSF() throws Exception {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+
+      // create one queue on each of the 2 nodes, both mapped to the same address
+      createQueue(0, addressName, queueName1, null, true);
+      createQueue(1, addressName, queueName1, null, true);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, true, null);
+
+      // consume one message from testQueue1 and commit the session
+      addConsumer(1, 0, queueName1, null, false);
+      ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      consumers[1].getSession().commit();
+
+      Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
+
+      //check sf queue on server1 exists
+      ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
+      SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
+
+      System.out.println("[sf queue on server 1]: " + sfQueueName);
+      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
+      assertTrue(result.isExists());
+
+      // trigger scaleDown from node 0 to node 1
+      servers[0].stop();
+
+      addConsumer(0, 1, queueName1, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages on queue 1
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+
+      //check that the sf queue and its address no longer exist on server1
+      result = servers[1].queueQuery(sfQueueName);
+      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
+      assertFalse(result.isExists());
+      assertFalse(result2.isExists());
+
+   }
+
+}