You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 00:52:25 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2084: Failover will not work with network cable disconnect on core protocol

ARTEMIS-2084: Failover will not work with network cable disconnect on core protocol

(cherry picked from commit 6361079aa0533f31ad8b1b8e390fc70fda5ad217)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/124fd28c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/124fd28c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/124fd28c

Branch: refs/heads/2.6.x
Commit: 124fd28cf90a26a1823b56eb5341790242542f3a
Parents: 9d530fc
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 6 18:05:06 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 12 20:52:15 2018 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 12 +++++++-
 .../remoting/impl/netty/NettyConnector.java     |  8 ++++++
 .../artemis/tests/util/ActiveMQTestBase.java    |  8 ++++++
 .../cluster/failover/FailoverTestBase.java      |  4 +--
 .../failover/NetworkFailureFailoverTest.java    | 30 ++++++++++++++------
 .../ra/ActiveMQRAClusteredTestBase.java         |  3 +-
 6 files changed, 52 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 4723c88..daac8f3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@@ -1060,7 +1061,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
    }
 
    protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) {
-      return connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
+      Connector connector = connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
+      if (connector instanceof NettyConnector) {
+         NettyConnector nettyConnector = (NettyConnector) connector;
+         if (nettyConnector.getConnectTimeoutMillis() < 0) {
+            nettyConnector.setConnectTimeoutMillis((int)serverLocator.getConnectionTTL());
+         }
+
+      }
+
+      return connector;
    }
 
    private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index ebb274a..6400b44 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -818,6 +818,14 @@ public class NettyConnector extends AbstractConnector {
 
    // Public --------------------------------------------------------
 
+   public int getConnectTimeoutMillis() {
+      return connectTimeoutMillis;
+   }
+
+   public void setConnectTimeoutMillis(int connectTimeoutMillis) {
+      this.connectTimeoutMillis = connectTimeoutMillis;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 2cd5d56..9075ac6 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -552,6 +552,14 @@ public abstract class ActiveMQTestBase extends Assert {
       return params;
    }
 
+
+   /** This exists as an extension point for tests, so tests can replace it */
+   protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName,
+                                                                                      String... connectors) {
+      return basicClusterConnectionConfig(connectorName, connectors);
+   }
+
+
    protected static final ClusterConnectionConfiguration basicClusterConnectionConfig(String connectorName,
                                                                                       String... connectors) {
       ArrayList<String> connectors0 = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index 70e625a..2a75f94 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -165,11 +165,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
       TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
 
-      backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
+      backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()));
 
       backupServer = createTestableServer(backupConfig);
 
-      liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
+      liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
 
       liveServer = createTestableServer(liveConfig);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
index 1011502..c2e539f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java
@@ -13,6 +13,7 @@
 
 package org.apache.activemq.artemis.tests.integration.cluster.failover;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,10 +36,12 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.Topology;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.network.NetUtil;
@@ -138,8 +141,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
          server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
       }
 
-      server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
-
       return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
    }
 
@@ -151,7 +152,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
-      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
       TransportConfiguration tc = createTransportConfiguration(true, false, params);
 
       final AtomicInteger countSent = new AtomicInteger(0);
@@ -195,8 +195,8 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       locator.setReconnectAttempts(-1);
       locator.setConfirmationWindowSize(-1);
       locator.setProducerWindowSize(-1);
-      locator.setClientFailureCheckPeriod(100);
       locator.setConnectionTTL(1000);
+      locator.setClientFailureCheckPeriod(100);
       ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
       sfProducer.addFailureListener(new SessionFailureListener() {
          @Override
@@ -312,7 +312,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
-      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
       TransportConfiguration tc = createTransportConfiguration(true, false, params);
 
       final AtomicInteger countSent = new AtomicInteger(0);
@@ -442,7 +441,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
-      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
       TransportConfiguration tc = createTransportConfiguration(true, false, params);
 
       final AtomicInteger countSent = new AtomicInteger(0);
@@ -459,7 +457,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
                   try {
                      NetUtil.netDown(LIVE_IP);
                      System.out.println("Blocking traffic");
-                     Thread.sleep(3000); // this is important to let stuff to block
                      blockedAt.set(sentMessages.get());
                      latchDown.countDown();
                   } catch (Exception e) {
@@ -555,7 +552,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
-      params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
       TransportConfiguration tc = createTransportConfiguration(true, false, params);
 
       final AtomicInteger countSent = new AtomicInteger(0);
@@ -685,4 +681,22 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
       t.join();
    }
 
+
+
+   @Override
+   protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName,
+                                                                         String... connectors) {
+      ArrayList<String> connectors0 = new ArrayList<>();
+      for (String c : connectors) {
+         connectors0.add(c);
+      }
+      ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
+         setName("cluster1").setAddress("jms").setConnectorName(connectorName).
+         setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setClientFailureCheckPeriod(100).setConnectionTTL(1000).
+         setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
+         setStaticConnectors(connectors0);
+
+      return clusterConnectionConfiguration;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/124fd28c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
index 9e3abf2..cd14afe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java
@@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Before;
 
 public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
@@ -73,7 +72,7 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
          index = 1;
       }
 
-      ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));
+      ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));
 
       recreateDataDirectories(getTestDir(), index, false);