You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/07/01 13:19:20 UTC

svn commit: r673058 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: rgodfrey
Date: Tue Jul  1 04:19:20 2008
New Revision: 673058

URL: http://svn.apache.org/viewvc?rev=673058&view=rev
Log:
QPID-1084 : Applying patch previously applied to M2.x

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=673058&r1=673057&r2=673058&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jul  1 04:19:20 2008
@@ -445,21 +445,25 @@
                     new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
                                                      new FlowControllingBlockingQueue.ThresholdListener()
                                                      {
+                                                         private final AtomicBoolean _suspendState = new AtomicBoolean();
+
                                                          public void aboveThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
-                                                                         "Above threshold(" + _defaultPrefetchHighMark
-                                                                         + ") so suspending channel. Current value is " + currentValue);
-                                                                 new Thread(new SuspenderRunner(true)).start();
+                                                             _logger.debug(
+                                                                 "Above threshold(" + _defaultPrefetchHighMark
+                                                                 + ") so suspending channel. Current value is " + currentValue);
+                                                             _suspendState.set(true);
+                                                             new Thread(new SuspenderRunner(_suspendState)).start();
 
                                                          }
 
                                                          public void underThreshold(int currentValue)
                                                          {
-                                                                 _logger.debug(
+                                                             _logger.debug(
                                                                          "Below threshold(" + _defaultPrefetchLowMark
                                                                          + ") so unsuspending channel. Current value is " + currentValue);
-                                                                 new Thread(new SuspenderRunner(false)).start();
+                                                             _suspendState.set(false);
+                                                             new Thread(new SuspenderRunner(_suspendState)).start();
 
                                                          }
                                                      });
@@ -2915,9 +2919,9 @@
 
     private class SuspenderRunner implements Runnable
     {
-        private boolean _suspend;
+        private AtomicBoolean _suspend;
 
-        public SuspenderRunner(boolean suspend)
+        public SuspenderRunner(AtomicBoolean suspend)
         {
             _suspend = suspend;
         }
@@ -2926,7 +2930,10 @@
         {
             try
             {
-                suspendChannel(_suspend);
+                synchronized(_suspensionLock)
+                {
+                    suspendChannel(_suspend.get());
+                }
             }
             catch (AMQException e)
             {