You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/09/21 22:31:18 UTC

svn commit: r578258 - in /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client: AMQSession.java util/FlowControllingBlockingQueue.java

Author: rgreig
Date: Fri Sep 21 13:31:18 2007
New Revision: 578258

URL: http://svn.apache.org/viewvc?rev=578258&view=rev
Log:
QPID-607: dispatcher threads now poll so that they can die when the connection is closed.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578258&r1=578257&r2=578258&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Sep 21 13:31:18 2007
@@ -108,6 +108,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -2677,30 +2678,33 @@
 
             try
             {
-                while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null))
+                while (!_closed.get())
                 {
-                    synchronized (_lock)
+                    message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
+                    if (message != null)
                     {
-
-                        while (connectionStopped())
+                        synchronized (_lock)
                         {
-                            _lock.wait(2000);
-                        }
 
-                        if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
-                        {
-                            rejectMessage(message, true);
-                        }
-                        else
-                        {
-                            synchronized (_messageDeliveryLock)
+                            while (connectionStopped())
                             {
-                                dispatchMessage(message);
+                                _lock.wait(2000);
                             }
-                        }
 
-                    }
+                            if (message.getDeliverBody().deliveryTag <= _rollbackMark.get())
+                            {
+                                rejectMessage(message, true);
+                            }
+                            else
+                            {
+                                synchronized (_messageDeliveryLock)
+                                {
+                                    dispatchMessage(message);
+                                }
+                            }
 
+                        }
+                    }
                 }
             }
             catch (InterruptedException e)

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=578258&r1=578257&r2=578258&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Fri Sep 21 13:31:18 2007
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -69,10 +70,10 @@
         _listener = listener;
     }
 
-    public Object take() throws InterruptedException
+    public Object poll(long time, TimeUnit unit) throws InterruptedException
     {
-        Object o = _queue.take();
-        if (_listener != null)
+        Object o = _queue.poll(time, unit);
+        if (o != null && _listener != null)
         {
             synchronized (_listener)
             {