You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/19 12:45:59 UTC

svn commit: r1696553 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQSession.java BasicMessageConsumer.java

Author: orudyy
Date: Wed Aug 19 10:45:59 2015
New Revision: 1696553

URL: http://svn.apache.org/r1696553
Log:
QPID-6672: [Java Client] Message dispatching after dispatcher close can cause thread to terminate with IllegalStateException.

work by Lorenz Quack <qu...@gmail.com>

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Aug 19 10:45:59 2015
@@ -1087,10 +1087,15 @@ public class AMQConnection extends Close
         if (!sessions.isEmpty())
         {
             AMQSession session = sessions.remove(0);
-            synchronized (session.getMessageDeliveryLock())
+            session.lockMessageDelivery();
+            try
             {
                 doClose(sessions, timeout);
             }
+            finally
+            {
+                session.unlockMessageDelivery();
+            }
         }
         else
         {
@@ -1829,23 +1834,23 @@ public class AMQConnection extends Close
         {
             AMQSession session = sessions.remove(0);
 
-            final Object dispatcherLock = session.getDispatcherLock();
-            if (dispatcherLock != null)
+
+            Object dispatcherLock = session.getDispatcherLock();
+            if (dispatcherLock == null)
             {
-                synchronized (dispatcherLock)
-                {
-                    synchronized (session.getMessageDeliveryLock())
-                    {
-                        doWithAllLocks(r, sessions);
-                    }
-                }
+                dispatcherLock = new Object(); // use dummy intrinsic lock to make subsequent code nicer
             }
-            else
+            synchronized (dispatcherLock)
             {
-                synchronized (session.getMessageDeliveryLock())
+                session.lockMessageDelivery();
+                try
                 {
                     doWithAllLocks(r, sessions);
                 }
+                finally
+                {
+                    session.unlockMessageDelivery();
+                }
             }
         }
         else

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Aug 19 10:45:59 2015
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -96,6 +97,10 @@ public abstract class AMQSession<C exten
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
+    /** System property to configure dispatcher shutdown timeout in milliseconds. */
+    public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS = "DISPATCHER_SHUTDOWN_TIMEOUT_MS";
+    /** Dispatcher shutdown timeout default setting. */
+    public static final String DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT = "1000";
 
     /** System property to enable strict AMQP compliance. */
     public static final String STRICT_AMQP = "STRICT_AMQP";
@@ -133,6 +138,8 @@ public abstract class AMQSession<C exten
      */
     protected final boolean DAEMON_DISPATCHER_THREAD = Boolean.getBoolean(ClientProperties.DAEMON_DISPATCHER);
 
+    private final long _dispatcherShutdownTimeoutMs;
+
     /** The connection to which this session belongs. */
     private AMQConnection _connection;
 
@@ -243,7 +250,7 @@ public abstract class AMQSession<C exten
     private final boolean _strictAMQP;
 
     private final boolean _strictAMQPFATAL;
-    private final Object _messageDeliveryLock = new Object();
+    private final Lock _messageDeliveryLock = new ReentrantLock(true);
 
     /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
     private boolean _dirty;
@@ -340,6 +347,7 @@ public abstract class AMQSession<C exten
         _immediatePrefetch =
                 _strictAMQP
                 || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+        _dispatcherShutdownTimeoutMs = Integer.parseInt(System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT));
 
         _connection = con;
         _transacted = transacted;
@@ -442,6 +450,7 @@ public abstract class AMQSession<C exten
      *
      * @throws JMSException If the JMS provider fails to close the session due to some internal error.
      */
+    @Override
     public void close() throws JMSException
     {
         close(-1);
@@ -718,16 +727,21 @@ public abstract class AMQSession<C exten
      */
     public void close(long timeout) throws JMSException
     {
-        synchronized (_messageDeliveryLock)
+        setClosing(true);
+        lockMessageDelivery();
+        try
         {
             // We must close down all producers and consumers in an orderly fashion. This is the only method
             // that can be called from a different thread of control from the one controlling the session.
             synchronized (getFailoverMutex())
             {
-
                 close(timeout, true);
             }
         }
+        finally
+        {
+            unlockMessageDelivery();
+        }
     }
 
     private void close(long timeout, boolean sendClose) throws JMSException
@@ -3192,9 +3206,28 @@ public abstract class AMQSession<C exten
 
     public abstract void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException;
 
-    Object getMessageDeliveryLock()
+    boolean tryLockMessageDelivery()
     {
-        return _messageDeliveryLock;
+        try
+        {
+            // Use timeout of zero to respect fairness. See ReentrantLock#tryLock JavaDocs for details.
+            return _messageDeliveryLock.tryLock(0, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    void lockMessageDelivery()
+    {
+        _messageDeliveryLock.lock();
+    }
+
+    void unlockMessageDelivery()
+    {
+        _messageDeliveryLock.unlock();
     }
 
     /**
@@ -3281,6 +3314,7 @@ public abstract class AMQSession<C exten
 
         /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */
         private final AtomicBoolean _closed = new AtomicBoolean(false);
+        private final CountDownLatch _closeCompleted = new CountDownLatch(1);
 
         private final Object _lock = new Object();
         private final String dispatcherID = "" + System.identityHashCode(this);
@@ -3295,8 +3329,21 @@ public abstract class AMQSession<C exten
             _queue.close();
             _dispatcherThread.interrupt();
 
-            // fixme awaitTermination
-
+            // If we are not the dispatcherThread we need to await the exiting of the Dispatcher#run(). See QPID-6672.
+            if (Thread.currentThread() != _dispatcherThread)
+            {
+                try
+                {
+                    if(!_closeCompleted.await(_dispatcherShutdownTimeoutMs, TimeUnit.MILLISECONDS))
+                    {
+                        throw new RuntimeException("Dispatcher did not close down within the timeout of " + _dispatcherShutdownTimeoutMs + " ms.");
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
 
         private AtomicBoolean getClosed()
@@ -3375,53 +3422,61 @@ public abstract class AMQSession<C exten
 
         public void run()
         {
-            if (_dispatcherLogger.isDebugEnabled())
+            try
             {
-                _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
-            }
+                if (_dispatcherLogger.isDebugEnabled())
+                {
+                    _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
+                }
 
-            // Allow disptacher to start stopped
-            synchronized (_lock)
-            {
-                while (!_closed.get() && connectionStopped())
+                // Allow dispatcher to start stopped
+                synchronized (_lock)
                 {
-                    try
+                    while (!_closed.get() && connectionStopped())
                     {
-                        _lock.wait();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        Thread.currentThread().interrupt();
+                        try
+                        {
+                            _lock.wait();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            Thread.currentThread().interrupt();
+                        }
                     }
                 }
-            }
 
-            try
-            {
-
-                while (((_queue.blockingPeek()) != null) && !_closed.get())
+                try
                 {
-                    synchronized (_lock)
-                    {
-                        Dispatchable disp = _queue.nonBlockingTake();
 
-                        if(disp != null)
+                    while (((_queue.blockingPeek()) != null) && !_closed.get())
+                    {
+                        synchronized (_lock)
                         {
-                            disp.dispatch(AMQSession.this);
+                            if (!isClosed() && !isClosing() && !_closed.get())
+                            {
+                                Dispatchable disp = _queue.nonBlockingTake();
+
+                                if(disp != null)
+                                {
+                                    disp.dispatch(AMQSession.this);
+                                }
+                            }
                         }
                     }
                 }
+                catch (InterruptedException e)
+                {
+                    // ignored as run will exit immediately
+                }
             }
-            catch (InterruptedException e)
-            {
-                // ignored as run will exit immediately
-            }
-
-            if (_dispatcherLogger.isDebugEnabled())
+            finally
             {
-                _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
+                _closeCompleted.countDown();
+                if (_dispatcherLogger.isDebugEnabled())
+                {
+                    _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
+                }
             }
-
         }
 
         // only call while holding lock
@@ -3487,9 +3542,20 @@ public abstract class AMQSession<C exten
                 }
                 else
                 {
-                    synchronized (_messageDeliveryLock)
+                    while (!isClosed() && !isClosing())
                     {
-                        notifyConsumer(message);
+                        if (tryLockMessageDelivery())
+                        {
+                            try
+                            {
+                                notifyConsumer(message);
+                                break;
+                            }
+                            finally
+                            {
+                                unlockMessageDelivery();
+                            }
+                        }
                     }
                 }
             }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1696553&r1=1696552&r2=1696553&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Aug 19 10:45:59 2015
@@ -594,13 +594,21 @@ public abstract class BasicMessageConsum
                     // If the session is open or we are in the process
                     // of closing the session then send a cance
                     // no point otherwise as the connection will be gone
-                    if (!_session.isClosed() || _session.isClosing())
+                    while (!_session.isClosed() || _session.isClosing())
                     {
-                        synchronized(_session.getMessageDeliveryLock())
+                        if (_session.tryLockMessageDelivery())
                         {
-                            synchronized (_connection.getFailoverMutex())
+                            try
                             {
-                                sendCancel();
+                                synchronized (_connection.getFailoverMutex())
+                                {
+                                    sendCancel();
+                                }
+                                break;
+                            }
+                            finally
+                            {
+                                _session.unlockMessageDelivery();
                             }
                         }
                     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org