You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/07/31 10:34:37 UTC
svn commit: r1693542 - in /qpid/java/trunk:
client/src/main/java/org/apache/qpid/client/
client/src/test/java/org/apache/qpid/client/
systests/src/test/java/org/apache/qpid/client/failover/ test-profiles/
Author: orudyy
Date: Fri Jul 31 08:34:36 2015
New Revision: 1693542
URL: http://svn.apache.org/r1693542
Log:
QPID-3521: Clear pre-dispatch queue in 0-8 client on failover process
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
qpid/java/trunk/test-profiles/cpp.excludes
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jul 31 08:34:36 2015
@@ -2324,6 +2324,15 @@ public abstract class AMQSession<C exten
_failedOverDirty = true;
}
+ // Also reset the delivery tag tracker, to ensure we don't
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+
+ _unacknowledgedMessageTags.clear();
+ _prefetchedMessageTags.clear();
+
_rollbackMark.set(-1);
resubscribeProducers();
resubscribeConsumers();
@@ -2431,15 +2440,8 @@ public abstract class AMQSession<C exten
void stop() throws QpidException
{
// Stop the server delivering messages to this session.
- if (!(isClosed() || isClosing()))
- {
- suspendChannel(true);
- }
-
- if (_dispatcher != null)
- {
- _dispatcher.setConnectionStopped(true);
- }
+ suspendChannelIfNotClosing();
+ stopExistingDispatcher();
}
private void checkNotTransacted() throws JMSException
@@ -3686,5 +3688,31 @@ public abstract class AMQSession<C exten
{
return _messageEncryptionHelper;
}
+
+ protected void drainDispatchQueueWithDispatcher()
+ {
+ if (!_queue.isEmpty())
+ {
+ setUsingDispatcherForCleanup(true);
+ drainDispatchQueue();
+ setUsingDispatcherForCleanup(false);
+ }
+ }
+
+ protected void stopExistingDispatcher()
+ {
+ if (_dispatcher != null)
+ {
+ _dispatcher.setConnectionStopped(true);
+ }
+ }
+
+ protected void suspendChannelIfNotClosing() throws QpidException
+ {
+ if (!(isClosed() || isClosing()))
+ {
+ suspendChannel(true);
+ }
+ }
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Jul 31 08:34:36 2015
@@ -1278,17 +1278,11 @@ public class AMQSession_0_10 extends AMQ
@Override
void resubscribe() throws QpidException
{
- // Also reset the delivery tag tracker, to insure we dont
- // return the first <total number of msgs received on session>
- // messages sent by the brokers following the first rollback
- // after failover
- getHighestDeliveryTag().set(-1);
// Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
//messages that came from the old broker.
_txRangeSet.clear();
_txSize = 0;
- getUnacknowledgedMessageTags().clear();
- getPrefetchedMessageTags().clear();
+
super.resubscribe();
getQpidSession().sync();
}
@@ -1296,10 +1290,10 @@ public class AMQSession_0_10 extends AMQ
@Override
void stop() throws QpidException
{
- super.stop();
- setUsingDispatcherForCleanup(true);
- drainDispatchQueue();
- setUsingDispatcherForCleanup(false);
+ // Stop the server delivering messages to this session.
+ suspendChannelIfNotClosing();
+ drainDispatchQueueWithDispatcher();
+ stopExistingDispatcher();
for (BasicMessageConsumer consumer : getConsumers())
{
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Jul 31 08:34:36 2015
@@ -179,6 +179,16 @@ public class AMQSession_0_8 extends AMQS
getUnacknowledgedMessageTags().remove(deliveryTag);
}
+ @Override
+ void resubscribe() throws QpidException
+ {
+ // drain dispatch queue
+ drainDispatchQueueWithDispatcher();
+
+ getDeliveredMessageTags().clear();
+ super.resubscribe();
+ }
+
public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments,
final String exchangeName, final AMQDestination destination,
final boolean nowait) throws QpidException, FailoverException
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Fri Jul 31 08:34:36 2015
@@ -18,7 +18,14 @@
*/
package org.apache.qpid.client;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import javax.jms.JMSException;
@@ -28,10 +35,12 @@ import javax.jms.MessageProducer;
import javax.jms.StreamMessage;
import org.apache.qpid.client.message.AMQPEncodedListMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.Connection.SessionFactory;
import org.apache.qpid.transport.Connection.State;
+import org.apache.qpid.url.AMQBindingURL;
/**
* Tests AMQSession_0_10 methods.
@@ -456,6 +465,91 @@ public class AMQSession_0_10Test extends
assertNotNull("QueueQuery command was not sent", command);
}
+ public void testResubscribe() throws Exception
+ {
+ AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE);
+
+ AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'&durable='true'"));
+ session.createProducer(queue1);
+ BasicMessageConsumer_0_10 consumer1 = (BasicMessageConsumer_0_10)session.createConsumer(queue1);
+
+ AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'"));
+ session.createProducer(queue2);
+ BasicMessageConsumer_0_10 consumer2 = (BasicMessageConsumer_0_10)session.createConsumer(queue2);
+
+ UnprocessedMessage[] messages = new UnprocessedMessage[4];
+ for (int i =0; i< messages.length;i++ )
+ {
+ int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+ int deliveryTag = i + 1;
+ messages[i]= createMockMessage(deliveryTag, consumerTag);
+ session.messageReceived(messages[i]);
+ if (deliveryTag % 2 == 0)
+ {
+ session.addUnacknowledgedMessage(deliveryTag);
+ }
+ }
+
+ assertEquals("Unexpected highest delivery tag", 4, session.getHighestDeliveryTag().get());
+ assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+ assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+ // verify test messages were not dispatched
+ for (UnprocessedMessage message: messages )
+ {
+ verify(message, never()).dispatch(session);
+ }
+
+ session.resubscribe();
+
+ assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get());
+ assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+ assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty());
+ assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+ }
+
+ public void testFailoverPrep() throws Exception
+ {
+ AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE);
+
+ UnprocessedMessage[] messages = new UnprocessedMessage[4];
+ for (int i =0; i< messages.length;i++ )
+ {
+ int consumerTag = i % 2;
+ int deliveryTag = i + 1;
+ messages[i]= createMockMessage(deliveryTag, consumerTag);
+ session.messageReceived(messages[i]);
+ if (deliveryTag % 2 == 0)
+ {
+ session.addUnacknowledgedMessage(deliveryTag);
+ }
+ }
+
+ // verify test messages were not dispatched
+ for (UnprocessedMessage message: messages )
+ {
+ verify(message, never()).dispatch(session);
+ }
+
+ session.failoverPrep();
+
+ // verify dispatcher queue is drained
+ for (UnprocessedMessage message: messages )
+ {
+ verify(message).dispatch(session);
+ }
+ }
+
+ private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+ {
+ UnprocessedMessage message = mock(UnprocessedMessage.class);
+ when(message.getConsumerTag()).thenReturn(consumerTag);
+ when(message.getDeliveryTag()).thenReturn(deliveryTag);
+ return message;
+ }
+
+
+
private AMQAnyDestination createDestination()
{
AMQAnyDestination destination = null;
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Fri Jul 31 08:34:36 2015
@@ -20,9 +20,25 @@
*/
package org.apache.qpid.client;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
import org.apache.qpid.QpidException;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.transport.TestNetworkConnection;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -43,21 +59,8 @@ public class AMQSession_0_8Test extends
{
final String testQueueName = "tmp_127_0_0_1_1_1";
- _connection.setConnectionListener(new ConnectionListenerSupport()
- {
- @Override
- public void bytesSent(long count)
- {
- try
- {
- _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0));
- }
- catch (QpidException e)
- {
- throw new RuntimeException(e);
- }
- }
- });
+ _connection.setConnectionListener(new MockReceiveConnectionListener(_connection, 1,
+ new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0)));
AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
@@ -70,4 +73,137 @@ public class AMQSession_0_8Test extends
assertEquals("Unexpected queue name", testQueueName, queue.getAMQQueueName());
}
+
+ public void testResubscribe() throws Exception
+ {
+ // set qpid.declare_exchanges=true so that producer resubscription can be verified
+ setTestSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true");
+ setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ setTestSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false");
+
+ AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
+
+ AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'"));
+
+ // expecting exchange declare Ok
+ MockReceiveConnectionListener listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
+ _connection.setConnectionListener(listener);
+ session.createProducer(queue1);
+ assertTrue("Not all expected commands have been sent on producer1 creation", listener.responsesEmpty());
+
+ // expecting exchange declare Ok, Channel flow Ok, Consume Ok
+ listener = new MockReceiveConnectionListener(_connection, 1,
+ new ExchangeDeclareOkBody(), new ChannelFlowOkBody(false), new BasicConsumeOkBody(AMQShortString.valueOf("1")));
+ _connection.setConnectionListener(listener );
+ BasicMessageConsumer_0_8 consumer1 = (BasicMessageConsumer_0_8)session.createConsumer(queue1);
+ assertTrue("Not all expected commands have been sent on consumer1 creation", listener.responsesEmpty());
+
+ // expecting exchange declare Ok
+ listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
+ _connection.setConnectionListener(listener);
+ AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'"));
+ session.createProducer(queue2);
+ assertTrue("Not all expected commands have been sent on producer2 creation", listener.responsesEmpty());
+
+
+ // expecting exchange declare Ok, Consume Ok
+ listener = new MockReceiveConnectionListener(_connection, 1,
+ new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2")));
+ _connection.setConnectionListener(listener);
+
+ BasicMessageConsumer_0_8 consumer2 = (BasicMessageConsumer_0_8)session.createConsumer(queue2);
+ assertTrue("Not all expected commands have been sent on consumer2 creation", listener.responsesEmpty());
+
+ UnprocessedMessage[] messages = new UnprocessedMessage[4];
+ for (int i =0; i< messages.length;i++ )
+ {
+ int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+ int deliveryTag = i + 1;
+ messages[i]= createMockMessage(deliveryTag, consumerTag);
+ session.messageReceived(messages[i]);
+ if (deliveryTag % 2 == 0)
+ {
+ session.addDeliveredMessage(deliveryTag);
+ }
+ else
+ {
+ session.addUnacknowledgedMessage(deliveryTag);
+ }
+ }
+
+ assertEquals("Unexpected highest delivery tag", messages.length, session.getHighestDeliveryTag().get());
+ assertFalse("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
+ assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+ assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+ // verify messages were not dispatched
+ for (UnprocessedMessage message: messages )
+ {
+ verify(message, never()).dispatch(session);
+ }
+
+ listener = new MockReceiveConnectionListener(_connection, 1,
+ new ExchangeDeclareOkBody(), // first producer resubscribe
+ new ExchangeDeclareOkBody(), // second producer resubscribe
+ new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("1")), // first consumer resubscribe
+ new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2"))); // second consumer resubscribe
+ _connection.setConnectionListener(listener);
+
+ session.resubscribe();
+
+ assertTrue("Not all expected commands have been sent on session resubscribe", listener.responsesEmpty());
+
+ assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get());
+ assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+ assertTrue("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
+ assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty());
+ assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+ // verify dispatcher queue is drained
+ for (UnprocessedMessage message: messages )
+ {
+ verify(message).dispatch(session);
+ }
+ }
+
+ private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+ {
+ UnprocessedMessage message = mock(UnprocessedMessage.class);
+ when(message.getConsumerTag()).thenReturn(consumerTag);
+ when(message.getDeliveryTag()).thenReturn(deliveryTag);
+ return message;
+ }
+
+ static class MockReceiveConnectionListener extends ConnectionListenerSupport
+ {
+ private final AMQConnection _connection;
+ private final List<AMQBody> _responses;
+ private final int _channelId;
+
+ MockReceiveConnectionListener(AMQConnection connection, int channelId, AMQBody... response)
+ {
+ _connection = connection;
+ _responses = new ArrayList<>(Arrays.asList(response));
+ _channelId = channelId;
+ }
+
+ @Override
+ public void bytesSent(long count)
+ {
+ try
+ {
+ AMQBody response = _responses.remove(0);
+ _connection.getProtocolHandler().methodBodyReceived(_channelId, response);
+ }
+ catch (QpidException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean responsesEmpty()
+ {
+ return _responses.isEmpty();
+ }
+ }
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Jul 31 08:34:36 2015
@@ -27,7 +27,15 @@ import org.apache.qpid.client.AMQSession
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.test.utils.TestUtils;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +56,7 @@ import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import javax.naming.NamingException;
+import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -1024,6 +1033,184 @@ public class FailoverBehaviourTest exten
}
}
+ public void testFailoverWhenConnectionStopped() throws Exception
+ {
+ // the existing connection is not needed; a failover connection is created below
+ _connection.close();
+
+ // the broker on the failing port is not needed for this test
+ stopBroker(getFailingPort());
+
+ // stop broker and add http management
+ stopBroker();
+ configureHttpManagement();
+ startBroker();
+
+ _connection = createConnectionWithFailover();
+ init(Session.SESSION_TRANSACTED, true);
+
+ // populate broker with initial messages
+ final int testMessageNumber = 10;
+ produceMessages(TEST_MESSAGE_FORMAT, testMessageNumber, false);
+ _producerSession.commit();
+
+ final CountDownLatch stopFlag = new CountDownLatch(1);
+ final CountDownLatch consumerBlocker = new CountDownLatch(1);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+ final CountDownLatch messageCounter = new CountDownLatch(testMessageNumber);
+ _consumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(Message message)
+ {
+ if (consumerBlocker.getCount() == 1)
+ {
+ try
+ {
+ consumerBlocker.await();
+
+ _LOGGER.debug("Stopping connection from dispatcher thread");
+ _connection.stop();
+ _LOGGER.debug("Connection stopped from dispatcher thread");
+ stopFlag.countDown();
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ }
+
+ try
+ {
+ _consumerSession.commit();
+ messageCounter.countDown();
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ }
+ });
+
+ int unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(testMessageNumber);
+
+ assertEquals("Unexpected number of unacknowledged messages", testMessageNumber, unacknowledgedMessageNumber);
+
+ // stop blocking dispatcher thread
+ consumerBlocker.countDown();
+
+ boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
+ _LOGGER.debug("Thread dump:" + TestUtils.dumpThreads());
+ assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
+ stopResult);
+ assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
+ closeConnectionViaManagement();
+
+ // wait for failover to complete
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ assertFailoverException();
+
+ // publish more messages while the connection is stopped
+ produceMessages(TEST_MESSAGE_FORMAT, 2, false);
+ _producerSession.commit();
+
+ _connection.start();
+
+ assertTrue("Not all messages were delivered. Remaining message number " + messageCounter.getCount(), messageCounter.await(11000, TimeUnit.MILLISECONDS));
+ _connection.close();
+ }
+
+ private int getUnacknowledgedMessageNumber(int testMessageNumber) throws IOException, InterruptedException
+ {
+ int unacknowledgedMessageNumber = 0;
+ int i =0;
+ do
+ {
+ unacknowledgedMessageNumber = getUnacknowledgedMessageNumber();
+ if (unacknowledgedMessageNumber != testMessageNumber)
+ {
+ Thread.sleep(50);
+ }
+ else
+ {
+ break;
+ }
+ }
+ while (i++ < 20);
+ return unacknowledgedMessageNumber;
+ }
+
+ private void configureHttpManagement()
+ {
+ TestBrokerConfiguration config = getBrokerConfiguration();
+ config.addHttpManagementConfiguration();
+ String initialConfiguration = System.getProperty("virtualhostnode.context.blueprint");
+ if (initialConfiguration != null)
+ {
+ config.setObjectAttribute(VirtualHostNode.class, "test", VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, initialConfiguration);
+ }
+ config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+ "secureOnlyMechanisms",
+ "{}");
+
+
+ // set password authentication provider on http port for the tests
+ config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER,
+ TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+ config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+ config.setSaved(false);
+ }
+
+ private void closeConnectionViaManagement() throws IOException
+ {
+ RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0)));
+ try
+ {
+ restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
+ List<Map<String, Object>> connections = restTestHelper.getJsonAsList("virtualhost/test/test/getConnections");
+ assertEquals("Unexpected number of connections", 1, connections.size());
+ Map<String, Object> connection = connections.get(0);
+ String connectionName = (String) connection.get(org.apache.qpid.server.model.Connection.NAME);
+ restTestHelper.submitRequest("connection/" + TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT + "/" + restTestHelper.encodeAsUTF( connectionName ), "DELETE", 200);
+ }
+ finally
+ {
+ restTestHelper.tearDown();
+ }
+ }
+
+ private int getUnacknowledgedMessageNumber() throws IOException
+ {
+ RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0)));
+ try
+ {
+ restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
+ List<Map<String, Object>> sessions = restTestHelper.getJsonAsList("session");
+ for(Map<String, Object> session: sessions )
+ {
+ List<Map<String, Object>> consumers = (List<Map<String, Object>>)session.get("consumers");
+ if (consumers != null)
+ {
+ Map<String, Object> consumer = consumers.get(0);
+ Map<String, Object> stat = (Map<String, Object>)consumer.get("statistics");
+ if (stat != null)
+ {
+ Number unacknowledgedMessages = (Number)stat.get("unacknowledgedMessages");
+ if (unacknowledgedMessages != null)
+ {
+ return unacknowledgedMessages.intValue();
+ }
+ }
+ }
+ }
+ return 0;
+ }
+ finally
+ {
+ restTestHelper.tearDown();
+ }
+ }
+
private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
{
final Map<String, Object> arguments = new HashMap<String, Object>();
Modified: qpid/java/trunk/test-profiles/cpp.excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/cpp.excludes?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/cpp.excludes (original)
+++ qpid/java/trunk/test-profiles/cpp.excludes Fri Jul 31 08:34:36 2015
@@ -29,3 +29,6 @@ org.apache.qpid.test.client.message.JMSD
//BDB System Tests
org.apache.qpid.server.store.berkeleydb.*
+
+// test relies on Java Broker REST interfaces
+org.apache.qpid.client.failover.FailoverBehaviourTest.testFailoverWhenConnectionStopped
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org