You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/03/22 15:48:22 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1748: Cleanup connections when client connector is destroyed
ARTEMIS-1748: Cleanup connections when client connector is destroyed
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be6cb7a2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be6cb7a2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be6cb7a2
Branch: refs/heads/master
Commit: be6cb7a22f22dbdec775546d9288bbdf5150dc68
Parents: 2f9d373
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Thu Mar 15 13:06:53 2018 +0100
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Mar 22 10:46:40 2018 -0500
----------------------------------------------------------------------
.../client/AMQPClientConnectionFactory.java | 2 +
.../client/ProtonClientConnectionManager.java | 9 +++-
.../amqp/AmqpOutboundConnectionTest.java | 55 +++++++++++++++++---
3 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index c633db8..af65937 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -59,6 +59,8 @@ public class AMQPClientConnectionFactory {
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
+ delegate.addFailureListener(connectionCallback);
+ delegate.addCloseListener(connectionCallback);
connectionCallback.setProtonConnectionDelegate(delegate);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
index df0de77..f19aace 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/ProtonClientConnectionManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.client;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -63,7 +64,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
RemotingConnection connection = connectionMap.remove(connectionID);
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " destroyed");
- connection.disconnect(false);
+ connection.fail(new ActiveMQRemoteDisconnectException());
} else {
log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
}
@@ -93,7 +94,7 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
public void stop() {
for (RemotingConnection connection : connectionMap.values()) {
- connection.disconnect(false);
+ connection.destroy();
}
}
@@ -106,4 +107,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
}
}
+
+ public RemotingConnection getConnection(Object connectionId) {
+ return connectionMap.get(connectionId);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be6cb7a2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
index 8f98715..1156e18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -23,7 +23,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -34,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolMana
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
@@ -45,16 +50,21 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testOutboundConnection() throws Throwable {
- runOutboundConnectionTest(false);
+ runOutboundConnectionTest(false, true);
+ }
+
+ @Test(timeout = 60000)
+ public void testOutboundConnectionServerClose() throws Throwable {
+ runOutboundConnectionTest(false, false);
}
@Test(timeout = 60000)
public void testOutboundConnectionWithSecurity() throws Throwable {
- runOutboundConnectionTest(true);
+ runOutboundConnectionTest(true, true);
}
- private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
+ private void runOutboundConnectionTest(boolean withSecurity, boolean closeFromClient) throws Exception {
final ActiveMQServer remote;
try {
securityEnabled = withSecurity;
@@ -92,17 +102,48 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start();
- connector.createConnection();
+
+ Object connectionId = connector.createConnection().getID();
+ assertNotNull(connectionId);
+ RemotingConnection remotingConnection = lifeCycleListener.getConnection(connectionId);
+
+ AtomicReference<ActiveMQException> ex = new AtomicReference<>();
+ AtomicBoolean closed = new AtomicBoolean(false);
+ remotingConnection.addCloseListener(() -> closed.set(true));
+ remotingConnection.addFailureListener(new FailureListener() {
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ ex.set(exception);
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+ ex.set(exception);
+ }
+ });
try {
Wait.assertEquals(1, remote::getConnectionCount);
Wait.assertTrue(connectionOpened::get);
- lifeCycleListener.stop();
+ if (closeFromClient) {
+ lifeCycleListener.stop();
+ } else {
+ remote.stop();
+ }
Wait.assertEquals(0, remote::getConnectionCount);
+ assertTrue(remotingConnection.isDestroyed());
+ if (!closeFromClient) {
+ assertTrue(ex.get() instanceof ActiveMQRemoteDisconnectException);
+ } else {
+ assertNull(ex.get());
+ }
} finally {
- lifeCycleListener.stop();
- remote.stop();
+ if (closeFromClient) {
+ remote.stop();
+ } else {
+ lifeCycleListener.stop();
+ }
}
}