You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/01 01:42:13 UTC

activemq-artemis git commit: ARTEMIS-1995 Client fail over fails when live shut down too soon

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 73b3ebff1 -> 1282bebf3


ARTEMIS-1995 Client fail over fails when live shut down too soon

In a live-backup scenario, if the live is restarted and shut down too soon,
the client has a chance to fail on failover because its internal topology
is inconsistent with the final status. The client keeps connecting to the live
that has already shut down, never trying to connect to the backup.

This is a port of HORNETQ-1572.

(cherry picked from commit 983232d27330515a50a3cdb9d7cd664ebf3ca6f2)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1282bebf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1282bebf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1282bebf

Branch: refs/heads/2.6.x
Commit: 1282bebf3fca23faffa7379177540a8ae78d2830
Parents: 73b3ebf
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Jul 30 14:48:09 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 31 21:42:06 2018 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   |  25 +++--
 .../cluster/failover/FailoverTest.java          | 106 +++++++++++++++++++
 .../failover/LiveToLiveFailoverTest.java        |   5 +
 3 files changed, 125 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1282bebf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 5c972e3..81b6c44 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -77,11 +77,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
    private final ClientProtocolManager clientProtocolManager;
 
-   private final TransportConfiguration connectorConfig;
+   private TransportConfiguration connectorConfig;
 
    private TransportConfiguration currentConnectorConfig;
 
-   private TransportConfiguration backupConfig;
+   private volatile TransportConfiguration backupConfig;
 
    private ConnectorFactory connectorFactory;
 
@@ -175,8 +175,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       this.clientProtocolManager.setSessionFactory(this);
 
-      this.connectorConfig = connectorConfig;
-
       this.currentConnectorConfig = connectorConfig;
 
       connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@@ -881,6 +879,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
    }
 
+   //The order of connector configs to try to get a connection:
+   //currentConnectorConfig, backupConfig and then lastConnectorConfig.
+   //On each successful connect, the current and last will be
+   //updated properly.
    @Override
    public RemotingConnection getConnection() {
       if (closed)
@@ -1101,8 +1103,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
                // Switching backup as live
                connector = backupConnector;
+               connectorConfig = currentConnectorConfig;
                currentConnectorConfig = backupConfig;
-               backupConfig = null;
                connectorFactory = backupConnectorFactory;
                return transportConnection;
             }
@@ -1113,23 +1115,24 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          }
 
 
-         if (currentConnectorConfig.equals(connectorConfig)) {
+         if (currentConnectorConfig.equals(connectorConfig) || connectorConfig == null) {
 
             // There was no changes on current and original connectors, just return null here and let the retry happen at the first portion of this method on the next retry
             return null;
          }
 
-         ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
+         ConnectorFactory lastConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
 
-         Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
+         Connector lastConnector = createConnector(lastConnectorFactory, connectorConfig);
 
-         transportConnection = openTransportConnection(originalConnector);
+         transportConnection = openTransportConnection(lastConnector);
 
          if (transportConnection != null) {
             logger.debug("Returning into original connector");
-            connector = originalConnector;
-            backupConfig = null;
+            connector = lastConnector;
+            TransportConfiguration temp = currentConnectorConfig;
             currentConnectorConfig = connectorConfig;
+            connectorConfig = temp;
             return transportConnection;
          } else {
             logger.debug("no connection been made, returning null");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1282bebf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 20f5fda..0a37abf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -45,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
@@ -651,6 +653,110 @@ public class FailoverTest extends FailoverTestBase {
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   @Test(timeout = 10000)
+   public void testFailLiveTooSoon() throws Exception {
+      ServerLocator locator = getServerLocator();
+
+      locator.setReconnectAttempts(-1);
+      locator.setRetryInterval(10);
+
+      sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      waitForBackupConfig(sf);
+
+      TransportConfiguration initialLive = getFieldFromSF(sf, "currentConnectorConfig");
+      TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
+
+      System.out.println("initlive: " + initialLive);
+      System.out.println("initback: " + initialBackup);
+
+      TransportConfiguration last = getFieldFromSF(sf, "connectorConfig");
+      TransportConfiguration current = getFieldFromSF(sf, "currentConnectorConfig");
+
+      System.out.println("now last: " + last);
+      System.out.println("now current: " + current);
+      assertTrue(current.equals(initialLive));
+
+      ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, true);
+
+      //crash 1
+      crash();
+
+      //make sure failover is ok
+      createSession(sf, true, true).close();
+
+      last = getFieldFromSF(sf, "connectorConfig");
+      current = getFieldFromSF(sf, "currentConnectorConfig");
+
+      System.out.println("now after live crashed last: " + last);
+      System.out.println("now current: " + current);
+
+      assertTrue(current.equals(initialBackup));
+
+      //fail back
+      beforeRestart(liveServer);
+      adaptLiveConfigForReplicatedFailBack(liveServer);
+      liveServer.getServer().start();
+
+      Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
+      int i = 0;
+      while (!backupServer.isStarted() && i++ < 100) {
+         Thread.sleep(100);
+      }
+      liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
+      Assert.assertTrue(backupServer.isStarted());
+
+      //make sure failover is ok
+      createSession(sf, true, true).close();
+
+      last = getFieldFromSF(sf, "connectorConfig");
+      current = getFieldFromSF(sf, "currentConnectorConfig");
+
+      System.out.println("now after live back again last: " + last);
+      System.out.println("now current: " + current);
+
+      //cannot use equals here because the config's name (uuid) changes
+      //after failover
+      assertTrue(current.isSameParams(initialLive));
+
+      //now manually corrupt the backup in sf
+      setSFFieldValue(sf, "backupConfig", null);
+
+      //crash 2
+      crash();
+
+      beforeRestart(backupServer);
+      createSession(sf, true, true).close();
+
+      sf.close();
+      Assert.assertEquals(0, sf.numSessions());
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
+   protected void waitForBackupConfig(ClientSessionFactoryInternal sf) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+      TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
+      int cnt = 50;
+      while (initialBackup == null && cnt > 0) {
+         cnt--;
+         Thread.sleep(200);
+         initialBackup = getFieldFromSF(sf, "backupConfig");
+      }
+   }
+
+   protected void setSFFieldValue(ClientSessionFactoryInternal sf, String tcName, Object value) throws NoSuchFieldException, IllegalAccessException {
+      Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
+      tcField.setAccessible(true);
+      tcField.set(sf, value);
+   }
+
+   protected TransportConfiguration getFieldFromSF(ClientSessionFactoryInternal sf, String tcName) throws NoSuchFieldException, IllegalAccessException {
+      Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
+      tcField.setAccessible(true);
+      return (TransportConfiguration) tcField.get(sf);
+   }
+
    /**
     * Basic fail-back test.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1282bebf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
index e65602f..101a85e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
@@ -362,6 +362,11 @@ public class LiveToLiveFailoverTest extends FailoverTest {
    @Ignore
    public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
    }
+
+   @Override
+   @Ignore
+   public void testFailLiveTooSoon() throws Exception {
+   }
 }