You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/19 12:45:59 UTC
svn commit: r1696553 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQSession.java BasicMessageConsumer.java
Author: orudyy
Date: Wed Aug 19 10:45:59 2015
New Revision: 1696553
URL: http://svn.apache.org/r1696553
Log:
QPID-6672: [Java Client] Message dispatching after dispatcher close can cause thread to terminate with IllegalStateException.
work by Lorenz Quack <qu...@gmail.com>
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Aug 19 10:45:59 2015
@@ -1087,10 +1087,15 @@ public class AMQConnection extends Close
if (!sessions.isEmpty())
{
AMQSession session = sessions.remove(0);
- synchronized (session.getMessageDeliveryLock())
+ session.lockMessageDelivery();
+ try
{
doClose(sessions, timeout);
}
+ finally
+ {
+ session.unlockMessageDelivery();
+ }
}
else
{
@@ -1829,23 +1834,23 @@ public class AMQConnection extends Close
{
AMQSession session = sessions.remove(0);
- final Object dispatcherLock = session.getDispatcherLock();
- if (dispatcherLock != null)
+
+ Object dispatcherLock = session.getDispatcherLock();
+ if (dispatcherLock == null)
{
- synchronized (dispatcherLock)
- {
- synchronized (session.getMessageDeliveryLock())
- {
- doWithAllLocks(r, sessions);
- }
- }
+ dispatcherLock = new Object(); // use dummy intrinsic lock to make subsequent code nicer
}
- else
+ synchronized (dispatcherLock)
{
- synchronized (session.getMessageDeliveryLock())
+ session.lockMessageDelivery();
+ try
{
doWithAllLocks(r, sessions);
}
+ finally
+ {
+ session.unlockMessageDelivery();
+ }
}
}
else
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Aug 19 10:45:59 2015
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -96,6 +97,10 @@ public abstract class AMQSession<C exten
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
+ /** System property to configure dispatcher shutdown timeout in milliseconds. */
+ public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS = "DISPATCHER_SHUTDOWN_TIMEOUT_MS";
+ /** Dispatcher shutdown timeout default setting. */
+ public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT = "1000";
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
@@ -133,6 +138,8 @@ public abstract class AMQSession<C exten
*/
protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
+ private final long _dispatcherShutdownTimeoutMs;
+
/** The connection to which this session belongs. */
private AMQConnection _connection;
@@ -243,7 +250,7 @@ public abstract class AMQSession<C exten
private final boolean _strictAMQP;
private final boolean _strictAMQPFATAL;
- private final Object _messageDeliveryLock = new Object();
+ private final Lock _messageDeliveryLock = new ReentrantLock(true);
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
private boolean _dirty;
@@ -340,6 +347,7 @@ public abstract class AMQSession<C exten
_immediatePrefetch =
_strictAMQP
|| Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+ _dispatcherShutdownTimeoutMs = Integer.parseInt(System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT));
_connection = con;
_transacted = transacted;
@@ -442,6 +450,7 @@ public abstract class AMQSession<C exten
*
* @throws JMSException If the JMS provider fails to close the session due to some internal error.
*/
+ @Override
public void close() throws JMSException
{
close(-1);
@@ -718,16 +727,21 @@ public abstract class AMQSession<C exten
*/
public void close(long timeout) throws JMSException
{
- synchronized (_messageDeliveryLock)
+ setClosing(true);
+ lockMessageDelivery();
+ try
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session.
synchronized (getFailoverMutex())
{
-
close(timeout, true);
}
}
+ finally
+ {
+ unlockMessageDelivery();
+ }
}
private void close(long timeout, boolean sendClose) throws JMSException
@@ -3192,9 +3206,28 @@ public abstract class AMQSession<C exten
public abstract void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException;
- Object getMessageDeliveryLock()
+ boolean tryLockMessageDelivery()
{
- return _messageDeliveryLock;
+ try
+ {
+ // Use timeout of zero to respect fairness. See ReentrantLock#tryLock JavaDocs for details.
+ return _messageDeliveryLock.tryLock(0, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ void lockMessageDelivery()
+ {
+ _messageDeliveryLock.lock();
+ }
+
+ void unlockMessageDelivery()
+ {
+ _messageDeliveryLock.unlock();
}
/**
@@ -3281,6 +3314,7 @@ public abstract class AMQSession<C exten
/** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private final CountDownLatch _closeCompleted = new CountDownLatch(1);
private final Object _lock = new Object();
private final String dispatcherID = "" + System.identityHashCode(this);
@@ -3295,8 +3329,21 @@ public abstract class AMQSession<C exten
_queue.close();
_dispatcherThread.interrupt();
- // fixme awaitTermination
-
+ // If we are not the dispatcherThread we need to await the exiting of the Dispatcher#run(). See QPID-6672.
+ if (Thread.currentThread() != _dispatcherThread)
+ {
+ try
+ {
+ if(!_closeCompleted.await(_dispatcherShutdownTimeoutMs, TimeUnit.MILLISECONDS))
+ {
+ throw new RuntimeException("Dispatcher did not close down within the timeout of " + _dispatcherShutdownTimeoutMs + " ms.");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
}
private AtomicBoolean getClosed()
@@ -3375,53 +3422,61 @@ public abstract class AMQSession<C exten
public void run()
{
- if (_dispatcherLogger.isDebugEnabled())
+ try
{
- _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
- }
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
+ }
- // Allow disptacher to start stopped
- synchronized (_lock)
- {
- while (!_closed.get() && connectionStopped())
+ // Allow dispatcher to start stopped
+ synchronized (_lock)
{
- try
+ while (!_closed.get() && connectionStopped())
{
- _lock.wait();
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
}
- }
- try
- {
-
- while (((_queue.blockingPeek()) != null) && !_closed.get())
+ try
{
- synchronized (_lock)
- {
- Dispatchable disp = _queue.nonBlockingTake();
- if(disp != null)
+ while (((_queue.blockingPeek()) != null) && !_closed.get())
+ {
+ synchronized (_lock)
{
- disp.dispatch(AMQSession.this);
+ if (!isClosed() && !isClosing() && !_closed.get())
+ {
+ Dispatchable disp = _queue.nonBlockingTake();
+
+ if(disp != null)
+ {
+ disp.dispatch(AMQSession.this);
+ }
+ }
}
}
}
+ catch (InterruptedException e)
+ {
+ // ignored as run will exit immediately
+ }
}
- catch (InterruptedException e)
- {
- // ignored as run will exit immediately
- }
-
- if (_dispatcherLogger.isDebugEnabled())
+ finally
{
- _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
+ _closeCompleted.countDown();
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
+ }
}
-
}
// only call while holding lock
@@ -3487,9 +3542,20 @@ public abstract class AMQSession<C exten
}
else
{
- synchronized (_messageDeliveryLock)
+ while (!isClosed() && !isClosing())
{
- notifyConsumer(message);
+ if (tryLockMessageDelivery())
+ {
+ try
+ {
+ notifyConsumer(message);
+ break;
+ }
+ finally
+ {
+ unlockMessageDelivery();
+ }
+ }
}
}
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Aug 19 10:45:59 2015
@@ -594,13 +594,21 @@ public abstract class BasicMessageConsum
// If the session is open or we are in the process
// of closing the session then send a cance
// no point otherwise as the connection will be gone
- if (!_session.isClosed() || _session.isClosing())
+ while (!_session.isClosed() || _session.isClosing())
{
- synchronized(_session.getMessageDeliveryLock())
+ if (_session.tryLockMessageDelivery())
{
- synchronized (_connection.getFailoverMutex())
+ try
{
- sendCancel();
+ synchronized (_connection.getFailoverMutex())
+ {
+ sendCancel();
+ }
+ break;
+ }
+ finally
+ {
+ _session.unlockMessageDelivery();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org