You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/30 01:55:47 UTC
[activemq-artemis] branch master updated: ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0f760 ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections
new caddd4f This closes #3322
9b0f760 is described below
commit 9b0f7605cb4bb768cd098e127644eda5b347cd28
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Oct 29 21:11:51 2020 -0400
ARTEMIS-2969 / ARTEMIS-2937 Dealing with Connection Timeout properly on AMQP Broker Connections
---
.../amqp/connect/AMQPBrokerConnection.java | 4 ++-
.../amqp/connect/AMQPBrokerConnectionManager.java | 2 +-
.../core/remoting/server/RemotingService.java | 4 +++
.../remoting/server/impl/RemotingServiceImpl.java | 7 +++-
.../integration/amqp/connect/AMQPBridgeTest.java | 11 ------
.../amqp/connect/QpidDispatchPeerTest.java | 41 ++++++++++++++--------
6 files changed, 41 insertions(+), 28 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index c307a27..39fa196 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -243,6 +243,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
+ server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
@@ -304,7 +305,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) {
-
+ // TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965
}
/** The reason this method is static is the following:
@@ -498,6 +499,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public void connectionDestroyed(Object connectionID) {
+ server.getRemotingService().removeConnection(connectionID);
redoConnection();
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 90382e4..aad8884 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -70,7 +70,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
- protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, null, null, null);
+ protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, config.getTransportConfigurations().get(0).getExtraParams(), null, null);
NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
bridgesConnector.start();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index ba6e390..92f44f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.ReusableLatch;
public interface RemotingService {
@@ -123,4 +125,6 @@ public interface RemotingService {
void destroyAcceptor(String name) throws Exception;
void loadProtocolServices(List<ActiveMQComponent> protocolServices);
+
+ void addConnectionEntry(Connection connection, ConnectionEntry entry);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 9e914a8..6a1c0f3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -562,12 +562,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
logger.trace("Connection created " + connection);
}
- connections.put(connection.getID(), entry);
+ addConnectionEntry(connection, entry);
connectionCountLatch.countUp();
totalConnectionCount.incrementAndGet();
}
@Override
+ public void addConnectionEntry(Connection connection, ConnectionEntry entry) {
+ connections.put(connection.getID(), entry);
+ }
+
+ @Override
public void connectionDestroyed(final Object connectionID) {
if (logger.isTraceEnabled()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
index 085c570..e4b7fbe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java
@@ -55,17 +55,6 @@ public class AMQPBridgeTest extends AmqpClientTestSupport {
}
@Test
- public void testsSimpleConnect() throws Exception {
- server.start();
- server_2 = createServer(AMQP_PORT_2, false);
-
- AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
- server_2.getConfiguration().addAMQPConnection(amqpConnection);
-
- server_2.start();
- }
-
- @Test
public void testSimpleTransferPush() throws Exception {
internalTransferPush("TEST", false);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
index a510bbe..369f030 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
@@ -67,7 +67,9 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
@Override
protected ActiveMQServer createServer() throws Exception {
- return createServer(AMQP_PORT, false);
+ ActiveMQServer server = createServer(AMQP_PORT, false);
+ server.getConfiguration().setNetworkCheckPeriod(100);
+ return server;
}
@Before
@@ -82,24 +84,34 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
}
@Test(timeout = 60_000)
+ public void testWithMatchingDifferentNamesOnQueueKill() throws Exception {
+ internalMultipleQueues(true, true, true);
+ }
+
+ @Test(timeout = 60_000)
public void testWithMatchingDifferentNamesOnQueue() throws Exception {
- internalMultipleQueues(true, true);
+ internalMultipleQueues(true, true, false);
}
@Test(timeout = 60_000)
public void testWithMatching() throws Exception {
- internalMultipleQueues(true, false);
+ internalMultipleQueues(true, false, false);
}
@Test(timeout = 60_000)
public void testwithQueueName() throws Exception {
- internalMultipleQueues(false, true);
+ internalMultipleQueues(false, false, false);
+ }
+
+ @Test(timeout = 60_000)
+ public void testwithQueueNameDistinctName() throws Exception {
+ internalMultipleQueues(false, true, false);
}
- private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception {
+ private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception {
final int numberOfMessages = 100;
final int numberOfQueues = 10;
- AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622").setRetryInterval(10).setReconnectAttempts(-1);
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1);
if (useMatching) {
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
} else {
@@ -118,7 +130,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
ConnectionFactory factoryProducer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
Connection connection = null;
- connection = createConnectionDumbRetry(factoryProducer, connection);
+ connection = createConnectionDumbRetry(factoryProducer);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("queue.test" + dest);
@@ -135,10 +147,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
connection.close();
}
+ if (kill) {
+ stopQpidRouter();
+ startQpidRouter();
+ }
for (int dest = 0; dest < numberOfQueues; dest++) {
ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
- Connection connectionConsumer = factoryConsumer.createConnection();
+ Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer);
Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest);
MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
@@ -167,7 +183,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming));
Wait.assertEquals(0, testQueueOnServer::getMessageCount);
}
-
}
private String createQueueName(int i, boolean useDistinctName) {
@@ -178,18 +193,16 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
}
}
- private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer,
- Connection connection) throws InterruptedException {
+ private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException {
for (int i = 0; i < 100; i++) {
try {
// Some retry
- connection = factoryProducer.createConnection();
- break;
+ return factoryProducer.createConnection();
} catch (Exception e) {
Thread.sleep(10);
}
}
- return connection;
+ return null;
}
}