You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/11/02 16:13:44 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2160: Addressed occurrence
where cluster configuration on server locator was hard-coded. Covered with
test.
ARTEMIS-2160: Addressed occurrence where cluster configuration on server locator was hard-coded. Covered with test.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5f74faa3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5f74faa3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5f74faa3
Branch: refs/heads/master
Commit: 5f74faa34a805ebf3222f7b05d2735fbbeae50bd
Parents: a2264c5
Author: Roddie Kieley <rk...@unifiedsoftworx.com>
Authored: Wed Oct 31 15:04:07 2018 -0230
Committer: Justin Bertram <jb...@apache.org>
Committed: Fri Nov 2 11:12:35 2018 -0500
----------------------------------------------------------------------
.../core/server/cluster/ClusterController.java | 11 ++-
.../cluster/ClusterControllerTest.java | 85 +++++++++++++++++++-
.../cluster/distribution/ClusterTestBase.java | 74 ++++++++++++-----
3 files changed, 146 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 03eb243..15cf04e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -185,8 +185,11 @@ public class ClusterController implements ActiveMQComponent {
serverLocator.setConnectionTTL(config.getConnectionTTL());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
//if the cluster isn't available we want to hang around until it is
- serverLocator.setReconnectAttempts(-1);
- serverLocator.setInitialConnectAttempts(-1);
+ serverLocator.setReconnectAttempts(config.getReconnectAttempts());
+ serverLocator.setInitialConnectAttempts(config.getInitialConnectAttempts());
+ serverLocator.setRetryInterval(config.getRetryInterval());
+ serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
+ serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
@@ -438,4 +441,8 @@ public class ClusterController implements ActiveMQComponent {
return this.replicationLocator;
}
+ public ServerLocator getServerLocator(SimpleString name) {
+ return locators.get(name);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
index 91857ef..f7cbd62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
@@ -16,12 +16,19 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster;
+import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.Before;
@@ -29,6 +36,9 @@ import org.junit.Test;
public class ClusterControllerTest extends ClusterTestBase {
+ private ClusterConnectionConfiguration clusterConf0;
+ private ClusterConnectionConfiguration clusterConf1;
+
@Override
@Before
public void setUp() throws Exception {
@@ -45,13 +55,74 @@ public class ClusterControllerTest extends ClusterTestBase {
getServer(1).getConfiguration().setClusterPassword("something different");
- setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0);
- setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1);
+ clusterConf0 = new ClusterConnectionConfiguration()
+ .setName("cluster0")
+ .setAddress("queues")
+ .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
+ .setMaxHops(1)
+ .setInitialConnectAttempts(8)
+ .setReconnectAttempts(10)
+ .setRetryInterval(250)
+ .setMaxRetryInterval(4000)
+ .setRetryIntervalMultiplier(2.0);
+
+ clusterConf1 = new ClusterConnectionConfiguration()
+ .setName("cluster0")
+ .setAddress("queues")
+ .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
+ .setMaxHops(1)
+ .setInitialConnectAttempts(8)
+ .setReconnectAttempts(10)
+ .setRetryInterval(250)
+ .setMaxRetryInterval(4000)
+ .setRetryIntervalMultiplier(2.0);
+
+ setupClusterConnection(clusterConf0, true, 0);
+ setupClusterConnection(clusterConf1, true, 1);
startServers(0);
startServers(1);
}
+ private boolean clusterConnectionConfigurationIsSameBeforeAfterStart(ClusterConnectionConfiguration clusterConnectionConfigurationBeforeStart, int node) {
+ boolean clusterConnectionConfigurationIsSame = false;
+
+ Configuration serverNodeConfiguration = getServer(node).getConfiguration();
+ ActiveMQServer serverNode = getServer(node);
+ ClusterManager clusterManager = serverNode.getClusterManager();
+ ClusterController clusterController = clusterManager.getClusterController();
+ ServerLocator serverNodeLocator = clusterController.getServerLocator(new SimpleString(clusterConnectionConfigurationBeforeStart.getName()));
+ List<ClusterConnectionConfiguration> serverNodeClusterConnectionConfigurations = serverNodeConfiguration.getClusterConfigurations();
+
+ do {
+ if (serverNodeLocator.getInitialConnectAttempts() != clusterConnectionConfigurationBeforeStart.getInitialConnectAttempts()) {
+ break;
+ }
+
+ if (serverNodeLocator.getReconnectAttempts() != clusterConnectionConfigurationBeforeStart.getReconnectAttempts()) {
+ break;
+ }
+
+ if (serverNodeLocator.getRetryInterval() != clusterConnectionConfigurationBeforeStart.getRetryInterval()) {
+ break;
+ }
+ if (serverNodeLocator.getMaxRetryInterval() != clusterConnectionConfigurationBeforeStart.getMaxRetryInterval()) {
+ break;
+ }
+
+ Double serverNodeClusterConnectionConfigurationRIM = serverNodeLocator.getRetryIntervalMultiplier();
+ Double clusterConnectionConfigurationBeforeStartRIM = clusterConnectionConfigurationBeforeStart.getRetryIntervalMultiplier();
+ if (0 != serverNodeClusterConnectionConfigurationRIM.compareTo(clusterConnectionConfigurationBeforeStartRIM)) {
+ break;
+ }
+
+ clusterConnectionConfigurationIsSame = true;
+ }
+ while (false);
+
+ return clusterConnectionConfigurationIsSame;
+ }
+
@Test
public void controlWithDifferentConnector() throws Exception {
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
@@ -76,4 +147,14 @@ public class ClusterControllerTest extends ClusterTestBase {
}
}
}
+
+ @Test
+ public void verifyServerLocatorsClusterConfiguration() {
+ if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf0, 0)) {
+ fail("serverLocator is not configured as per clusterConf0");
+ }
+ if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf1, 1)) {
+ fail("serverLocator is not configured as per clusterConf1");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5f74faa3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 6e7f9b3..0e4c960 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1755,6 +1755,36 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
}
+ private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
+ List<String> pairs = new ArrayList<>();
+ for (int element : nodesTo) {
+ TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
+ pairs.add(serverTotc.getName());
+ }
+ return pairs;
+ }
+
+ protected void setupClusterConnection(ClusterConnectionConfiguration clusterConf,
+ final boolean netty,
+ final int nodeFrom,
+ final int... nodesTo) {
+ ActiveMQServer serverFrom = servers[nodeFrom];
+
+ if (serverFrom == null) {
+ throw new IllegalStateException("No server at node " + nodeFrom);
+ }
+
+ TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
+ serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
+
+ List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
+ Configuration config = serverFrom.getConfiguration();
+ clusterConf.setConnectorName(connectorFrom.getName()).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+
+ config.getClusterConfigurations().add(clusterConf);
+ }
+
protected void setupClusterConnection(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,
@@ -1772,12 +1802,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
- List<String> pairs = new ArrayList<>();
- for (int element : nodesTo) {
- TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
- serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
- pairs.add(serverTotc.getName());
- }
+ List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
Configuration config = serverFrom.getConfiguration();
ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
@@ -1805,15 +1830,21 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
- List<String> pairs = new ArrayList<>();
- for (int element : nodesTo) {
- TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
- serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
- pairs.add(serverTotc.getName());
- }
+ List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
Configuration config = serverFrom.getConfiguration();
- ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setCallTimeout(100).setCallFailoverTimeout(100).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+ ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration()
+ .setName(name)
+ .setAddress(address)
+ .setConnectorName(connectorFrom.getName())
+ .setRetryInterval(retryInterval)
+ .setReconnectAttempts(reconnectAttempts)
+ .setCallTimeout(100)
+ .setCallFailoverTimeout(100)
+ .setMessageLoadBalancingType(messageLoadBalancingType)
+ .setMaxHops(maxHops)
+ .setConfirmationWindowSize(1024)
+ .setStaticConnectors(pairs);
config.getClusterConfigurations().add(clusterConf);
}
@@ -1824,7 +1855,15 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
final int maxHops,
TransportConfiguration connectorFrom,
List<String> pairs) {
- return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+ return new ClusterConnectionConfiguration()
+ .setName(name)
+ .setAddress(address)
+ .setConnectorName(connectorFrom.getName())
+ .setRetryInterval(250)
+ .setMessageLoadBalancingType(messageLoadBalancingType)
+ .setMaxHops(maxHops)
+ .setConfirmationWindowSize(1024)
+ .setStaticConnectors(pairs);
}
protected void setupClusterConnectionWithBackups(final String name,
@@ -1843,12 +1882,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
- List<String> pairs = new ArrayList<>();
- for (int element : nodesTo) {
- TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
- serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
- pairs.add(serverTotc.getName());
- }
+ List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
Configuration config = serverFrom.getConfiguration();
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);