You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/08/04 16:43:50 UTC

qpid-jms git commit: QPIDJMS-300 Fix for race on remote and local close

Repository: qpid-jms
Updated Branches:
  refs/heads/master cc00816c4 -> d9807984a


QPIDJMS-300 Fix for race on remote and local close

Possible race on local session close and remote producer close
can lead to final task to cleanup async completions not being
run and the executor in a failed shutdown state.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9807984
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9807984
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9807984

Branch: refs/heads/master
Commit: d9807984ab90605ab1a4cb774439b4f7d4fece11
Parents: cc00816
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Aug 4 12:38:50 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Aug 4 12:38:50 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 66 ++++++++++++--------
 .../qpid/jms/util/QpidJMSThreadFactory.java     | 22 ++++++-
 .../integration/ProducerIntegrationTest.java    |  1 +
 3 files changed, 61 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a821c18..bde6bd6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -25,8 +25,10 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -109,8 +111,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final AtomicBoolean started = new AtomicBoolean();
     private final JmsSessionInfo sessionInfo;
     private final ReentrantLock sendLock = new ReentrantLock();
-    private volatile ExecutorService deliveryExecutor;
-    private volatile ExecutorService completionExcecutor;
+    private volatile ThreadPoolExecutor deliveryExecutor;
+    private volatile ThreadPoolExecutor completionExcecutor;
     private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
     private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
 
@@ -155,10 +157,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
             throw e;
         }
-
-        // Start the completion executor now as it's needed throughout the
-        // lifetime of the Session.
-        getCompletionExecutor();
     }
 
     int acknowledgementMode() {
@@ -308,6 +306,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             sessionInfo.setState(ResourceState.CLOSED);
             setFailureCause(cause);
             stop();
+
             for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) {
                 consumer.shutdown(cause);
             }
@@ -324,13 +323,14 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                     cause = new JMSException("Session closed remotely before message transfer result was notified");
                 }
 
-                completionExcecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
-                completionExcecutor.shutdown();
-                try {
-                    completionExcecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    LOG.trace("Session close awaiting send completions was interrupted");
-                }
+                getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+                getCompletionExecutor().shutdown();
+            }
+
+            try {
+                getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.trace("Session close awaiting send completions was interrupted");
             }
         }
     }
@@ -1041,7 +1041,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     }
 
     Executor getDispatcherExecutor() {
-        ExecutorService exec = deliveryExecutor;
+        ThreadPoolExecutor exec = deliveryExecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
                 if (deliveryExecutor == null) {
@@ -1059,18 +1059,24 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return exec;
     }
 
-    Executor getCompletionExecutor() {
-        ExecutorService exec = completionExcecutor;
+    private ExecutorService getCompletionExecutor() {
+        ThreadPoolExecutor exec = completionExcecutor;
         if (exec == null) {
             synchronized (sessionInfo) {
-                if (completionExcecutor == null) {
-                    if (!closed.get()) {
-                        completionExcecutor = exec = createExecutor("completion dispatcher", completionThread);;
-                    } else {
-                        return NoOpExecutor.INSTANCE;
+                exec = completionExcecutor;
+                if (exec == null) {
+                    exec = createExecutor("completion dispatcher", completionThread);
+
+                    // Ensure work thread is fully up before allowing other threads
+                    // to attempt to execute on this instance.
+                    Future<?> starter = exec.submit(() -> {});
+                    try {
+                        starter.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
                     }
-                } else {
-                    exec = completionExcecutor;
+
+                    completionExcecutor = exec;
                 }
             }
         }
@@ -1078,11 +1084,21 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return exec;
     }
 
-    private ExecutorService createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
+    private ThreadPoolExecutor createExecutor(final String threadNameSuffix, AtomicReference<Thread> threadTracker) {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
             new QpidJMSThreadFactory("JmsSession ["+ sessionInfo.getId() + "] " + threadNameSuffix, true, threadTracker));
 
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy() {
+
+            @Override
+            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
+                // Completely ignore the task if the session has closed.
+                if (!closed.get()) {
+                    LOG.trace("Task {} rejected from executor: {}", r, e);
+                    super.rejectedExecution(r, e);
+                }
+            }
+        });
 
         return executor;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index 2e0e829..b4e9f06 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -16,17 +16,23 @@
  */
 package org.apache.qpid.jms.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Simple ThreadFactory object
  */
 public class QpidJMSThreadFactory implements ThreadFactory {
 
-    private String threadName;
-    private boolean daemon;
-    private AtomicReference<Thread> threadTracker;
+    private static final Logger LOG = LoggerFactory.getLogger(QpidJMSThreadFactory.class);
+
+    private final String threadName;
+    private final boolean daemon;
+    private final AtomicReference<Thread> threadTracker;
 
     /**
      * Creates a new Thread factory that will create threads with the
@@ -40,6 +46,7 @@ public class QpidJMSThreadFactory implements ThreadFactory {
     public QpidJMSThreadFactory(String threadName, boolean daemon) {
         this.threadName = threadName;
         this.daemon = daemon;
+        this.threadTracker = null;
     }
 
     /**
@@ -86,6 +93,15 @@ public class QpidJMSThreadFactory implements ThreadFactory {
 
         Thread thread = new Thread(runner, threadName);
         thread.setDaemon(daemon);
+        thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+            @Override
+            public void uncaughtException(Thread target, Throwable error) {
+                LOG.warn("Thread: {} failed due to an uncaught exception: {}", target.getName(), error.getMessage());
+                LOG.trace("Uncaught Stacktrace: ", error);
+            }
+        });
+
         return thread;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9807984/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 24f5b8a..7cda887 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -1019,6 +1019,7 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void testRemotelyCloseProducerWithSendWaitingForCredit() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org