You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/03 20:22:57 UTC

[1/3] activemq-artemis git commit: This closes #1984

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 650c79ee0 -> a8e81f226


This closes #1984


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a8e81f22
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a8e81f22
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a8e81f22

Branch: refs/heads/master
Commit: a8e81f2267d3428ba101e9c3d899838c75d40ce1
Parents: 650c79e f3e1ab3
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 3 16:22:13 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 16:22:13 2018 -0400

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    | 38 ++++++++++++--------
 .../cluster/impl/ClusterConnectionBridge.java   | 19 +++++++---
 .../cluster/failover/BackupSyncJournalTest.java | 35 ++++++++++++++++--
 3 files changed, 71 insertions(+), 21 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-1779 Small refactoring to logic on BridgeImpl::nodeUp logic

Posted by cl...@apache.org.
ARTEMIS-1779 Small refactoring to logic on BridgeImpl::nodeUp logic


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f3e1ab33
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f3e1ab33
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f3e1ab33

Branch: refs/heads/master
Commit: f3e1ab337c954c1b73d94f90baad45416f5d9d42
Parents: 262990f
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 3 08:58:39 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 16:22:13 2018 -0400

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    | 45 ++++++++++----------
 .../cluster/impl/ClusterConnectionBridge.java   | 15 +++++++
 2 files changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3e1ab33/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 16724bf..5e0eb17 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -93,7 +93,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private final SimpleString name;
 
-   private final Queue queue;
+   protected final Queue queue;
 
    private final Filter filter;
 
@@ -1010,6 +1010,27 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       scheduleRetryConnectFixedTimeout(timeout);
    }
 
+
+   // To be called by the topology update
+   // This logic will be updated on the cluster connection
+   protected void nodeUP(TopologyMember member, boolean last) {
+      ClientSessionInternal sessionToUse = session;
+      RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
+
+      if (member != null && this.targetNodeID != null && this.targetNodeID.equals(member.getNodeId())) {
+         // this could be an update of the topology say after a backup started
+         BridgeImpl.this.targetNode = member;
+      } else {
+         // we don't need synchronization here, but we need to make sure we won't get a NPE on races
+         if (connectionToUse != null && member.isMember(connectionToUse)) {
+            this.targetNode = member;
+            this.targetNodeID = member.getNodeId();
+         }
+      }
+
+   }
+
+
    // Inner classes -------------------------------------------------
 
    protected void scheduleRetryConnectFixedTimeout(final long milliseconds) {
@@ -1159,27 +1180,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       // ClusterListener
       @Override
       public void nodeUP(TopologyMember member, boolean last) {
-         if (BridgeImpl.this.queue.isInternalQueue() && member != null && BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
-            //A ClusterConnectionBridge (identified by holding an internal queue)
-            //never re-connects to another node here. It only connects to its original
-            //target node (from the ClusterConnection) or its backups. That's why
-            //we put a return here.
-            return;
-         }
-         ClientSessionInternal sessionToUse = session;
-         RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
-
-         if (member != null && BridgeImpl.this.targetNodeID != null && BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
-            // this could be an update of the topology say after a backup started
-            BridgeImpl.this.targetNode = member;
-         } else {
-            // we don't need synchronization here, but we need to make sure we won't get a NPE on races
-            if (connectionToUse != null && member.isMember(connectionToUse)) {
-               BridgeImpl.this.targetNode = member;
-               BridgeImpl.this.targetNodeID = member.getNodeId();
-            }
-         }
-
+         BridgeImpl.this.nodeUP(member, last);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f3e1ab33/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 88005aa..1a642fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
@@ -352,6 +353,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
       return filterString;
    }
 
+
+   @Override
+   protected void nodeUP(TopologyMember member, boolean last) {
+      if (member != null && targetNodeID != null && !this.targetNodeID.equals(member.getNodeId())) {
+         //A ClusterConnectionBridge (identified by holding an internal queue)
+         //never re-connects to another node here. It only connects to its original
+         //target node (from the ClusterConnection) or its backups. That's why
+         //we put a return here.
+         return;
+      }
+      super.nodeUP(member, last);
+   }
+
+
    @Override
    protected void afterConnect() throws Exception {
       super.afterConnect();


[3/3] activemq-artemis git commit: ARTEMIS-1779 ClusterConnectionBridge may connect to other nodes than its target

Posted by cl...@apache.org.
ARTEMIS-1779 ClusterConnectionBridge may connect to other nodes than its target

The cluster connection bridge has a TopologyListener and connects to a new node
each time it receives a nodeUp() event. It needs to put a check here to make
sure that the cluster bridge only connects to its target node and it's backups.

This issue shows up when you run LiveToLiveFailoverTest.testConsumerTransacted
test.

Also in this commit improvement of BackupSyncJournalTest so that it runs more
stable.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/262990fa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/262990fa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/262990fa

Branch: refs/heads/master
Commit: 262990fa6716f5bf3c72bc8a44616cef4df17c11
Parents: 650c79e
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Apr 2 19:16:30 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 3 16:22:13 2018 -0400

----------------------------------------------------------------------
 .../core/server/cluster/impl/BridgeImpl.java    |  7 ++++
 .../cluster/impl/ClusterConnectionBridge.java   |  4 ---
 .../cluster/failover/BackupSyncJournalTest.java | 35 ++++++++++++++++++--
 3 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 01596fd..16724bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -1159,6 +1159,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       // ClusterListener
       @Override
       public void nodeUP(TopologyMember member, boolean last) {
+         if (BridgeImpl.this.queue.isInternalQueue() && member != null && BridgeImpl.this.targetNodeID != null && !BridgeImpl.this.targetNodeID.equals(member.getNodeId())) {
+            //A ClusterConnectionBridge (identified by holding an internal queue)
+            //never re-connects to another node here. It only connects to its original
+            //target node (from the ClusterConnection) or its backups. That's why
+            //we put a return here.
+            return;
+         }
          ClientSessionInternal sessionToUse = session;
          RemotingConnection connectionToUse = sessionToUse != null ? sessionToUse.getConnection() : null;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index cf17bbe..88005aa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -58,7 +58,6 @@ import org.jboss.logging.Logger;
  * Such as such adding extra properties and setting up notifications between the nodes.
  */
 public class ClusterConnectionBridge extends BridgeImpl {
-
    private static final Logger logger = Logger.getLogger(ClusterConnectionBridge.class);
 
    private final ClusterConnection clusterConnection;
@@ -127,9 +126,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
       this.managementNotificationAddress = managementNotificationAddress;
       this.flowRecord = flowRecord;
 
-      // we need to disable DLQ check on the clustered bridges
-      queue.setInternalQueue(true);
-
       if (logger.isTraceEnabled()) {
          logger.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace"));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/262990fa/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index c654472..8342c62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -32,6 +33,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -73,16 +76,18 @@ public class BackupSyncJournalTest extends FailoverTestBase {
       return n_msgs;
    }
 
+   protected final FailoverWaiter failoverWaiter = new FailoverWaiter();
+
    @Override
    @Before
    public void setUp() throws Exception {
       startBackupServer = false;
       super.setUp();
       setNumberOfMessages(defaultNMsgs);
-      locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15);
+      locator = (ServerLocatorInternal) getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15).setRetryInterval(200);
       sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+      sessionFactory.addFailoverListener(failoverWaiter);
       syncDelay = new BackupSyncDelay(backupServer, liveServer);
-
    }
 
    @Test
@@ -326,8 +331,13 @@ public class BackupSyncJournalTest extends FailoverTestBase {
       liveServer.removeInterceptor(syncDelay);
       backupServer.start();
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
+      failoverWaiter.reset();
       crash(session);
       backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
+      //for some system the retryAttempts and retryInterval may be too small
+      //so that during failover all attempts have failed before the backup
+      //server is fully activated.
+      assertTrue("Session didn't failover, the maxRetryAttempts and retryInterval may be too small", failoverWaiter.waitFailoverComplete());
    }
 
    protected void createProducerSendSomeMessages() throws ActiveMQException {
@@ -384,4 +394,25 @@ public class BackupSyncJournalTest extends FailoverTestBase {
    protected TransportConfiguration getConnectorTransportConfiguration(boolean live) {
       return TransportConfigurationUtils.getInVMConnector(live);
    }
+
+   private class FailoverWaiter implements FailoverEventListener {
+
+      private CountDownLatch latch;
+
+      public void reset() {
+         latch = new CountDownLatch(1);
+      }
+
+      @Override
+      public void failoverEvent(FailoverEventType eventType) {
+         if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
+            latch.countDown();
+         }
+      }
+
+      public boolean waitFailoverComplete() throws InterruptedException {
+         return latch.await(10, TimeUnit.SECONDS);
+      }
+   }
+
 }