You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/21 12:08:41 UTC
svn commit: r658614 - /incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: rgodfrey
Date: Wed May 21 03:08:41 2008
New Revision: 658614
URL: http://svn.apache.org/viewvc?rev=658614&view=rev
Log:
QPID-1084 : Fix AMQSession race condition on no-ack flow control
Modified:
incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=658614&r1=658613&r2=658614&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.x/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed May 21 03:08:41 2008
@@ -468,12 +468,15 @@
new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
+ private final AtomicBoolean _suspendState = new AtomicBoolean();
+
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _defaultPrefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(true)).start();
+ _logger.debug(
+ "Above threshold(" + _defaultPrefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ _suspendState.set(true);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
@@ -482,7 +485,8 @@
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
- new Thread(new SuspenderRunner(false)).start();
+ _suspendState.set(false);
+ new Thread(new SuspenderRunner(_suspendState)).start();
}
});
@@ -3106,9 +3110,9 @@
private class SuspenderRunner implements Runnable
{
- private boolean _suspend;
+ private AtomicBoolean _suspend;
- public SuspenderRunner(boolean suspend)
+ public SuspenderRunner(AtomicBoolean suspend)
{
_suspend = suspend;
}
@@ -3117,7 +3121,10 @@
{
try
{
- suspendChannel(_suspend);
+ synchronized(_suspensionLock)
+ {
+ suspendChannel(_suspend.get());
+ }
}
catch (AMQException e)
{