You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/02/19 21:10:47 UTC
[activemq-artemis] branch master updated: ARTEMIS-2256 Update
Topology back after a bridge reconnected
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new d59bdca ARTEMIS-2256 Update Topology back after a bridge reconnected
new 8ff0cba This closes #2555
d59bdca is described below
commit d59bdca73f85213a74a764e9baebc81bb9e37847
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Feb 19 10:28:55 2019 -0500
ARTEMIS-2256 Update Topology back after a bridge reconnected
https://issues.apache.org/jira/browse/ARTEMIS-2256
---
.../core/client/impl/TopologyMemberImpl.java | 32 ++++++++++++++++++++++
.../core/server/cluster/impl/BridgeImpl.java | 12 ++++++++
.../integration/cluster/bridge/BridgeTest.java | 6 +++-
.../bridge/ClusteredBridgeReconnectTest.java | 30 ++++++++++++++++++++
4 files changed, 79 insertions(+), 1 deletion(-)
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index 3cbabac..df3b09c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -37,6 +37,7 @@ public final class TopologyMemberImpl implements TopologyMember {
private final String scaleDownGroupName;
+
/**
* transient to avoid serialization changes
*/
@@ -155,4 +156,35 @@ public final class TopologyMemberImpl implements TopologyMember {
public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
}
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TopologyMemberImpl that = (TopologyMemberImpl) o;
+
+ // note: the uniqueEventId is not part of equals() or hashCode(); equality covers connector, backupGroupName, scaleDownGroupName and nodeId
+
+ if (connector != null ? !connector.equals(that.connector) : that.connector != null)
+ return false;
+ if (backupGroupName != null ? !backupGroupName.equals(that.backupGroupName) : that.backupGroupName != null)
+ return false;
+ if (scaleDownGroupName != null ? !scaleDownGroupName.equals(that.scaleDownGroupName) : that.scaleDownGroupName != null)
+ return false;
+ return nodeId != null ? nodeId.equals(that.nodeId) : that.nodeId == null;
+ }
+
+ @Override
+ public int hashCode() {
+ // note: the uniqueEventId is not part of equals() or hashCode(); the hash combines connector, backupGroupName, scaleDownGroupName and nodeId (null counts as 0)
+ int result = connector != null ? connector.hashCode() : 0;
+ result = 31 * result + (backupGroupName != null ? backupGroupName.hashCode() : 0);
+ result = 31 * result + (scaleDownGroupName != null ? scaleDownGroupName.hashCode() : 0);
+ result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
+ return result;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 9f28452..ac08185 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -135,6 +136,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// on cases where sub-classes need a consumer
protected volatile ClientSessionInternal sessionConsumer;
+
+ // set to true when the bridge fails and the target node is notified as down in the topology;
+ // upon reconnection we need to send the nodeUp back into the topology and clear this flag
+ protected volatile boolean disconnectedAndDown = false;
+
protected String targetNodeID;
protected TopologyMember targetNode;
@@ -853,6 +859,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
if (targetNodeID != null) {
+ this.disconnectedAndDown = true;
serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID);
}
if (queue != null) {
@@ -874,6 +881,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
/* Hook for doing extra stuff after connection */
protected void afterConnect() throws Exception {
+ if (disconnectedAndDown && targetNodeID != null && targetNode != null) {
+ serverLocator.notifyNodeUp(System.currentTimeMillis(), targetNodeID, targetNode.getBackupGroupName(), targetNode.getScaleDownGroupName(),
+ new Pair<>(targetNode.getLive(), targetNode.getBackup()), false);
+ disconnectedAndDown = false;
+ }
retryCount = 0;
reconnectAttemptsInUse = reconnectAttempts;
if (futureScheduledReconnection != null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 73116f8..c46fd01 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1077,6 +1077,9 @@ public class BridgeTest extends ActiveMQTestBase {
sf1.close();
+ SimpleString queueName1Str = new SimpleString(queueName1);
+ Wait.assertTrue(() -> server1.locateQueue(queueName1Str) == null);
+
server1.stop();
session0.close();
@@ -1084,7 +1087,8 @@ public class BridgeTest extends ActiveMQTestBase {
sf0.close();
closeFields();
- assertEquals(0, loadQueues(server0).size());
+
+ Wait.assertEquals(0, () -> loadQueues(server0).size());
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index a280b85..beab047 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
+import java.util.ArrayList;
+
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -81,6 +86,9 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
+ Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
+ ArrayList<TopologyMemberImpl> originalmembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
+
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
@@ -91,6 +99,28 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
}
}
+ Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
+
+ ArrayList<TopologyMemberImpl> afterReconnectedMembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
+
+ boolean allFound = true;
+
+ for (TopologyMemberImpl originalMember : originalmembers) {
+ boolean found = false;
+ for (TopologyMember reconnectedMember : afterReconnectedMembers) {
+ if (originalMember.equals(reconnectedMember)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ allFound = false;
+ }
+ }
+
+ Assert.assertTrue("The topology is slightly different after a reconnect", allFound);
+
int cons0Count = 0, cons1Count = 0;
while (true) {