You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/08 15:27:47 UTC

[activemq-artemis] branch master updated (a69b2ae -> 3396ac3)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from a69b2ae  ARTEMIS-2836 - console diagram view, remove broken links and verbose logging
     new 6f8ff55  ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578
     new 8c25dae  ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578
     new 3396ac3  This closes #3211

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/client/impl/ClientSessionFactoryImpl.java | 23 +++++-
 .../core/client/impl/ServerLocatorImpl.java        | 57 +++++++++++----
 .../artemis/core/server/ActiveMQServerLogger.java  |  4 ++
 .../server/impl/SharedNothingLiveActivation.java   |  5 ++
 .../integration/jms/cluster/JMSFailoverTest.java   | 82 +++++++++++++++++++++-
 5 files changed, 153 insertions(+), 18 deletions(-)


[activemq-artemis] 02/03: ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 8c25daee985d18e9ceb48fc50a4bd8b558075e87
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Jul 7 17:11:35 2020 +0800

    ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578
    
    2 of 2) - Porting of HORNETQ-1578
    
    Exceptions are swallowed, making it hard to diagnose issues
---
 .../apache/activemq/artemis/core/server/ActiveMQServerLogger.java    | 4 ++++
 .../artemis/core/server/impl/SharedNothingLiveActivation.java        | 5 +++++
 2 files changed, 9 insertions(+)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 3ae605e..ff4c728 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -2100,4 +2100,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224104, value = "Error starting the Acceptor {0} {1}", format = Message.Format.MESSAGE_FORMAT)
    void errorStartingAcceptor(String name, Object configuration);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224105, value = "Connecting to cluster failed")
+   void failedConnectingToCluster(@Cause Exception e);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 660d442..5e3f225 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
 import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -330,6 +331,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
             // Just try connecting
             listener.latch.await(5, TimeUnit.SECONDS);
          } catch (Exception notConnected) {
+            if (!(notConnected instanceof ActiveMQException) || ActiveMQExceptionType.INTERNAL_ERROR.equals(((ActiveMQException) notConnected).getType())) {
+               // report all exceptions that aren't ActiveMQException and all INTERNAL_ERRORs
+               ActiveMQServerLogger.LOGGER.failedConnectingToCluster(notConnected);
+            }
             return false;
          }
 


[activemq-artemis] 03/03: This closes #3211

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 3396ac355a7f07b9ae4e202c5fed5120c984d141
Merge: a69b2ae 8c25dae
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Jul 8 11:27:36 2020 -0400

    This closes #3211

 .../core/client/impl/ClientSessionFactoryImpl.java | 23 +++++-
 .../core/client/impl/ServerLocatorImpl.java        | 57 +++++++++++----
 .../artemis/core/server/ActiveMQServerLogger.java  |  4 ++
 .../server/impl/SharedNothingLiveActivation.java   |  5 ++
 .../integration/jms/cluster/JMSFailoverTest.java   | 82 +++++++++++++++++++++-
 5 files changed, 153 insertions(+), 18 deletions(-)


[activemq-artemis] 01/03: ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 6f8ff55dec629504d34b7dd3757bff24fa16c575
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Jul 7 17:10:45 2020 +0800

    ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578
    
    1 of 2) - Porting of HORNETQ-1575
    
    In a live-backup scenario, when the live is down and the backup becomes live,
    clients using HA Connection Factories can fail over automatically. However, if
    a client decides to create a new connection by itself (as in the Camel JMS
    case), there is a chance that the new connection points to the dead live and
    the connection attempt fails. The reason is that once the old connection is
    gone, the backup never gets a chance to announce itself back to the client,
    so the initial connection fails.
    
    The fix is to let CF remember the old topology and use it on any
    initial connection attempts.
---
 .../core/client/impl/ClientSessionFactoryImpl.java | 23 +++++-
 .../core/client/impl/ServerLocatorImpl.java        | 57 +++++++++++----
 .../integration/jms/cluster/JMSFailoverTest.java   | 82 +++++++++++++++++++++-
 3 files changed, 144 insertions(+), 18 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 583d8fb..75981cf 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -163,6 +163,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                                    final ScheduledExecutorService scheduledThreadPool,
                                    final List<Interceptor> incomingInterceptors,
                                    final List<Interceptor> outgoingInterceptors) {
+      this(serverLocator, new Pair<>(connectorConfig, null),
+               locatorConfig, reconnectAttempts, threadPool,
+               scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
+   }
+
+   ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
+                          final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
+                          final ServerLocatorConfig locatorConfig,
+                          final int reconnectAttempts,
+                          final Executor threadPool,
+                          final ScheduledExecutorService scheduledThreadPool,
+                          final List<Interceptor> incomingInterceptors,
+                          final List<Interceptor> outgoingInterceptors) {
       createTrace = new Exception();
 
       this.serverLocator = serverLocator;
@@ -171,11 +184,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       this.clientProtocolManager.setSessionFactory(this);
 
-      this.currentConnectorConfig = connectorConfig;
+      this.currentConnectorConfig = connectorConfig.getA();
 
-      connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
+      connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName());
 
-      checkTransportKeys(connectorFactory, connectorConfig);
+      checkTransportKeys(connectorFactory, connectorConfig.getA());
 
       this.callTimeout = locatorConfig.callTimeout;
 
@@ -216,6 +229,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
 
       connectionReadyForWrites = true;
+
+      if (connectorConfig.getB() != null) {
+         this.backupConfig = connectorConfig.getB();
+      }
    }
 
    @Override
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 3ac249f..d345e06 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -165,8 +165,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
    private TransportConfiguration clusterTransportConfiguration;
 
-   private boolean useTopologyForLoadBalancing;
-
    /** For tests only */
    public DiscoveryGroup getDiscoveryGroup() {
       return discoveryGroup;
@@ -422,7 +420,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       clusterTransportConfiguration = locator.clusterTransportConfiguration;
    }
 
-   private TransportConfiguration selectConnector() {
+   private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
       flushTopology();
@@ -432,14 +430,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       }
 
       synchronized (this) {
-         if (usedTopology != null && config.useTopologyForLoadBalancing) {
+         if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) {
             if (logger.isTraceEnabled()) {
                logger.trace("Selecting connector from topology.");
             }
             int pos = loadBalancingPolicy.select(usedTopology.length);
             Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
 
-            return pair.getA();
+            return pair;
          } else {
             if (logger.isTraceEnabled()) {
                logger.trace("Selecting connector from initial connectors.");
@@ -447,7 +445,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
             int pos = loadBalancingPolicy.select(initialConnectors.length);
 
-            return initialConnectors[pos];
+            return new Pair(initialConnectors[pos], null);
          }
       }
    }
@@ -658,10 +656,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       synchronized (this) {
          boolean retry = true;
          int attempts = 0;
+         boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
+         boolean staticTried = false;
+         boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
+
          while (retry && !isClosed()) {
             retry = false;
 
-            TransportConfiguration tc = selectConnector();
+            /*
+             * The logic is: If receivedTopology is false, try static first.
+             * if receivedTopology is true, try topologyArray first
+             */
+            Pair<TransportConfiguration, TransportConfiguration> tc = selectConnector(shouldTryStatic);
+
             if (tc == null) {
                throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory();
             }
@@ -682,12 +689,32 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                try {
                   if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
                      attempts++;
-
-                     int connectorsSize = getConnectorsSize();
                      int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;
 
-                     if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
-                        throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
+                     if (shouldTryStatic) {
+                        //we know static is used
+                        if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) {
+                           if (topologyArrayTried) {
+                              //stop retry and throw exception
+                              throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
+                           } else {
+                              //lets try topologyArray
+                              staticTried = true;
+                              shouldTryStatic = false;
+                              attempts = 0;
+                           }
+                        }
+                     } else {
+                        //we know topologyArray is used
+                        if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) {
+                           if (staticTried) {
+                              throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
+                           } else {
+                              topologyArrayTried = true;
+                              shouldTryStatic = true;
+                              attempts = 0;
+                           }
+                        }
                      }
                      if (factory.waitForRetry(config.retryInterval)) {
                         throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
@@ -1414,7 +1441,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          if (topology.isEmpty()) {
             // Resetting the topology to its original condition as it was brand new
             receivedTopology = false;
-            topologyArray = null;
          } else {
             updateArraysAndPairs(eventTime);
 
@@ -1492,6 +1518,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       synchronized (topologyArrayGuard) {
          Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
 
+         if (membersCopy.size() == 0) {
+            //it could happen when live is down, in that case we keeps the old copy
+            //and don't update
+            return;
+         }
+
          Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, membersCopy.size());
 
          int count = 0;
@@ -1557,7 +1589,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
 
       if (!clusterConnection && isEmpty) {
          receivedTopology = false;
-         topologyArray = null;
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
index 7f22235..7a6ca62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java
@@ -26,6 +26,8 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.lang.reflect.Field;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -41,6 +44,9 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.client.impl.Topology;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
 import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
@@ -254,6 +260,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
       jbcfLive.setBlockOnDurableSend(true);
 
       ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));
+
       jbcfBackup.setBlockOnNonDurableSend(true);
       jbcfBackup.setBlockOnDurableSend(true);
       jbcfBackup.setInitialConnectAttempts(-1);
@@ -437,6 +444,74 @@ public class JMSFailoverTest extends ActiveMQTestBase {
 
    }
 
+   @Test
+   public void testCreateNewConnectionAfterFailover() throws Exception {
+      ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
+      jbcf.setInitialConnectAttempts(5);
+      jbcf.setRetryInterval(1000);
+      jbcf.setReconnectAttempts(-1);
+
+      Connection conn1 = null, conn2 = null, conn3 = null;
+
+      try {
+         conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
+
+         conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession();
+         ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession();
+
+         Topology fullTopology = jbcf.getServerLocator().getTopology();
+         Collection<TopologyMemberImpl> members = fullTopology.getMembers();
+         assertEquals(1, members.size());
+         TopologyMemberImpl member = members.iterator().next();
+         TransportConfiguration tcLive = member.getLive();
+         TransportConfiguration tcBackup = member.getBackup();
+
+         System.out.println("live tc: " + tcLive);
+         System.out.println("Backup tc: " + tcBackup);
+
+         JMSUtil.crash(liveServer, coreSession1, coreSession2);
+
+         waitForServerToStart(backupServer);
+
+         //now pretending that the live down event hasn't been propagated to client
+         simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup);
+
+         //now create a new connection after live is down
+         try {
+            conn3 = jbcf.createConnection();
+         } catch (Exception e) {
+            fail("The new connection should be established successfully after failover");
+         }
+      } finally {
+         if (conn1 != null) {
+            conn1.close();
+         }
+         if (conn2 != null) {
+            conn2.close();
+         }
+         if (conn3 != null) {
+            conn3.close();
+         }
+      }
+   }
+
+   private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException {
+      Field f = locator.getClass().getDeclaredField("topologyArray");
+      f.setAccessible(true);
+
+      Pair<TransportConfiguration, TransportConfiguration>[] value = (Pair<TransportConfiguration, TransportConfiguration>[]) f.get(locator);
+      assertEquals(1, value.length);
+      Pair<TransportConfiguration, TransportConfiguration> member = value[0];
+      member.setA(tcLive);
+      member.setB(tcBackup);
+      f.set(locator, value);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -463,7 +538,10 @@ public class JMSFailoverTest extends ActiveMQTestBase {
 
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
 
-      backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessage [...]
+      backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersis [...]
 
       backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager));
 
@@ -484,7 +562,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
       liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1));
 
       liveJMSServer.getActiveMQServer().setIdentity("JMSLive");
-      log.debug("Starting life");
+      log.debug("Starting live");
 
       liveJMSServer.start();