You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/25 15:19:13 UTC
[activemq-artemis] 02/03: ARTEMIS-3138 Shared Nothing Live broker
shouldn't try to connect to itself (2)
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 6c7231928ffdd1b057c51a306ba55649446d74a1
Author: franz1981 <ni...@gmail.com>
AuthorDate: Wed Feb 24 17:43:51 2021 +0100
ARTEMIS-3138 Shared Nothing Live broker shouldn't try to connect to itself (2)
---
.../core/client/impl/ServerLocatorImpl.java | 12 ++++++-----
.../core/server/cluster/ClusterController.java | 23 +++++++++++-----------
.../core/server/cluster/ClusterManager.java | 2 +-
3 files changed, 20 insertions(+), 17 deletions(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 5a436ab..2ac0fd3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -1546,20 +1546,22 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (receivedTopology) {
return;
}
- TransportConfiguration[] newInitialconnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size());
- int count = 0;
- for (DiscoveryEntry entry : newConnectors) {
- newInitialconnectors[count++] = entry.getConnector();
+ final List<TransportConfiguration> newInitialconnectors = new ArrayList<>(newConnectors.size());
+ for (DiscoveryEntry entry : newConnectors) {
if (ha && topology.getMember(entry.getNodeID()) == null) {
TopologyMemberImpl member = new TopologyMemberImpl(entry.getNodeID(), null, null, entry.getConnector(), null);
// on this case we set it as zero as any update coming from server should be accepted
topology.updateMember(0, entry.getNodeID(), member);
}
+ // ignore its own transport connector
+ if (!entry.getConnector().equals(clusterTransportConfiguration)) {
+ newInitialconnectors.add(entry.getConnector());
+ }
}
- this.initialConnectors = newInitialconnectors.length == 0 ? null : newInitialconnectors;
+ this.initialConnectors = newInitialconnectors.toArray(new TransportConfiguration[newInitialconnectors.size()]);
if (clusterConnection && !receivedTopology && this.getNumInitialConnectors() > 0) {
// The node is alone in the cluster. We create a connection to the new node
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 6322db7..6ef9f26 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -158,15 +158,17 @@ public class ClusterController implements ActiveMQComponent {
/**
* add a locator for a cluster connection.
*
- * @param name the cluster connection name
- * @param dg the discovery group to use
- * @param config the cluster connection config
+ * @param name the cluster connection name
+ * @param dg the discovery group to use
+ * @param config the cluster connection config
+ * @param connector the cluster connector configuration
*/
public void addClusterConnection(SimpleString name,
DiscoveryGroupConfiguration dg,
- ClusterConnectionConfiguration config) {
+ ClusterConnectionConfiguration config,
+ TransportConfiguration connector) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(dg);
- configAndAdd(name, serverLocator, config);
+ configAndAdd(name, serverLocator, config, connector);
}
/**
@@ -179,12 +181,13 @@ public class ClusterController implements ActiveMQComponent {
TransportConfiguration[] tcConfigs,
ClusterConnectionConfiguration config) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
- configAndAdd(name, serverLocator, config);
+ configAndAdd(name, serverLocator, config, null);
}
private void configAndAdd(SimpleString name,
ServerLocatorInternal serverLocator,
- ClusterConnectionConfiguration config) {
+ ClusterConnectionConfiguration config,
+ TransportConfiguration connector) {
serverLocator.setConnectionTTL(config.getConnectionTTL());
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
//if the cluster isn't available we want to hang around until it is
@@ -198,10 +201,8 @@ public class ClusterController implements ActiveMQComponent {
//this is used for replication so need to use the server packet decoder
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
- SimpleString nodeID = server.getNodeID();
- if (nodeID != null) {
- // this is used to allow a live server to ignore it's same connector ref
- serverLocator.setNodeID(nodeID.toString());
+ if (connector != null) {
+ serverLocator.setClusterTransportConfiguration(connector);
}
try {
serverLocator.initialize();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 522c2d2..ac48302 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -609,7 +609,7 @@ public final class ClusterManager implements ActiveMQComponent {
clusterConnection = new ClusterConnectionImpl(this, dg, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress() != null ? config.getAddress() : ""), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailover [...]
- clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
+ clusterController.addClusterConnection(clusterConnection.getName(), dg, config, connector);
} else {
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);