You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/09/23 00:05:31 UTC

svn commit: r578509 - /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: rgreig
Date: Sat Sep 22 15:05:30 2007
New Revision: 578509

URL: http://svn.apache.org/viewvc?rev=578509&view=rev
Log:
QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578509&r1=578508&r2=578509&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Sep 22 15:05:30 2007
@@ -673,11 +673,10 @@
                 else
                 {
                     _logger.info("Dispatcher is null so created stopped dispatcher");
-
                     startDistpatcherIfNecessary(true);
                 }
 
-                _dispatcher.rejectPending(consumer);
+                _dispatcher.rejectPending(consumer);                
             }
             else
             {
@@ -1954,11 +1953,6 @@
      */
     private void closeConsumers(Throwable error) throws JMSException
     {
-        if (_dispatcher != null)
-        {
-            _dispatcher.close();
-            _dispatcher = null;
-        }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
         final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1977,6 +1971,11 @@
             }
         }
         // at this point the _consumers map will be empty
+        if (_dispatcher != null)
+        {
+            _dispatcher.close();
+            _dispatcher = null;
+        }
     }
 
     /**
@@ -2567,7 +2566,7 @@
     {
 
         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
-        private final AtomicBoolean _closed = new AtomicBoolean(false);
+        private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2583,7 +2582,7 @@
 
         public void close()
         {
-            _closed.set(true);
+            _dispatcherClosed.set(true);
             interrupt();
 
             // fixme awaitTermination
@@ -2678,7 +2677,7 @@
 
             try
             {
-                while (!_closed.get())
+                while (!_dispatcherClosed.get())
                 {
                     message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
                     if (message != null)
@@ -2768,7 +2767,7 @@
                         }
                     }
                     // Don't reject if we're already closing
-                    if (!_closed.get())
+                    if (!_dispatcherClosed.get())
                     {
                         rejectMessage(message, true);
                     }