You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/09/23 00:05:31 UTC
svn commit: r578509 -
/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: rgreig
Date: Sat Sep 22 15:05:30 2007
New Revision: 578509
URL: http://svn.apache.org/viewvc?rev=578509&view=rev
Log:
QPID-609: The dispatcher thread was being restarted by the code that closes the consumer on receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process so that it cannot be restarted afterwards. Also rename the dispatcher's _closed field to _dispatcherClosed, since it clashes with a field of the same name in the enclosing class.
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=578509&r1=578508&r2=578509&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Sep 22 15:05:30 2007
@@ -673,11 +673,10 @@
else
{
_logger.info("Dispatcher is null so created stopped dispatcher");
-
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
@@ -1954,11 +1953,6 @@
*/
private void closeConsumers(Throwable error) throws JMSException
{
- if (_dispatcher != null)
- {
- _dispatcher.close();
- _dispatcher = null;
- }
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
@@ -1977,6 +1971,11 @@
}
}
// at this point the _consumers map will be empty
+ if (_dispatcher != null)
+ {
+ _dispatcher.close();
+ _dispatcher = null;
+ }
}
/**
@@ -2567,7 +2566,7 @@
{
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
- private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false);
private final Object _lock = new Object();
private final AtomicLong _rollbackMark = new AtomicLong(-1);
@@ -2583,7 +2582,7 @@
public void close()
{
- _closed.set(true);
+ _dispatcherClosed.set(true);
interrupt();
// fixme awaitTermination
@@ -2678,7 +2677,7 @@
try
{
- while (!_closed.get())
+ while (!_dispatcherClosed.get())
{
message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS);
if (message != null)
@@ -2768,7 +2767,7 @@
}
}
// Don't reject if we're already closing
- if (!_closed.get())
+ if (!_dispatcherClosed.get())
{
rejectMessage(message, true);
}