You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by tm...@apache.org on 2017/06/12 08:09:53 UTC
activemq git commit: [AMQ-6700] Leak of ActiveMQ Connection Executor
threads and ActiveMQConnection objects in JCA layer
Repository: activemq
Updated Branches:
refs/heads/AMQ-6700 [created] a1e595c18
[AMQ-6700] Leak of ActiveMQ Connection Executor threads and ActiveMQConnection objects in JCA layer
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a1e595c1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a1e595c1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a1e595c1
Branch: refs/heads/AMQ-6700
Commit: a1e595c18fbfb8a21e632b665f180005f1daf053
Parents: a678244
Author: Torsten Mielke <tm...@redhat.com>
Authored: Mon Jun 12 10:02:27 2017 +0200
Committer: Torsten Mielke <tm...@redhat.com>
Committed: Mon Jun 12 10:02:27 2017 +0200
----------------------------------------------------------------------
.../activemq/ra/ActiveMQResourceAdapter.java | 14 ++-
...veMQConnectionExecutorThreadCleanUpTest.java | 122 +++++++++++++++++++
2 files changed, 134 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a1e595c1/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
index fd16603..b17674e 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java
@@ -355,8 +355,18 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement
}
private ActiveMQConnection newConnection() throws JMSException {
- ActiveMQConnection connection = makeConnection();
- connection.start();
+ ActiveMQConnection connection = null;
+ try {
+ connection = makeConnection();
+ connection.start();
+ } catch (JMSException ex) {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (JMSException ignore) { }
+ }
+ throw ex;
+ }
return connection;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a1e595c1/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
new file mode 100644
index 0000000..bec57d3
--- /dev/null
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java
@@ -0,0 +1,122 @@
+package org.apache.activemq.ra;
+
+import java.util.Set;
+import java.util.Iterator;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test for AMQ-6700.
+ * Fails to connect to the embedded broker using JCA, so that the
+ * "ActiveMQ Connection Executor" thread is used to deal with the
+ * low-level exception. This test verifies that this thread gets
+ * cleaned up correctly after use.
+ */
+public class ActiveMQConnectionExecutorThreadCleanUpTest {
+
+ protected static Logger LOG =
+ LoggerFactory.getLogger(ActiveMQConnectionExecutorThreadCleanUpTest.class);
+ protected static final String AMQ_CONN_EXECUTOR_THREAD_NAME =
+ "ActiveMQ Connection Executor";
+ private BrokerService broker = null;
+
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("Configuring broker programmatically.");
+ broker = new BrokerService();
+ broker.setPersistent(false);
+
+ // explicitly limit the connector to 0 connections so that the test
+ // is unable to connect
+ broker.addConnector("tcp://localhost:0?maximumConnections=0");
+ broker.start();
+ broker.waitUntilStarted(5000);
+ }
+
+
+ @After
+ public void shutDown() throws Exception {
+ if (broker != null) {
+ if (broker.isStarted()) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+ }
+
+
+ /**
+ * This test tries to create connections into the broker using the
+ * resource adapter's transaction recovery functionality.
+ * If the broker does not accept the connection, the connection's
+ * thread pool executor is used to deal with the error.
+ * This has led to race conditions where the thread was not shut down
+ * but leaked instead.
+ * @throws Exception
+ */
+ @Test
+ public void testAMQConnectionExecutorThreadCleanUp() throws Exception {
+ LOG.info("testAMQConnectionExecutorThreadCleanUp() started.");
+
+ ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
+ ra.setServerUrl(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+ LOG.info("Using brokerUrl " + ra.getServerUrl());
+
+ // running in a small loop as very occasionally the call to
+ // ActiveMQResourceAdapter.$2.makeConnection() raises an exception
+ // rather than using the connection's executor task to deal with the
+ // connection error.
+ for (int i=0; i<10; i++) {
+ LOG.debug("Iteration " + i);
+ try {
+ XAResource[] resources = ra.getXAResources(null);
+ resources[0].recover(100);
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage());
+ }
+ // allow some small time for thread cleanup to happen
+ Thread.sleep(300);
+
+ // check if thread exists
+ Assert.assertFalse("Thread named \"" +
+ AMQ_CONN_EXECUTOR_THREAD_NAME +
+ "\" not cleared up with ActiveMQConnection.",
+ hasActiveMQConnectionExceutorThread());
+ }
+ ra.stop();
+ }
+
+
+ /**
+ * Retrieves all threads from the JVM and checks if any thread name starts
+ * with AMQ_CONN_EXECUTOR_THREAD_NAME.
+ *
+ * @return true if such a thread exists, otherwise false
+ */
+ public boolean hasActiveMQConnectionExceutorThread() {
+ // retrieve all threads
+ Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+ Iterator<Thread> iter = threadSet.iterator();
+ while (iter.hasNext()) {
+ Thread thread = (Thread)iter.next();
+ if (thread.getName().startsWith(AMQ_CONN_EXECUTOR_THREAD_NAME )) {
+ LOG.error("Thread with name {} found.", thread.getName());
+ return true;
+ }
+ }
+ LOG.debug("Thread with name {} not found.", AMQ_CONN_EXECUTOR_THREAD_NAME);
+ return false;
+ }
+}