You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/03/22 15:48:22 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1748: Cleanup connections when client connector is destroyed

ARTEMIS-1748: Cleanup connections when client connector is destroyed


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be6cb7a2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be6cb7a2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be6cb7a2

Branch: refs/heads/master
Commit: be6cb7a22f22dbdec775546d9288bbdf5150dc68
Parents: 2f9d373
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Thu Mar 15 13:06:53 2018 +0100
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Mar 22 10:46:40 2018 -0500

----------------------------------------------------------------------
 .../client/AMQPClientConnectionFactory.java     |  2 +
 .../client/ProtonClientConnectionManager.java   |  9 +++-
 .../amqp/AmqpOutboundConnectionTest.java        | 55 +++++++++++++++++---
 3 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index c633db8..af65937 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -59,6 +59,8 @@ public class AMQPClientConnectionFactory {
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
+      delegate.addFailureListener(connectionCallback);
+      delegate.addCloseListener(connectionCallback);
 
       connectionCallback.setProtonConnectionDelegate(delegate);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
index df0de77..f19aace 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.client;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -63,7 +64,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
       RemotingConnection connection = connectionMap.remove(connectionID);
       if (connection != null) {
          log.info("Connection " + connection.getRemoteAddress() + " destroyed");
-         connection.disconnect(false);
+         connection.fail(new ActiveMQRemoteDisconnectException());
       } else {
          log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
       }
@@ -93,7 +94,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
 
    public void stop() {
       for (RemotingConnection connection : connectionMap.values()) {
-         connection.disconnect(false);
+         connection.destroy();
       }
    }
 
@@ -106,4 +107,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
          log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
       }
    }
+
+   public RemotingConnection getConnection(Object connectionId) {
+      return connectionMap.get(connectionId);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 8f98715..1156e18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -23,7 +23,11 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -34,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolMana
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Connection;
@@ -45,16 +50,21 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
 
    @Test(timeout = 60000)
    public void testOutboundConnection() throws Throwable {
-      runOutboundConnectionTest(false);
+      runOutboundConnectionTest(false, true);
+   }
+
+   @Test(timeout = 60000)
+   public void testOutboundConnectionServerClose() throws Throwable {
+      runOutboundConnectionTest(false, false);
    }
 
    @Test(timeout = 60000)
    public void testOutboundConnectionWithSecurity() throws Throwable {
-      runOutboundConnectionTest(true);
+      runOutboundConnectionTest(true, true);
    }
 
 
-   private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
+   private void runOutboundConnectionTest(boolean withSecurity, boolean closeFromClient) throws Exception {
       final ActiveMQServer remote;
       try {
          securityEnabled = withSecurity;
@@ -92,17 +102,48 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
       ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
       NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
       connector.start();
-      connector.createConnection();
+
+      Object connectionId = connector.createConnection().getID();
+      assertNotNull(connectionId);
+      RemotingConnection remotingConnection = lifeCycleListener.getConnection(connectionId);
+
+      AtomicReference<ActiveMQException> ex = new AtomicReference<>();
+      AtomicBoolean closed = new AtomicBoolean(false);
+      remotingConnection.addCloseListener(() -> closed.set(true));
+      remotingConnection.addFailureListener(new FailureListener() {
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+            ex.set(exception);
+         }
+
+         @Override
+         public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+            ex.set(exception);
+         }
+      });
 
       try {
          Wait.assertEquals(1, remote::getConnectionCount);
          Wait.assertTrue(connectionOpened::get);
-         lifeCycleListener.stop();
+         if (closeFromClient) {
+            lifeCycleListener.stop();
+         } else {
+            remote.stop();
+         }
 
          Wait.assertEquals(0, remote::getConnectionCount);
+         assertTrue(remotingConnection.isDestroyed());
+         if (!closeFromClient) {
+            assertTrue(ex.get() instanceof ActiveMQRemoteDisconnectException);
+         } else {
+            assertNull(ex.get());
+         }
       } finally {
-         lifeCycleListener.stop();
-         remote.stop();
+         if (closeFromClient) {
+            remote.stop();
+         } else {
+            lifeCycleListener.stop();
+         }
       }
    }