You are viewing a plain-text version of this content; the canonical link to the original is not reproduced in this copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/30 01:55:47 UTC

[activemq-artemis] branch master updated: ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0f760  ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections
     new caddd4f  This closes #3322
9b0f760 is described below

commit 9b0f7605cb4bb768cd098e127644eda5b347cd28
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Oct 29 21:11:51 2020 -0400

    ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections
---
 .../amqp/connect/AMQPBrokerConnection.java         |  4 ++-
 .../amqp/connect/AMQPBrokerConnectionManager.java  |  2 +-
 .../core/remoting/server/RemotingService.java      |  4 +++
 .../remoting/server/impl/RemotingServiceImpl.java  |  7 +++-
 .../integration/amqp/connect/AMQPBridgeTest.java   | 11 ------
 .../amqp/connect/QpidDispatchPeerTest.java         | 41 ++++++++++++++--------
 6 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index c307a27..39fa196 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -243,6 +243,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
          }
 
          ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
+         server.getRemotingService().addConnectionEntry(connection, entry);
          protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
          connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
 
@@ -304,7 +305,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
    }
 
    private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
-
+      // TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965
    }
 
    /** The reason this method is static is the following:
@@ -498,6 +499,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
 
    @Override
    public void connectionDestroyed(Object connectionID) {
+      server.getRemotingService().removeConnection(connectionID);
       redoConnection();
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 90382e4..aad8884 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -70,7 +70,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
 
       for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
          NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
-         protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, null, null, null);
+         protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, config.getTransportConfigurations().get(0).getExtraParams(), null, null);
          NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
          bridgesConnector.start();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index ba6e390..92f44f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 
 public interface RemotingService {
@@ -123,4 +125,6 @@ public interface RemotingService {
    void destroyAcceptor(String name) throws Exception;
 
    void loadProtocolServices(List<ActiveMQComponent> protocolServices);
+
+   void addConnectionEntry(Connection connection, ConnectionEntry entry);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 9e914a8..6a1c0f3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -562,12 +562,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
          logger.trace("Connection created " + connection);
       }
 
-      connections.put(connection.getID(), entry);
+      addConnectionEntry(connection, entry);
       connectionCountLatch.countUp();
       totalConnectionCount.incrementAndGet();
    }
 
    @Override
+   public void addConnectionEntry(Connection connection, ConnectionEntry entry) {
+      connections.put(connection.getID(), entry);
+   }
+
+   @Override
    public void connectionDestroyed(final Object connectionID) {
 
       if (logger.isTraceEnabled()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
index 085c570..e4b7fbe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
@@ -55,17 +55,6 @@ public class AMQPBridgeTest extends AmqpClientTestSupport {
    }
 
    @Test
-   public void testsSimpleConnect() throws Exception {
-      server.start();
-      server_2 = createServer(AMQP_PORT_2, false);
-
-      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
-      server_2.getConfiguration().addAMQPConnection(amqpConnection);
-
-      server_2.start();
-   }
-
-   @Test
    public void testSimpleTransferPush() throws Exception {
       internalTransferPush("TEST", false);
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
index a510bbe..369f030 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
@@ -67,7 +67,9 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
 
    @Override
    protected ActiveMQServer createServer() throws Exception {
-      return createServer(AMQP_PORT, false);
+      ActiveMQServer server = createServer(AMQP_PORT, false);
+      server.getConfiguration().setNetworkCheckPeriod(100);
+      return server;
    }
 
    @Before
@@ -82,24 +84,34 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60_000)
+   public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
+      internalMultipleQueues(true, true, true);
+   }
+
+   @Test(timeout = 60_000)
    public void testWithMatchingDifferentNamesOnQueue() throws Exception {
-      internalMultipleQueues(true, true);
+      internalMultipleQueues(true, true, false);
    }
 
    @Test(timeout = 60_000)
    public void testWithMatching() throws Exception {
-      internalMultipleQueues(true, false);
+      internalMultipleQueues(true, false, false);
    }
 
    @Test(timeout = 60_000)
    public void testwithQueueName() throws Exception {
-      internalMultipleQueues(false, true);
+      internalMultipleQueues(false, false, false);
+   }
+
+   @Test(timeout = 60_000)
+   public void testwithQueueNameDistinctName() throws Exception {
+      internalMultipleQueues(false, true, false);
    }
 
-   private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception {
+   private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception {
       final int numberOfMessages = 100;
       final int numberOfQueues = 10;
-      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622").setRetryInterval(10).setReconnectAttempts(-1);
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
       if (useMatching) {
          amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
       } else {
@@ -118,7 +130,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          ConnectionFactory factoryProducer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
          Connection connection = null;
 
-         connection = createConnectionDumbRetry(factoryProducer, connection);
+         connection = createConnectionDumbRetry(factoryProducer);
 
          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = session.createQueue("queue.test" + dest);
@@ -135,10 +147,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          connection.close();
       }
 
+      if (kill) {
+         stopQpidRouter();
+         startQpidRouter();
+      }
 
       for (int dest = 0; dest < numberOfQueues; dest++) {
          ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
-         Connection connectionConsumer = factoryConsumer.createConnection();
+         Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer);
          Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest);
          MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
@@ -167,7 +183,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming));
          Wait.assertEquals(0, testQueueOnServer::getMessageCount);
       }
-
    }
 
    private String createQueueName(int i, boolean useDistinctName) {
@@ -178,18 +193,16 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
       }
    }
 
-   private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer,
-                                                Connection connection) throws InterruptedException {
+   private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException {
       for (int i = 0; i < 100; i++) {
          try {
             // Some retry
-            connection = factoryProducer.createConnection();
-            break;
+            return factoryProducer.createConnection();
          } catch (Exception e) {
             Thread.sleep(10);
          }
       }
-      return connection;
+      return null;
    }
 
 }