You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/11/02 16:13:44 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2160: Addressed occurance where cluster configuration on server locator was hard coded. Covered with test.

ARTEMIS-2160: Addressed occurance where cluster configuration on server locator was hard coded. Covered with test.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f74faa3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f74faa3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f74faa3

Branch: refs/heads/master
Commit: 5f74faa34a805ebf3222f7b05d2735fbbeae50bd
Parents: a2264c5
Author: Roddie Kieley <rk...@unifiedsoftworx.com>
Authored: Wed Oct 31 15:04:07 2018 -0230
Committer: Justin Bertram <jb...@apache.org>
Committed: Fri Nov 2 11:12:35 2018 -0500

----------------------------------------------------------------------
 .../core/server/cluster/ClusterController.java  | 11 ++-
 .../cluster/ClusterControllerTest.java          | 85 +++++++++++++++++++-
 .../cluster/distribution/ClusterTestBase.java   | 74 ++++++++++++-----
 3 files changed, 146 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 03eb243..15cf04e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -185,8 +185,11 @@ public class ClusterController implements ActiveMQComponent {
       serverLocator.setConnectionTTL(config.getConnectionTTL());
       serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
       //if the cluster isn't available we want to hang around until it is
-      serverLocator.setReconnectAttempts(-1);
-      serverLocator.setInitialConnectAttempts(-1);
+      serverLocator.setReconnectAttempts(config.getReconnectAttempts());
+      serverLocator.setInitialConnectAttempts(config.getInitialConnectAttempts());
+      serverLocator.setRetryInterval(config.getRetryInterval());
+      serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
+      serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
       //this is used for replication so need to use the server packet decoder
       serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
       serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
@@ -438,4 +441,8 @@ public class ClusterController implements ActiveMQComponent {
       return this.replicationLocator;
    }
 
+   public ServerLocator getServerLocator(SimpleString name) {
+      return locators.get(name);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
index 91857ef..f7cbd62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
@@ -16,12 +16,19 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster;
 
+import java.util.List;
 import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.junit.Before;
@@ -29,6 +36,9 @@ import org.junit.Test;
 
 public class ClusterControllerTest extends ClusterTestBase {
 
+   private ClusterConnectionConfiguration clusterConf0;
+   private ClusterConnectionConfiguration clusterConf1;
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -45,13 +55,74 @@ public class ClusterControllerTest extends ClusterTestBase {
 
       getServer(1).getConfiguration().setClusterPassword("something different");
 
-      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0);
-      setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1);
+      clusterConf0 = new ClusterConnectionConfiguration()
+         .setName("cluster0")
+         .setAddress("queues")
+         .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
+         .setMaxHops(1)
+         .setInitialConnectAttempts(8)
+         .setReconnectAttempts(10)
+         .setRetryInterval(250)
+         .setMaxRetryInterval(4000)
+         .setRetryIntervalMultiplier(2.0);
+
+      clusterConf1 = new ClusterConnectionConfiguration()
+         .setName("cluster0")
+         .setAddress("queues")
+         .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
+         .setMaxHops(1)
+         .setInitialConnectAttempts(8)
+         .setReconnectAttempts(10)
+         .setRetryInterval(250)
+         .setMaxRetryInterval(4000)
+         .setRetryIntervalMultiplier(2.0);
+
+      setupClusterConnection(clusterConf0, true, 0);
+      setupClusterConnection(clusterConf1, true, 1);
 
       startServers(0);
       startServers(1);
    }
 
+   private boolean clusterConnectionConfigurationIsSameBeforeAfterStart(ClusterConnectionConfiguration clusterConnectionConfigurationBeforeStart, int node) {
+      boolean clusterConnectionConfigurationIsSame = false;
+
+      Configuration serverNodeConfiguration = getServer(node).getConfiguration();
+      ActiveMQServer serverNode = getServer(node);
+      ClusterManager clusterManager = serverNode.getClusterManager();
+      ClusterController clusterController = clusterManager.getClusterController();
+      ServerLocator serverNodeLocator = clusterController.getServerLocator(new SimpleString(clusterConnectionConfigurationBeforeStart.getName()));
+      List<ClusterConnectionConfiguration> serverNodeClusterConnectionConfigurations = serverNodeConfiguration.getClusterConfigurations();
+
+      do {
+         if (serverNodeLocator.getInitialConnectAttempts() != clusterConnectionConfigurationBeforeStart.getInitialConnectAttempts()) {
+            break;
+         }
+
+         if (serverNodeLocator.getReconnectAttempts() != clusterConnectionConfigurationBeforeStart.getReconnectAttempts()) {
+            break;
+         }
+
+         if (serverNodeLocator.getRetryInterval() != clusterConnectionConfigurationBeforeStart.getRetryInterval()) {
+            break;
+         }
+         if (serverNodeLocator.getMaxRetryInterval() != clusterConnectionConfigurationBeforeStart.getMaxRetryInterval()) {
+            break;
+         }
+
+         Double serverNodeClusterConnectionConfigurationRIM = serverNodeLocator.getRetryIntervalMultiplier();
+         Double clusterConnectionConfigurationBeforeStartRIM = clusterConnectionConfigurationBeforeStart.getRetryIntervalMultiplier();
+         if (0 != serverNodeClusterConnectionConfigurationRIM.compareTo(clusterConnectionConfigurationBeforeStartRIM)) {
+            break;
+         }
+
+         clusterConnectionConfigurationIsSame = true;
+      }
+      while (false);
+
+      return clusterConnectionConfigurationIsSame;
+   }
+
    @Test
    public void controlWithDifferentConnector() throws Exception {
       try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
@@ -76,4 +147,14 @@ public class ClusterControllerTest extends ClusterTestBase {
          }
       }
    }
+
+   @Test
+   public void verifyServerLocatorsClusterConfiguration() {
+      if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf0, 0)) {
+         fail("serverLocator is not configured as per clusterConf0");
+      }
+      if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf1, 1)) {
+         fail("serverLocator is not configured as per clusterConf1");
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 6e7f9b3..0e4c960 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1755,6 +1755,36 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
    }
 
+   private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
+      List<String> pairs = new ArrayList<>();
+      for (int element : nodesTo) {
+         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+         pairs.add(serverTotc.getName());
+      }
+      return pairs;
+   }
+
+   protected void setupClusterConnection(ClusterConnectionConfiguration clusterConf,
+                                         final boolean netty,
+                                         final int nodeFrom,
+                                         final int... nodesTo) {
+      ActiveMQServer serverFrom = servers[nodeFrom];
+
+      if (serverFrom == null) {
+         throw new IllegalStateException("No server at node " + nodeFrom);
+      }
+
+      TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+      serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+      List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
+      Configuration config = serverFrom.getConfiguration();
+      clusterConf.setConnectorName(connectorFrom.getName()).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+
+      config.getClusterConfigurations().add(clusterConf);
+   }
+
    protected void setupClusterConnection(final String name,
                                          final String address,
                                          final MessageLoadBalancingType messageLoadBalancingType,
@@ -1772,12 +1802,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
 
-      List<String> pairs = new ArrayList<>();
-      for (int element : nodesTo) {
-         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
-         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
-         pairs.add(serverTotc.getName());
-      }
+      List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
       Configuration config = serverFrom.getConfiguration();
       ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
 
@@ -1805,15 +1830,21 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
 
-      List<String> pairs = new ArrayList<>();
-      for (int element : nodesTo) {
-         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
-         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
-         pairs.add(serverTotc.getName());
-      }
+      List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
       Configuration config = serverFrom.getConfiguration();
 
-      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setCallTimeout(100).setCallFailoverTimeout(100).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+      ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration()
+         .setName(name)
+         .setAddress(address)
+         .setConnectorName(connectorFrom.getName())
+         .setRetryInterval(retryInterval)
+         .setReconnectAttempts(reconnectAttempts)
+         .setCallTimeout(100)
+         .setCallFailoverTimeout(100)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
+         .setMaxHops(maxHops)
+         .setConfirmationWindowSize(1024)
+         .setStaticConnectors(pairs);
 
       config.getClusterConfigurations().add(clusterConf);
    }
@@ -1824,7 +1855,15 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
                                                               final int maxHops,
                                                               TransportConfiguration connectorFrom,
                                                               List<String> pairs) {
-      return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+      return new ClusterConnectionConfiguration()
+         .setName(name)
+         .setAddress(address)
+         .setConnectorName(connectorFrom.getName())
+         .setRetryInterval(250)
+         .setMessageLoadBalancingType(messageLoadBalancingType)
+         .setMaxHops(maxHops)
+         .setConfirmationWindowSize(1024)
+         .setStaticConnectors(pairs);
    }
 
    protected void setupClusterConnectionWithBackups(final String name,
@@ -1843,12 +1882,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
       TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
       serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
 
-      List<String> pairs = new ArrayList<>();
-      for (int element : nodesTo) {
-         TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
-         serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
-         pairs.add(serverTotc.getName());
-      }
+      List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
       Configuration config = serverFrom.getConfiguration();
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);