You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/07/01 13:19:20 UTC
svn commit: r673058 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: rgodfrey
Date: Tue Jul 1 04:19:20 2008
New Revision: 673058
URL: http://svn.apache.org/viewvc?rev=673058&view=rev
Log:
QPID-1084 : Applying patch previously applied to M2.x
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=673058&r1=673057&r2=673058&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jul 1 04:19:20 2008
@@ -445,21 +445,25 @@
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
public void underThreshold(int currentValue)
{
- _logger.debug(
+ _logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -2915,9 +2919,9 @@
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -2926,7 +2930,10 @@
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{