You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/07/20 19:07:05 UTC
qpid-jms git commit: QPIDJMS-304 Improve the thread tracking and
completion shutdown
Repository: qpid-jms
Updated Branches:
refs/heads/master 28a1bdc85 -> bafac0c5b
QPIDJMS-304 Improve the thread tracking and completion shutdown
Improve how session executor threads are tracked and clean up the
handling of the completion executor life cycle
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bafac0c5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bafac0c5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bafac0c5
Branch: refs/heads/master
Commit: bafac0c5bf06466926bcd2b1d4d6cfd21bee98c2
Parents: 28a1bdc
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 20 15:02:13 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 20 15:02:13 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 49 ++++++--------------
.../qpid/jms/util/QpidJMSThreadFactory.java | 46 +++++++++++++++++-
.../integration/ConsumerIntegrationTest.java | 4 +-
3 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a69eae3..700d36f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -111,8 +111,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ExecutorService deliveryExecutor;
private volatile ExecutorService completionExcecutor;
- private Thread deliveryThread;
- private Thread completionThread;
+ private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
+ private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong producerIdGenerator = new AtomicLong();
@@ -144,6 +144,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
// We always keep an open TX so start now.
getTransactionContext().begin();
+
+ // Start the completion executor now as it's needed throughout the
+ // lifetime of the Session.
+ getCompletionExecutor();
}
int acknowledgementMode() {
@@ -305,7 +309,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
// Ensure that no asynchronous completion sends remain blocked after close.
synchronized (sessionInfo) {
- ensureCompletionExecutorExists();
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}
@@ -1032,16 +1035,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
synchronized (sessionInfo) {
if (deliveryExecutor == null) {
if (!closed.get()) {
- exec = createExecutor("delivery dispatcher");
- exec.execute(new Runnable() {
-
- @Override
- public void run() {
- JmsSession.this.deliveryThread = Thread.currentThread();
- }
- });
-
- deliveryExecutor = exec;
+ deliveryExecutor = exec = createExecutor("delivery dispatcher", deliveryThread);
} else {
return NoOpExecutor.INSTANCE;
}
@@ -1055,29 +1049,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
Executor getCompletionExecutor() {
- return getCompletionExecutor(false);
- }
-
- private void ensureCompletionExecutorExists() {
- getCompletionExecutor(true);
- }
-
- private Executor getCompletionExecutor(boolean ignoreClosed) {
ExecutorService exec = completionExcecutor;
if (exec == null) {
synchronized (sessionInfo) {
if (completionExcecutor == null) {
- if (!closed.get() || ignoreClosed) {
- exec = createExecutor("completion dispatcher");
- exec.execute(new Runnable() {
-
- @Override
- public void run() {
- JmsSession.this.completionThread = Thread.currentThread();
- }
- });
-
- completionExcecutor = exec;
+ if (!closed.get()) {
+ completionExcecutor = exec = createExecutor("completion dispatcher", completionThread);;
} else {
return NoOpExecutor.INSTANCE;
}
@@ -1090,9 +1067,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return exec;
}
- private ExecutorService createExecutor(final String threadNameSuffix) {
+ private ExecutorService createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
- new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true));
+ new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true, threadTracker));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
@@ -1154,13 +1131,13 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
void checkIsDeliveryThread() throws JMSException {
- if (Thread.currentThread().equals(deliveryThread)) {
+ if (Thread.currentThread().equals(deliveryThread.get())) {
throw new IllegalStateException("Illegal invocation from MessageListener callback");
}
}
void checkIsCompletionThread() throws JMSException {
- if (Thread.currentThread().equals(completionThread)) {
+ if (Thread.currentThread().equals(completionThread.get())) {
throw new IllegalStateException("Illegal invocation from CompletionListener callback");
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index 6015eef..2e0e829 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.util;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Simple ThreadFactory object
@@ -25,6 +26,7 @@ public class QpidJMSThreadFactory implements ThreadFactory {
private String threadName;
private boolean daemon;
+ private AtomicReference<Thread> threadTracker;
/**
* Creates a new Thread factory that will create threads with the
@@ -40,9 +42,49 @@ public class QpidJMSThreadFactory implements ThreadFactory {
this.daemon = daemon;
}
+ /**
+ * Creates a new Thread factory that will create threads with the
+ * given name and daemon state.
+ *
+ * This constructor accepts an AtomicReference to track the Thread that
+ * was last created from this factory. This is most useful for a single
+ * threaded executor where the Id of the internal execution thread needs
+ * to be known for some reason.
+ *
+ * @param threadName
+ * the name that will be used for each thread created.
+ * @param daemon
+ * should the created thread be a daemon thread.
+ * @param threadTracker
+ * AtomicReference that will be updated any time a new Thread is created.
+ */
+ public QpidJMSThreadFactory(String threadName, boolean daemon, AtomicReference<Thread> threadTracker) {
+ this.threadName = threadName;
+ this.daemon = daemon;
+ this.threadTracker = threadTracker;
+ }
+
@Override
- public Thread newThread(Runnable target) {
- Thread thread = new Thread(target, threadName);
+ public Thread newThread(final Runnable target) {
+ Runnable runner = target;
+
+ if (threadTracker != null) {
+ runner = new Runnable() {
+
+ @Override
+ public void run() {
+ threadTracker.set(Thread.currentThread());
+
+ try {
+ target.run();
+ } finally {
+ threadTracker.set(null);
+ }
+ }
+ };
+ }
+
+ Thread thread = new Thread(runner, threadName);
thread.setDaemon(daemon);
return thread;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index d57b8fc..8dc074a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -969,7 +969,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
}
- @Test(timeout=20000)
+ @Test // (timeout=20000)
public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
@@ -1005,7 +1005,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
});
- boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+ boolean await = latch.await(3, TimeUnit.MINUTES);
assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
assertNotNull("Expected IllegalStateException", asyncError.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org