You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/05/13 17:04:28 UTC

[qpid-jms-amqp-0-x] branch master updated: QPID-8285: [JMS AMQP 0-x] Fix syncing of dispatch queue

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms-amqp-0-x.git


The following commit(s) were added to refs/heads/master by this push:
     new d6335fd  QPID-8285: [JMS AMQP 0-x] Fix syncing of dispatch queue
d6335fd is described below

commit d6335fd04ce38f5cfe796f15c177ece988ee2815
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Mon May 13 18:03:39 2019 +0100

    QPID-8285: [JMS AMQP 0-x] Fix syncing of dispatch queue
---
 client/src/main/java/org/apache/qpid/client/AMQSession.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession.java b/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ffcbcac..125cba1 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2330,7 +2330,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
 
             final CountDownLatch signal = new CountDownLatch(1);
 
-            _queue.add(new Dispatchable()
+            _queue.add(new DispatchableControl()
             {
                 public void dispatch(AMQSession ssn)
                 {
@@ -2382,7 +2382,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
 
             final CountDownLatch signal = new CountDownLatch(1);
 
-            _queue.add(new Dispatchable()
+            _queue.add(new DispatchableControl()
             {
                 public void dispatch(AMQSession ssn)
                 {
@@ -3385,6 +3385,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         void dispatch(AMQSession ssn);
     }
 
+    public interface DispatchableControl extends Dispatchable
+    {
+    }
+
     public void dispatch(UnprocessedMessage message)
     {
         if (_dispatcher == null)
@@ -3535,10 +3539,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
                     {
                         synchronized (_lock)
                         {
-                            if (!isClosed() && !isClosing() && !_closed.get())
+                            final Dispatchable disp = _queue.nonBlockingTake();
+                            if (disp instanceof DispatchableControl || (!isClosed() && !isClosing() && !_closed.get()))
                             {
-                                Dispatchable disp = _queue.nonBlockingTake();
-
                                 if(disp != null)
                                 {
                                     disp.dispatch(AMQSession.this);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org