You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/07/20 19:07:05 UTC

qpid-jms git commit: QPIDJMS-304 Improve the thread tracking and completion shutdown

Repository: qpid-jms
Updated Branches:
  refs/heads/master 28a1bdc85 -> bafac0c5b


QPIDJMS-304 Improve the thread tracking and completion shutdown

Improve how session executor threads are tracked and clean up the
handling of the completion executor life cycle

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bafac0c5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bafac0c5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bafac0c5

Branch: refs/heads/master
Commit: bafac0c5bf06466926bcd2b1d4d6cfd21bee98c2
Parents: 28a1bdc
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 20 15:02:13 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 20 15:02:13 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 49 ++++++--------------
 .../qpid/jms/util/QpidJMSThreadFactory.java     | 46 +++++++++++++++++-
 .../integration/ConsumerIntegrationTest.java    |  4 +-
 3 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a69eae3..700d36f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -111,8 +111,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final ReentrantLock sendLock = new ReentrantLock();
     private volatile ExecutorService deliveryExecutor;
     private volatile ExecutorService completionExcecutor;
-    private Thread deliveryThread;
-    private Thread completionThread;
+    private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
+    private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
 
     private final AtomicLong consumerIdGenerator = new AtomicLong();
     private final AtomicLong producerIdGenerator = new AtomicLong();
@@ -144,6 +144,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
         // We always keep an open TX so start now.
         getTransactionContext().begin();
+
+        // Start the completion executor now as it's needed throughout the
+        // lifetime of the Session.
+        getCompletionExecutor();
     }
 
     int acknowledgementMode() {
@@ -305,7 +309,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
             // Ensure that no asynchronous completion sends remain blocked after close.
             synchronized (sessionInfo) {
-                ensureCompletionExecutorExists();
                 if (cause == null) {
                     cause = new JMSException("Session closed remotely before message transfer result was notified");
                 }
@@ -1032,16 +1035,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             synchronized (sessionInfo) {
                 if (deliveryExecutor == null) {
                     if (!closed.get()) {
-                        exec = createExecutor("delivery dispatcher");
-                        exec.execute(new Runnable() {
-
-                            @Override
-                            public void run() {
-                                JmsSession.this.deliveryThread = Thread.currentThread();
-                            }
-                        });
-
-                        deliveryExecutor = exec;
+                        deliveryExecutor = exec = createExecutor("delivery dispatcher", deliveryThread);
                     } else {
                         return NoOpExecutor.INSTANCE;
                     }
@@ -1055,29 +1049,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     }
 
     Executor getCompletionExecutor() {
-        return getCompletionExecutor(false);
-    }
-
-    private void ensureCompletionExecutorExists() {
-        getCompletionExecutor(true);
-    }
-
-    private Executor getCompletionExecutor(boolean ignoreClosed) {
         ExecutorService exec = completionExcecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
                 if (completionExcecutor == null) {
-                    if (!closed.get() || ignoreClosed) {
-                        exec = createExecutor("completion dispatcher");
-                        exec.execute(new Runnable() {
-
-                            @Override
-                            public void run() {
-                                JmsSession.this.completionThread = Thread.currentThread();
-                            }
-                        });
-
-                        completionExcecutor = exec;
+                    if (!closed.get()) {
+                        completionExcecutor = exec = createExecutor("completion dispatcher", completionThread);;
                     } else {
                         return NoOpExecutor.INSTANCE;
                     }
@@ -1090,9 +1067,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return exec;
     }
 
-    private ExecutorService createExecutor(final String threadNameSuffix) {
+    private ExecutorService createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-            new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true));
+            new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true, threadTracker));
 
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
 
@@ -1154,13 +1131,13 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     }
 
     void checkIsDeliveryThread() throws JMSException {
-        if (Thread.currentThread().equals(deliveryThread)) {
+        if (Thread.currentThread().equals(deliveryThread.get())) {
             throw new IllegalStateException("Illegal invocation from MessageListener callback");
         }
     }
 
     void checkIsCompletionThread() throws JMSException {
-        if (Thread.currentThread().equals(completionThread)) {
+        if (Thread.currentThread().equals(completionThread.get())) {
             throw new IllegalStateException("Illegal invocation from CompletionListener callback");
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index 6015eef..2e0e829 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.util;
 
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Simple ThreadFactory object
@@ -25,6 +26,7 @@ public class QpidJMSThreadFactory implements ThreadFactory {
 
     private String threadName;
     private boolean daemon;
+    private AtomicReference<Thread> threadTracker;
 
     /**
      * Creates a new Thread factory that will create threads with the
@@ -40,9 +42,49 @@ public class QpidJMSThreadFactory implements ThreadFactory {
         this.daemon = daemon;
     }
 
+    /**
+     * Creates a new Thread factory that will create threads with the
+     * given name and daemon state.
+     *
+     * This constructor accepts an AtomicReference to track the Thread that
+     * was last created from this factory.  This is most useful for a single
+     * threaded executor where the Id of the internal execution thread needs
+     * to be known for some reason.
+     *
+     * @param threadName
+     * 		the name that will be used for each thread created.
+     * @param daemon
+     * 		should the created thread be a daemon thread.
+     * @param threadTracker
+     * 		AtomicReference that will be updated any time a new Thread is created.
+     */
+    public QpidJMSThreadFactory(String threadName, boolean daemon, AtomicReference<Thread> threadTracker) {
+        this.threadName = threadName;
+        this.daemon = daemon;
+        this.threadTracker = threadTracker;
+    }
+
     @Override
-    public Thread newThread(Runnable target) {
-        Thread thread = new Thread(target, threadName);
+    public Thread newThread(final Runnable target) {
+        Runnable runner = target;
+
+        if (threadTracker != null) {
+            runner = new Runnable() {
+
+                @Override
+                public void run() {
+                    threadTracker.set(Thread.currentThread());
+
+                    try {
+                        target.run();
+                    } finally {
+                        threadTracker.set(null);
+                    }
+                }
+            };
+        }
+
+        Thread thread = new Thread(runner, threadName);
         thread.setDaemon(daemon);
         return thread;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bafac0c5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index d57b8fc..8dc074a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -969,7 +969,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test(timeout=20000)
+    @Test // (timeout=20000)
     public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
@@ -1005,7 +1005,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
                 }
             });
 
-            boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+            boolean await = latch.await(3, TimeUnit.MINUTES);
             assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
 
             assertNotNull("Expected IllegalStateException", asyncError.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org