You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/05 17:58:28 UTC
svn commit: r1694254 - in /qpid/java/trunk:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/failover/
client/src/main/java/org/apache/qpid/client/util/
systests/src/test/java/org/apache/qpid/client/failover/
Author: orudyy
Date: Wed Aug 5 15:58:28 2015
New Revision: 1694254
URL: http://svn.apache.org/r1694254
Log:
QPID-3521: [Java Client] Stop using dispatcher to clear pre-dispatch queue on 0.8/0.9.x client in order to avoid dead locks.
Address review comments.
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Aug 5 15:58:28 2015
@@ -1818,47 +1818,6 @@ public class AMQConnection extends Close
return _messageCompressionThresholdSize;
}
- void doWithAllLocks(Runnable r)
- {
- doWithAllLocks(r, _sessions.values());
-
- }
-
- private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
- {
- if (!sessions.isEmpty())
- {
- AMQSession session = sessions.remove(0);
-
- final Object dispatcherLock = session.getDispatcherLock();
- if (dispatcherLock != null)
- {
- synchronized (dispatcherLock)
- {
- synchronized (session.getMessageDeliveryLock())
- {
- doWithAllLocks(r, sessions);
- }
- }
- }
- else
- {
- synchronized (session.getMessageDeliveryLock())
- {
- doWithAllLocks(r, sessions);
- }
- }
- }
- else
- {
- synchronized (getFailoverMutex())
- {
- r.run();
- }
- }
- }
-
-
public String getTemporaryQueuePrefix()
{
if(_delegate.isVirtualHostPropertiesSupported())
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Wed Aug 5 15:58:28 2015
@@ -313,7 +313,7 @@ public class AMQConnectionDelegate_0_10
final AtomicBoolean failoverDone = new AtomicBoolean();
- _conn.doWithAllLocks(new Runnable()
+ ConnectionHelper.doWithAllConnectionAndSessionLocks(_conn, new Runnable()
{
@Override
public void run()
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Aug 5 15:58:28 2015
@@ -3693,17 +3693,24 @@ public abstract class AMQSession<C exten
{
if (!_queue.isEmpty())
{
- setUsingDispatcherForCleanup(true);
- drainDispatchQueue();
- setUsingDispatcherForCleanup(false);
+ try
+ {
+ setUsingDispatcherForCleanup(true);
+ drainDispatchQueue();
+ }
+ finally
+ {
+ setUsingDispatcherForCleanup(false);
+ }
}
}
protected void stopExistingDispatcher()
{
- if (_dispatcher != null)
+ Dispatcher dispatcher = _dispatcher;
+ if (dispatcher != null)
{
- _dispatcher.setConnectionStopped(true);
+ dispatcher.setConnectionStopped(true);
}
}
@@ -3714,5 +3721,10 @@ public abstract class AMQSession<C exten
suspendChannel(true);
}
}
+
+ protected void clearDispatchQueue()
+ {
+ _queue.clear();
+ }
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Aug 5 15:58:28 2015
@@ -182,8 +182,7 @@ public class AMQSession_0_8 extends AMQS
@Override
void resubscribe() throws QpidException
{
- // drain dispatch queue
- drainDispatchQueueWithDispatcher();
+ clearDispatchQueue();
getDeliveredMessageTags().clear();
super.resubscribe();
Added: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java?rev=1694254&view=auto
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java (added)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java Wed Aug 5 15:58:28 2015
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.util.List;
+
+public class ConnectionHelper
+{
+
+ public static void doWithAllConnectionAndSessionLocks(AMQConnection connection, Runnable runnable)
+ {
+ doWithAllLocks(runnable, connection.getSessions().values(), connection.getFailoverMutex());
+ }
+
+ private static void doWithAllLocks(final Runnable r, final List<AMQSession> sessions, Object failoverMutex)
+ {
+ if (!sessions.isEmpty())
+ {
+ AMQSession session = sessions.remove(0);
+ final Object dispatcherLock = session.getDispatcherLock();
+ if (dispatcherLock != null)
+ {
+ synchronized (dispatcherLock)
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions, failoverMutex);
+ }
+ }
+ }
+ else
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions, failoverMutex);
+ }
+ }
+ }
+ else
+ {
+ synchronized (failoverMutex)
+ {
+ r.run();
+ }
+ }
+ }
+}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Wed Aug 5 15:58:28 2015
@@ -23,6 +23,7 @@ package org.apache.qpid.client.failover;
import java.util.concurrent.CountDownLatch;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.ConnectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,10 +119,23 @@ public class FailoverHandler implements
final AMQConnection connection = _amqProtocolHandler.getConnection();
- // Since failover impacts several structures we protect them all with a single mutex. These structures
- // are also in child objects of the connection. This allows us to manipulate them without affecting
- // client code which runs in a separate thread.
- synchronized (connection.getFailoverMutex())
+ ConnectionHelper.doWithAllConnectionAndSessionLocks(connection, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ performFailover();
+ }
+ });
+
+ _amqProtocolHandler.getFailoverLatch().countDown();
+ }
+
+ private void performFailover()
+ {
+ AMQConnection connection = _amqProtocolHandler.getConnection();
+
+ // brace to keep indentation
{
//Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
// we can clear the exception.
@@ -236,8 +250,6 @@ public class FailoverHandler implements
}
}
}
-
- _amqProtocolHandler.getFailoverLatch().countDown();
}
/**
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Wed Aug 5 15:58:28 2015
@@ -172,6 +172,26 @@ public class FlowControllingBlockingQueu
return _queue.iterator();
}
+ public void clear()
+ {
+ _queue.clear();
+
+ if (!disableFlowControl && _listener != null)
+ {
+ synchronized (_listener)
+ {
+ int count = _count;
+ _count = 0;
+
+ if (count >= _flowControlLowThreshold)
+ {
+ _listener.underThreshold(0);
+ }
+ }
+
+ }
+ }
+
private void reportAboveIfNecessary()
{
synchronized (_listener)
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1694254&r1=1694253&r2=1694254&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Wed Aug 5 15:58:28 2015
@@ -1036,92 +1036,78 @@ public class FailoverBehaviourTest exten
public void testFailoverWhenConnectionStopped() throws Exception
{
- // not needed
- _connection.close();
-
- // not needed
- stopBroker(getFailingPort());
-
- // stop broker and add http management
- stopBroker();
- configureHttpManagement();
- startBroker();
-
- _connection = createConnectionWithFailover();
init(Session.SESSION_TRANSACTED, true);
- // populate broker with initial messages
- final int testMessageNumber = 10;
- produceMessages(TEST_MESSAGE_FORMAT, testMessageNumber, false);
+ produceMessages();
_producerSession.commit();
final CountDownLatch stopFlag = new CountDownLatch(1);
- final CountDownLatch consumerBlocker = new CountDownLatch(1);
final AtomicReference<Exception> exception = new AtomicReference<>();
- final CountDownLatch messageCounter = new CountDownLatch(testMessageNumber);
- try
+ final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
+ final AtomicInteger counter = new AtomicInteger();
+
+ _consumer.setMessageListener(new MessageListener()
{
- _consumer.setMessageListener(new MessageListener()
+ @Override
+ public void onMessage(Message message)
{
- @Override
- public void onMessage(Message message)
+ if (stopFlag.getCount() == 1)
{
- if (consumerBlocker.getCount() == 1)
+ try
+ {
+ _LOGGER.debug("Stopping connection from dispatcher thread");
+ _connection.stop();
+ _LOGGER.debug("Connection stopped from dispatcher thread");
+
+ }
+ catch (Exception e)
{
- try
- {
- consumerBlocker.await();
-
- _LOGGER.debug("Stopping connection from dispatcher thread");
- _connection.stop();
- _LOGGER.debug("Connection stopped from dispatcher thread");
- stopFlag.countDown();
- }
- catch (Exception e)
- {
- exception.set(e);
- }
+ exception.set(e);
}
+ finally
+ {
+ stopFlag.countDown();
+ failBroker(getFailingPort());
+ }
+
+ }
+ else
+ {
try
{
_consumerSession.commit();
- messageCounter.countDown();
+ counter.incrementAndGet();
+ expectedMessageLatch.countDown();
}
catch (Exception e)
{
exception.set(e);
}
}
- });
-
- int unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(testMessageNumber);
+ }
+ });
- assertEquals("Unexpected number of unacknowledged messages", testMessageNumber, unacknowledgedMessageNumber);
- }
- finally
- {
- // stop blocking dispatcher thread
- consumerBlocker.countDown();
- }
boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
stopResult);
assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
- closeConnectionViaManagement();
// wait for failover to complete
awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
assertFailoverException();
- // publish more messages when connection stopped
- produceMessages(TEST_MESSAGE_FORMAT, 2, false);
+ resendMessagesIfNecessary();
_producerSession.commit();
_connection.start();
- assertTrue("Not all messages were delivered. Remaining message number " + messageCounter.getCount(), messageCounter.await(11000, TimeUnit.MILLISECONDS));
+ assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
+
+ Thread.sleep(500l);
+ assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
+
_connection.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org