You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/02/02 13:05:36 UTC
svn commit: r905596 -
/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: ritchiem
Date: Tue Feb 2 12:05:35 2010
New Revision: 905596
URL: http://svn.apache.org/viewvc?rev=905596&view=rev
Log:
QPID-2370,QPID-1084 : Committed change to prevent flow control threads being created/sent if the channel/session is closed or the state has acutally changed in the mean time.
Modified:
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=905596&r1=905595&r2=905596&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 2 12:05:35 2010
@@ -190,6 +190,8 @@
}
}
+ final AMQSession _thisSession = this;
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -456,22 +458,38 @@
public void aboveThreshold(int currentValue)
{
- _logger.debug(
- "Above threshold(" + _prefetchHighMark
- + ") so suspending channel. Current value is " + currentValue);
- _suspendState.set(true);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was False
+ if (!_suspendState.getAndSet(true))
+ {
+ _logger.debug(
+ "Above threshold(" + _prefetchHighMark
+ + ") so suspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
public void underThreshold(int currentValue)
{
- _logger.debug(
- "Below threshold(" + _prefetchLowMark
- + ") so unsuspending channel. Current value is " + currentValue);
- _suspendState.set(false);
- new Thread(new SuspenderRunner(_suspendState)).start();
-
+ // If the session has been closed don't waste time creating a thread to do
+ // flow control
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ // Only executute change if previous state
+ // was true
+ if (_suspendState.getAndSet(false))
+ {
+ _logger.debug(
+ "Below threshold(" + _prefetchLowMark
+ + ") so unsuspending channel. Current value is " + currentValue);
+ new Thread(new SuspenderRunner(_suspendState)).start();
+ }
+ }
}
});
}
@@ -2946,8 +2964,9 @@
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId);
+ _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + _thisSession);
}
+
}
// only call while holding lock
@@ -3109,12 +3128,22 @@
{
synchronized (_suspensionLock)
{
- suspendChannel(_suspend.get());
+ // If the session has closed by the time we get here
+ // then we should not attempt to write to the sesion/channel.
+ if (!(_thisSession.isClosed() || _thisSession.isClosing()))
+ {
+ suspendChannel(_suspend.get());
+ }
}
}
catch (AMQException e)
{
- _logger.warn("Unable to suspend channel");
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + _thisSession + " due to: " + e);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Is the _queue empty?" + _queue.isEmpty());
+ _logger.debug("Is the dispatcher closed?" + (_dispatcher == null ? "it's Null" : _dispatcher._closed));
+ }
}
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org