You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/02/19 21:10:47 UTC

[activemq-artemis] branch master updated: ARTEMIS-2256 Update Topology back after a bridge reconnected

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new d59bdca  ARTEMIS-2256 Update Topology back after a bridge reconnected
     new 8ff0cba  This closes #2555
d59bdca is described below

commit d59bdca73f85213a74a764e9baebc81bb9e37847
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Feb 19 10:28:55 2019 -0500

    ARTEMIS-2256 Update Topology back after a bridge reconnected
    
    https://issues.apache.org/jira/browse/ARTEMIS-2256
---
 .../core/client/impl/TopologyMemberImpl.java       | 32 ++++++++++++++++++++++
 .../core/server/cluster/impl/BridgeImpl.java       | 12 ++++++++
 .../integration/cluster/bridge/BridgeTest.java     |  6 +++-
 .../bridge/ClusteredBridgeReconnectTest.java       | 30 ++++++++++++++++++++
 4 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index 3cbabac..df3b09c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -37,6 +37,7 @@ public final class TopologyMemberImpl implements TopologyMember {
 
    private final String scaleDownGroupName;
 
+
    /**
     * transient to avoid serialization changes
     */
@@ -155,4 +156,35 @@ public final class TopologyMemberImpl implements TopologyMember {
    public String toString() {
       return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
    }
+
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o)
+         return true;
+      if (o == null || getClass() != o.getClass())
+         return false;
+
+      TopologyMemberImpl that = (TopologyMemberImpl) o;
+
+      // note the uniqueEventId is not park of the equals and hashmap key
+
+      if (connector != null ? !connector.equals(that.connector) : that.connector != null)
+         return false;
+      if (backupGroupName != null ? !backupGroupName.equals(that.backupGroupName) : that.backupGroupName != null)
+         return false;
+      if (scaleDownGroupName != null ? !scaleDownGroupName.equals(that.scaleDownGroupName) : that.scaleDownGroupName != null)
+         return false;
+      return nodeId != null ? nodeId.equals(that.nodeId) : that.nodeId == null;
+   }
+
+   @Override
+   public int hashCode() {
+      // note the uniqueEventId is not park of the equals and hashmap key
+      int result = connector != null ? connector.hashCode() : 0;
+      result = 31 * result + (backupGroupName != null ? backupGroupName.hashCode() : 0);
+      result = 31 * result + (scaleDownGroupName != null ? scaleDownGroupName.hashCode() : 0);
+      result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
+      return result;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 9f28452..ac08185 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -135,6 +136,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
    // on cases where sub-classes need a consumer
    protected volatile ClientSessionInternal sessionConsumer;
 
+
+   // this will happen if a disconnect happened
+   // upon reconnection we need to send the nodeUP back into the topology
+   protected volatile boolean disconnectedAndDown = false;
+
    protected String targetNodeID;
 
    protected TopologyMember targetNode;
@@ -853,6 +859,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
       //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
       if (targetNodeID != null) {
+         this.disconnectedAndDown = true;
          serverLocator.notifyNodeDown(System.currentTimeMillis(), targetNodeID);
       }
       if (queue != null) {
@@ -874,6 +881,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    /* Hook for doing extra stuff after connection */
    protected void afterConnect() throws Exception {
+      if (disconnectedAndDown && targetNodeID != null && targetNode != null) {
+         serverLocator.notifyNodeUp(System.currentTimeMillis(), targetNodeID, targetNode.getBackupGroupName(), targetNode.getScaleDownGroupName(),
+                                    new Pair<>(targetNode.getLive(), targetNode.getBackup()), false);
+         disconnectedAndDown = false;
+      }
       retryCount = 0;
       reconnectAttemptsInUse = reconnectAttempts;
       if (futureScheduledReconnection != null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index 73116f8..c46fd01 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -1077,6 +1077,9 @@ public class BridgeTest extends ActiveMQTestBase {
 
       sf1.close();
 
+      SimpleString queueName1Str = new SimpleString(queueName1);
+      Wait.assertTrue(() -> server1.locateQueue(queueName1Str) == null);
+
       server1.stop();
 
       session0.close();
@@ -1084,7 +1087,8 @@ public class BridgeTest extends ActiveMQTestBase {
       sf0.close();
 
       closeFields();
-      assertEquals(0, loadQueues(server0).size());
+
+      Wait.assertEquals(0, () -> loadQueues(server0).size());
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
index a280b85..beab047 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java
@@ -16,15 +16,20 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
+import java.util.ArrayList;
+
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
 import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -81,6 +86,9 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
       MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
       ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
 
+      Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
+      ArrayList<TopologyMemberImpl> originalmembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
+
       for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
          ClientMessage msg = session0.createMessage(true);
          producer.send(msg);
@@ -91,6 +99,28 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
          }
       }
 
+      Wait.assertEquals(2, bridge.getSessionFactory().getServerLocator().getTopology().getMembers()::size);
+
+      ArrayList<TopologyMemberImpl> afterReconnectedMembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
+
+      boolean allFound = true;
+
+      for (TopologyMemberImpl originalMember : originalmembers) {
+         boolean found = false;
+         for (TopologyMember reconnectedMember : afterReconnectedMembers) {
+            if (originalMember.equals(reconnectedMember)) {
+               found = true;
+               break;
+            }
+         }
+
+         if (!found) {
+            allFound = false;
+         }
+      }
+
+      Assert.assertTrue("The topology is slightly different after a reconnect", allFound);
+
       int cons0Count = 0, cons1Count = 0;
 
       while (true) {