You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/05 17:58:28 UTC

svn commit: r1694254 - in /qpid/java/trunk: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/util/ systests/src/test/java/org/apache/qpid/client/failover/

Author: orudyy
Date: Wed Aug  5 15:58:28 2015
New Revision: 1694254

URL: http://svn.apache.org/r1694254
Log:
QPID-3521: [Java Client] Stop using dispatcher to clear pre-dispatch queue on 0.8/0.9.x client in order to avoid dead locks.
           Address review comments.

Added:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Aug  5 15:58:28 2015
@@ -1818,47 +1818,6 @@ public class AMQConnection extends Close
         return _messageCompressionThresholdSize;
     }
 
-    void doWithAllLocks(Runnable r)
-    {
-        doWithAllLocks(r, _sessions.values());
-
-    }
-
-    private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
-    {
-        if (!sessions.isEmpty())
-        {
-            AMQSession session = sessions.remove(0);
-
-            final Object dispatcherLock = session.getDispatcherLock();
-            if (dispatcherLock != null)
-            {
-                synchronized (dispatcherLock)
-                {
-                    synchronized (session.getMessageDeliveryLock())
-                    {
-                        doWithAllLocks(r, sessions);
-                    }
-                }
-            }
-            else
-            {
-                synchronized (session.getMessageDeliveryLock())
-                {
-                    doWithAllLocks(r, sessions);
-                }
-            }
-        }
-        else
-        {
-            synchronized (getFailoverMutex())
-            {
-                r.run();
-            }
-        }
-    }
-
-
     public String getTemporaryQueuePrefix()
     {
         if(_delegate.isVirtualHostPropertiesSupported())

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Wed Aug  5 15:58:28 2015
@@ -313,7 +313,7 @@ public class AMQConnectionDelegate_0_10
 
             final AtomicBoolean failoverDone = new AtomicBoolean();
 
-            _conn.doWithAllLocks(new Runnable()
+            ConnectionHelper.doWithAllConnectionAndSessionLocks(_conn, new Runnable()
             {
                 @Override
                 public void run()

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Aug  5 15:58:28 2015
@@ -3693,17 +3693,24 @@ public abstract class AMQSession<C exten
     {
         if (!_queue.isEmpty())
         {
-            setUsingDispatcherForCleanup(true);
-            drainDispatchQueue();
-            setUsingDispatcherForCleanup(false);
+            try
+            {
+                setUsingDispatcherForCleanup(true);
+                drainDispatchQueue();
+            }
+            finally
+            {
+                setUsingDispatcherForCleanup(false);
+            }
         }
     }
 
     protected void stopExistingDispatcher()
     {
-        if (_dispatcher != null)
+        Dispatcher dispatcher = _dispatcher;
+        if (dispatcher != null)
         {
-            _dispatcher.setConnectionStopped(true);
+            dispatcher.setConnectionStopped(true);
         }
     }
 
@@ -3714,5 +3721,10 @@ public abstract class AMQSession<C exten
             suspendChannel(true);
         }
     }
+
+    protected void clearDispatchQueue()
+    {
+        _queue.clear();
+    }
 }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Aug  5 15:58:28 2015
@@ -182,8 +182,7 @@ public class AMQSession_0_8 extends AMQS
     @Override
     void resubscribe() throws QpidException
     {
-        // drain dispatch queue
-        drainDispatchQueueWithDispatcher();
+        clearDispatchQueue();
 
         getDeliveredMessageTags().clear();
         super.resubscribe();

Added: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java?rev=1694254&view=auto
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java (added)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java Wed Aug  5 15:58:28 2015
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.List;
+
+public class ConnectionHelper
+{
+
+    public static void doWithAllConnectionAndSessionLocks(AMQConnection connection, Runnable runnable)
+    {
+        doWithAllLocks(runnable, connection.getSessions().values(), connection.getFailoverMutex());
+    }
+
+    private static void doWithAllLocks(final Runnable r, final List<AMQSession> sessions, Object failoverMutex)
+    {
+        if (!sessions.isEmpty())
+        {
+            AMQSession session = sessions.remove(0);
+            final Object dispatcherLock = session.getDispatcherLock();
+            if (dispatcherLock != null)
+            {
+                synchronized (dispatcherLock)
+                {
+                    synchronized (session.getMessageDeliveryLock())
+                    {
+                        doWithAllLocks(r, sessions, failoverMutex);
+                    }
+                }
+            }
+            else
+            {
+                synchronized (session.getMessageDeliveryLock())
+                {
+                    doWithAllLocks(r, sessions, failoverMutex);
+                }
+            }
+        }
+        else
+        {
+            synchronized (failoverMutex)
+            {
+                r.run();
+            }
+        }
+    }
+}

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Wed Aug  5 15:58:28 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.client.failover;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,10 +119,23 @@ public class FailoverHandler implements
 
         final AMQConnection connection = _amqProtocolHandler.getConnection();
 
-        // Since failover impacts several structures we protect them all with a single mutex. These structures
-        // are also in child objects of the connection. This allows us to manipulate them without affecting
-        // client code which runs in a separate thread.
-        synchronized (connection.getFailoverMutex())
+        ConnectionHelper.doWithAllConnectionAndSessionLocks(connection, new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                performFailover();
+            }
+        });
+
+        _amqProtocolHandler.getFailoverLatch().countDown();
+    }
+
+    private void performFailover()
+    {
+        AMQConnection connection = _amqProtocolHandler.getConnection();
+
+        // brace to keep indentation
         {
             //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
             // we can clear the exception.
@@ -236,8 +250,6 @@ public class FailoverHandler implements
                 }
             }
         }
-
-        _amqProtocolHandler.getFailoverLatch().countDown();
     }
 
     /**

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Wed Aug  5 15:58:28 2015
@@ -172,6 +172,26 @@ public class FlowControllingBlockingQueu
         return _queue.iterator();
     }
 
+    public void clear()
+    {
+        _queue.clear();
+
+        if (!disableFlowControl && _listener != null)
+        {
+            synchronized (_listener)
+            {
+                int count = _count;
+                _count = 0;
+
+                if (count >= _flowControlLowThreshold)
+                {
+                    _listener.underThreshold(0);
+                }
+            }
+
+        }
+    }
+
     private void reportAboveIfNecessary()
     {
         synchronized (_listener)

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Wed Aug  5 15:58:28 2015
@@ -1036,92 +1036,78 @@ public class FailoverBehaviourTest exten
 
     public void testFailoverWhenConnectionStopped() throws Exception
     {
-        // not needed
-        _connection.close();
-
-        // not needed
-        stopBroker(getFailingPort());
-
-        // stop broker and add http management
-        stopBroker();
-        configureHttpManagement();
-        startBroker();
-
-        _connection  = createConnectionWithFailover();
         init(Session.SESSION_TRANSACTED, true);
 
-        // populate broker with initial messages
-        final int testMessageNumber = 10;
-        produceMessages(TEST_MESSAGE_FORMAT, testMessageNumber, false);
+        produceMessages();
         _producerSession.commit();
 
         final CountDownLatch stopFlag = new CountDownLatch(1);
-        final CountDownLatch consumerBlocker = new CountDownLatch(1);
         final AtomicReference<Exception> exception = new AtomicReference<>();
-        final CountDownLatch messageCounter = new CountDownLatch(testMessageNumber);
-        try
+        final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
+        final AtomicInteger counter = new AtomicInteger();
+
+        _consumer.setMessageListener(new MessageListener()
         {
-            _consumer.setMessageListener(new MessageListener()
+            @Override
+            public void onMessage(Message message)
             {
-                @Override
-                public void onMessage(Message message)
+                if (stopFlag.getCount() == 1)
                 {
-                    if (consumerBlocker.getCount() == 1)
+                    try
+                    {
+                        _LOGGER.debug("Stopping connection from dispatcher thread");
+                        _connection.stop();
+                        _LOGGER.debug("Connection stopped from dispatcher thread");
+
+                    }
+                    catch (Exception e)
                     {
-                        try
-                        {
-                            consumerBlocker.await();
-
-                            _LOGGER.debug("Stopping connection from dispatcher thread");
-                            _connection.stop();
-                            _LOGGER.debug("Connection stopped from dispatcher thread");
-                            stopFlag.countDown();
-                        }
-                        catch (Exception e)
-                        {
-                            exception.set(e);
-                        }
+                        exception.set(e);
                     }
+                    finally
+                    {
+                        stopFlag.countDown();
 
+                        failBroker(getFailingPort());
+                    }
+
+                }
+                else
+                {
                     try
                     {
                         _consumerSession.commit();
-                        messageCounter.countDown();
+                        counter.incrementAndGet();
+                        expectedMessageLatch.countDown();
                     }
                     catch (Exception e)
                     {
                         exception.set(e);
                     }
                 }
-            });
-
-            int unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(testMessageNumber);
+            }
+        });
 
-            assertEquals("Unexpected number of unacknowledged messages", testMessageNumber, unacknowledgedMessageNumber);
-        }
-        finally
-        {
-            // stop blocking dispatcher thread
-            consumerBlocker.countDown();
-        }
 
         boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
         assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
                 stopResult);
         assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
-        closeConnectionViaManagement();
 
         // wait for failover to complete
         awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
         assertFailoverException();
 
-        // publish more messages when connection stopped
-        produceMessages(TEST_MESSAGE_FORMAT, 2, false);
+        resendMessagesIfNecessary();
         _producerSession.commit();
 
         _connection.start();
 
-        assertTrue("Not all messages were delivered. Remaining message number " + messageCounter.getCount(), messageCounter.await(11000, TimeUnit.MILLISECONDS));
+        assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
+
+        Thread.sleep(500l);
+        assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
+
         _connection.close();
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org