You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/10/21 09:47:04 UTC
[activemq-artemis] branch main updated: ARTEMIS-3495 Fix backup
cluster controller connection loops
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new dca3fac ARTEMIS-3495 Fix backup cluster controller connection loops
dca3fac is described below
commit dca3facb55f6d848988d5a25b5640b92bad8b01d
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Mon Oct 18 16:48:59 2021 +0200
ARTEMIS-3495 Fix backup cluster controller connection loops
Skip backup connector equivalent to cluster connector for cluster connections.
---
.../core/client/impl/ClientSessionFactoryImpl.java | 4 +-
.../core/client/impl/ServerLocatorImpl.java | 5 ++
.../artemis/core/server/cluster/BackupManager.java | 7 +++
.../core/server/cluster/ClusterController.java | 10 +++-
.../core/server/cluster/ClusterManager.java | 2 +-
.../server/cluster/impl/ClusterConnectionImpl.java | 8 +++
.../integration/cluster/failover/FailoverTest.java | 57 ++++++++++++++++++++++
7 files changed, 89 insertions(+), 4 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 80a2428..48e70ff 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -289,7 +289,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
}
- if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) {
+ if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())
+ // check if a server is trying to set its cluster connector config as backup connector config
+ && !(serverLocator.getClusterTransportConfiguration() != null && serverLocator.getClusterTransportConfiguration().isSameParams(backUp))) {
if (logger.isDebugEnabled()) {
logger.debug("Setting up backup config = " + backUp + " for live = " + live);
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 363849f..62fbef1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -170,6 +170,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return discoveryGroup;
}
+ /** For tests only */
+ public Set<ClientSessionFactoryInternal> getFactories() {
+ return factories;
+ }
+
private final Exception traceException = new Exception();
private ServerLocatorConfig config = new ServerLocatorConfig();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
index 62b9563..1bf9919 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
@@ -195,6 +195,12 @@ public class BackupManager implements ActiveMQComponent {
private volatile boolean announcingBackup;
private volatile boolean backupAnnounced = false;
+
+ public TransportConfiguration getConnector() {
+ return connector;
+ }
+
+
@Override
public String toString() {
return "BackupConnector{" + "name='" + config.getName() + '\'' + ", connector=" + connector + '}';
@@ -363,6 +369,7 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
+ locator.setClusterTransportConfiguration(getConnector());
locator.setRetryInterval(config.getRetryInterval());
locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
locator.setConnectionTTL(config.getConnectionTTL());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index ec9f153..6547613 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -80,6 +80,11 @@ public class ClusterController implements ActiveMQComponent {
private boolean started;
private SimpleString replicatedClusterName;
+ /** For tests only */
+ public ServerLocator getDefaultLocator() {
+ return defaultLocator;
+ }
+
public ClusterController(ActiveMQServer server,
ScheduledExecutorService scheduledExecutor,
boolean useQuorumManager) {
@@ -207,9 +212,10 @@ public class ClusterController implements ActiveMQComponent {
*/
public void addClusterConnection(SimpleString name,
TransportConfiguration[] tcConfigs,
- ClusterConnectionConfiguration config) {
+ ClusterConnectionConfiguration config,
+ TransportConfiguration connector) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
- configAndAdd(name, serverLocator, config, null);
+ configAndAdd(name, serverLocator, config, connector);
}
private void configAndAdd(SimpleString name,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 44a81e4..a617e23 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -639,7 +639,7 @@ public class ClusterManager implements ActiveMQComponent {
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicat [...]
- clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
+ clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config, connector);
}
if (defaultClusterConnection == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 2e3f8e3..8820885 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -179,6 +179,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
private boolean splitBrainDetection;
+
+ /** For tests only */
+ public ServerLocatorInternal getServerLocator() {
+ return serverLocator;
+ }
+
+
public ClusterConnectionImpl(final ClusterManager manager,
final TransportConfiguration[] staticTranspConfigs,
final TransportConfiguration connector,
@@ -1553,6 +1560,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
+ locator.setClusterTransportConfiguration(connector);
return locator;
}
return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index bc6e151..73b7bf6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -46,12 +46,14 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
@@ -59,6 +61,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.artemis.core.server.cluster.BackupManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
@@ -67,7 +71,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolic
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
+import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
@@ -80,6 +86,7 @@ import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -2459,6 +2466,56 @@ public class FailoverTest extends FailoverTestBase {
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
}
+ @Test(timeout = 120000)
+ public void testBackupConnections() throws Exception {
+ Assume.assumeTrue(backupServer.getServer().getHAPolicy().isBackup());
+
+ createSessionFactory();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ sf.addFailoverListener(eventType -> {
+ if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
+ latch.countDown();
+ }
+ });
+
+ BackupManager backupManager = ((ActiveMQServerImpl)backupServer.getServer()).getBackupManager();
+ ClusterController backupClusterController = backupServer.getServer().getClusterManager().getClusterController();
+ ClusterConnectionImpl backupClusterConnection = (ClusterConnectionImpl)backupServer.getServer().getClusterManager().getClusterConnections().stream().findFirst().get();
+
+ for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) {
+ for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupConnector.getBackupServerLocator()).getFactories()) {
+ Assert.assertNotNull(factory.getConnection());
+ }
+ }
+
+ for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterController.getDefaultLocator()).getFactories()) {
+ Assert.assertNotNull(factory.getConnection());
+ }
+
+ Assert.assertNull(backupClusterConnection.getServerLocator());
+
+ Assert.assertNotNull(sf.getConnection());
+
+ crash();
+
+ latch.await();
+
+ for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) {
+ Assert.assertNull(backupConnector.getBackupServerLocator());
+ }
+
+ for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupServer.getServer().getClusterManager().getClusterController().getDefaultLocator()).getFactories()) {
+ Assert.assertNull(factory.getConnection());
+ }
+
+ for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterConnection.getServerLocator()).getFactories()) {
+ Assert.assertNull(factory.getConnection());
+ }
+
+ Assert.assertNotNull(sf.getConnection());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------