You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/01/13 19:03:26 UTC

qpid-jms git commit: QPIDJMS-251 Thread safety updates for state values in MessageConsumer

Repository: qpid-jms
Updated Branches:
  refs/heads/master 53916c506 -> 0aaf6d5e5


QPIDJMS-251 Thread safety updates for state values in MessageConsumer

Make the MessageListener volatile to ensure it is visible to the
dispatcher task that is fired after it is set, and remove the unsafe
usage of the started flag and use the state of the messageQueue object
instead. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0aaf6d5e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0aaf6d5e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0aaf6d5e

Branch: refs/heads/master
Commit: 0aaf6d5e51beac49375de86b2dbfd7b98130d882
Parents: 53916c5
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 13 13:14:48 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 13 13:14:48 2017 -0500

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsMessageConsumer.java | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0aaf6d5e/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 6c4c06b..d782fab 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -60,9 +60,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     protected JmsConsumerInfo consumerInfo;
     protected final int acknowledgementMode;
     protected final AtomicBoolean closed = new AtomicBoolean();
-    protected boolean started;
-    protected MessageListener messageListener;
-    protected JmsMessageAvailableListener availableListener;
+    protected volatile MessageListener messageListener;
+    protected volatile JmsMessageAvailableListener availableListener;
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
     protected final AtomicBoolean suspendedConnection = new AtomicBoolean();
@@ -79,7 +78,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         this.connection = session.getConnection();
         this.acknowledgementMode = session.acknowledgementMode();
 
-        if(destination.isTemporary()) {
+        if (destination.isTemporary()) {
             connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);
         }
 
@@ -496,14 +495,14 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                 this.messageQueue.enqueue(envelope);
             }
 
-            if (this.messageListener != null && this.started) {
-                session.getDispatcherExecutor().execute(new MessageDeliverTask());
-            } else {
-                if (availableListener != null) {
+            if (session.isStarted() && messageQueue.isRunning()) {
+                if (messageListener != null) {
+                    session.getDispatcherExecutor().execute(new MessageDeliverTask());
+                } else if (availableListener != null) {
                     session.getDispatcherExecutor().execute(new Runnable() {
                         @Override
                         public void run() {
-                            if (session.isStarted()) {
+                            if (messageQueue.isRunning()) {
                                 availableListener.onMessageAvailable(JmsMessageConsumer.this);
                             }
                         }
@@ -518,7 +517,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     public void start() {
         lock.lock();
         try {
-            this.started = true;
             this.messageQueue.start();
             drainMessageQueueToListener();
         } finally {
@@ -533,7 +531,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     private void stop(boolean closeMessageQueue) {
         lock.lock();
         try {
-            this.started = false;
             if (closeMessageQueue) {
                 this.messageQueue.close();
             } else {
@@ -565,7 +562,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     }
 
     void drainMessageQueueToListener() {
-        if (this.messageListener != null && this.started) {
+        if (messageListener != null && messageQueue.isRunning()) {
             session.getDispatcherExecutor().execute(new MessageDeliverTask());
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org