You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/07/31 10:34:37 UTC

svn commit: r1693542 - in /qpid/java/trunk: client/src/main/java/org/apache/qpid/client/ client/src/test/java/org/apache/qpid/client/ systests/src/test/java/org/apache/qpid/client/failover/ test-profiles/

Author: orudyy
Date: Fri Jul 31 08:34:36 2015
New Revision: 1693542

URL: http://svn.apache.org/r1693542
Log:
QPID-3521: Clear pre-dispatch queue in 0-8 client on failover process

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
    qpid/java/trunk/test-profiles/cpp.excludes

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jul 31 08:34:36 2015
@@ -2324,6 +2324,15 @@ public abstract class AMQSession<C exten
             _failedOverDirty = true;
         }
 
+        // Also reset the delivery tag tracker, to insure we dont
+        // return the first <total number of msgs received on session>
+        // messages sent by the brokers following the first rollback
+        // after failover
+        _highestDeliveryTag.set(-1);
+
+        _unacknowledgedMessageTags.clear();
+        _prefetchedMessageTags.clear();
+
         _rollbackMark.set(-1);
         resubscribeProducers();
         resubscribeConsumers();
@@ -2431,15 +2440,8 @@ public abstract class AMQSession<C exten
     void stop() throws QpidException
     {
         // Stop the server delivering messages to this session.
-        if (!(isClosed() || isClosing()))
-        {
-            suspendChannel(true);
-        }
-
-        if (_dispatcher != null)
-        {
-            _dispatcher.setConnectionStopped(true);
-        }
+        suspendChannelIfNotClosing();
+        stopExistingDispatcher();
     }
 
     private void checkNotTransacted() throws JMSException
@@ -3686,5 +3688,31 @@ public abstract class AMQSession<C exten
     {
         return _messageEncryptionHelper;
     }
+
+    protected void drainDispatchQueueWithDispatcher()
+    {
+        if (!_queue.isEmpty())
+        {
+            setUsingDispatcherForCleanup(true);
+            drainDispatchQueue();
+            setUsingDispatcherForCleanup(false);
+        }
+    }
+
+    protected void stopExistingDispatcher()
+    {
+        if (_dispatcher != null)
+        {
+            _dispatcher.setConnectionStopped(true);
+        }
+    }
+
+    protected void suspendChannelIfNotClosing() throws QpidException
+    {
+        if (!(isClosed() || isClosing()))
+        {
+            suspendChannel(true);
+        }
+    }
 }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Jul 31 08:34:36 2015
@@ -1278,17 +1278,11 @@ public class AMQSession_0_10 extends AMQ
     @Override
     void resubscribe() throws QpidException
     {
-        // Also reset the delivery tag tracker, to insure we dont
-        // return the first <total number of msgs received on session>
-        // messages sent by the brokers following the first rollback
-        // after failover
-        getHighestDeliveryTag().set(-1);
         // Clear txRangeSet/unacknowledgedMessageTags so we don't complete commands corresponding to
         //messages that came from the old broker.
         _txRangeSet.clear();
         _txSize = 0;
-        getUnacknowledgedMessageTags().clear();
-        getPrefetchedMessageTags().clear();
+
         super.resubscribe();
         getQpidSession().sync();
     }
@@ -1296,10 +1290,10 @@ public class AMQSession_0_10 extends AMQ
     @Override
     void stop() throws QpidException
     {
-        super.stop();
-        setUsingDispatcherForCleanup(true);
-        drainDispatchQueue();
-        setUsingDispatcherForCleanup(false);
+        // Stop the server delivering messages to this session.
+        suspendChannelIfNotClosing();
+        drainDispatchQueueWithDispatcher();
+        stopExistingDispatcher();
 
         for (BasicMessageConsumer consumer : getConsumers())
         {

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Jul 31 08:34:36 2015
@@ -179,6 +179,16 @@ public class AMQSession_0_8 extends AMQS
         getUnacknowledgedMessageTags().remove(deliveryTag);
     }
 
+    @Override
+    void resubscribe() throws QpidException
+    {
+        // drain dispatch queue
+        drainDispatchQueueWithDispatcher();
+
+        getDeliveredMessageTags().clear();
+        super.resubscribe();
+    }
+
     public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments,
                               final String exchangeName, final AMQDestination destination,
                               final boolean nowait) throws QpidException, FailoverException

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Fri Jul 31 08:34:36 2015
@@ -18,7 +18,14 @@
  */
 package org.apache.qpid.client;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 
 import javax.jms.JMSException;
@@ -28,10 +35,12 @@ import javax.jms.MessageProducer;
 import javax.jms.StreamMessage;
 
 import org.apache.qpid.client.message.AMQPEncodedListMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.Connection.SessionFactory;
 import org.apache.qpid.transport.Connection.State;
+import org.apache.qpid.url.AMQBindingURL;
 
 /**
  * Tests AMQSession_0_10 methods.
@@ -456,6 +465,91 @@ public class AMQSession_0_10Test extends
         assertNotNull("QueueQuery command was not sent", command);
     }
 
+    public void testResubscribe() throws Exception
+    {
+        AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE);
+
+        AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'&durable='true'"));
+        session.createProducer(queue1);
+        BasicMessageConsumer_0_10 consumer1 = (BasicMessageConsumer_0_10)session.createConsumer(queue1);
+
+        AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'"));
+        session.createProducer(queue2);
+        BasicMessageConsumer_0_10 consumer2 = (BasicMessageConsumer_0_10)session.createConsumer(queue2);
+
+        UnprocessedMessage[] messages = new UnprocessedMessage[4];
+        for (int i =0; i< messages.length;i++ )
+        {
+            int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+            int deliveryTag = i + 1;
+            messages[i]= createMockMessage(deliveryTag, consumerTag);
+            session.messageReceived(messages[i]);
+            if (deliveryTag % 2 == 0)
+            {
+                session.addUnacknowledgedMessage(deliveryTag);
+            }
+        }
+
+        assertEquals("Unexpected highest delivery tag", 4, session.getHighestDeliveryTag().get());
+        assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+        assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+        // verify test messages were not dispatched
+        for (UnprocessedMessage message: messages )
+        {
+            verify(message, never()).dispatch(session);
+        }
+
+        session.resubscribe();
+
+        assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get());
+        assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+        assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty());
+        assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+    }
+
+    public void testFailoverPrep() throws Exception
+    {
+        AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE);
+
+        UnprocessedMessage[] messages = new UnprocessedMessage[4];
+        for (int i =0; i< messages.length;i++ )
+        {
+            int consumerTag = i % 2;
+            int deliveryTag = i + 1;
+            messages[i]= createMockMessage(deliveryTag, consumerTag);
+            session.messageReceived(messages[i]);
+            if (deliveryTag % 2 == 0)
+            {
+                session.addUnacknowledgedMessage(deliveryTag);
+            }
+        }
+
+        // verify test messages were not dispatched
+        for (UnprocessedMessage message: messages )
+        {
+            verify(message, never()).dispatch(session);
+        }
+
+        session.failoverPrep();
+
+        // verify dispatcher queue is drained
+        for (UnprocessedMessage message: messages )
+        {
+            verify(message).dispatch(session);
+        }
+    }
+
+    private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+    {
+        UnprocessedMessage message = mock(UnprocessedMessage.class);
+        when(message.getConsumerTag()).thenReturn(consumerTag);
+        when(message.getDeliveryTag()).thenReturn(deliveryTag);
+        return message;
+    }
+
+
+
     private AMQAnyDestination createDestination()
     {
         AMQAnyDestination destination = null;

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQSession_0_8Test.java Fri Jul 31 08:34:36 2015
@@ -20,9 +20,25 @@
  */
 package org.apache.qpid.client;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
 import org.apache.qpid.QpidException;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.transport.TestNetworkConnection;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -43,21 +59,8 @@ public class AMQSession_0_8Test extends
     {
         final String testQueueName = "tmp_127_0_0_1_1_1";
 
-        _connection.setConnectionListener(new ConnectionListenerSupport()
-        {
-            @Override
-            public void bytesSent(long count)
-            {
-                try
-                {
-                    _connection.getProtocolHandler().methodBodyReceived(1, new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0));
-                }
-                catch (QpidException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
+        _connection.setConnectionListener(new MockReceiveConnectionListener(_connection, 1,
+                new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0)));
 
         AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
 
@@ -70,4 +73,137 @@ public class AMQSession_0_8Test extends
 
         assertEquals("Unexpected queue name", testQueueName, queue.getAMQQueueName());
     }
+
+    public void testResubscribe() throws Exception
+    {
+        // to verify producer resubscribe set qpid.declare_exchanges=true
+        setTestSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true");
+        setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+        setTestSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false");
+
+        AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
+
+        AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'"));
+
+        // expecting exchange declare Ok
+        MockReceiveConnectionListener listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
+        _connection.setConnectionListener(listener);
+        session.createProducer(queue1);
+        assertTrue("Not all expected commands have been sent on producer1 creation", listener.responsesEmpty());
+
+        // expecting exchange declare Ok, Channel flow Ok, Consume Ok
+        listener = new MockReceiveConnectionListener(_connection, 1,
+                new ExchangeDeclareOkBody(), new ChannelFlowOkBody(false), new BasicConsumeOkBody(AMQShortString.valueOf("1")));
+        _connection.setConnectionListener(listener );
+        BasicMessageConsumer_0_8 consumer1 = (BasicMessageConsumer_0_8)session.createConsumer(queue1);
+        assertTrue("Not all expected commands have been sent on consumer1 creation", listener.responsesEmpty());
+
+        // expecting exchange declare Ok
+        listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
+        _connection.setConnectionListener(listener);
+        AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'"));
+        session.createProducer(queue2);
+        assertTrue("Not all expected commands have been sent on producer2 creation", listener.responsesEmpty());
+
+
+        // expecting exchange declare Ok, Consume Ok
+        listener = new MockReceiveConnectionListener(_connection, 1,
+                new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2")));
+        _connection.setConnectionListener(listener);
+
+        BasicMessageConsumer_0_8 consumer2 = (BasicMessageConsumer_0_8)session.createConsumer(queue2);
+        assertTrue("Not all expected commands have been sent on consumer2 creation", listener.responsesEmpty());
+
+        UnprocessedMessage[] messages = new UnprocessedMessage[4];
+        for (int i =0; i< messages.length;i++ )
+        {
+            int consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
+            int deliveryTag = i + 1;
+            messages[i]= createMockMessage(deliveryTag, consumerTag);
+            session.messageReceived(messages[i]);
+            if (deliveryTag % 2 == 0)
+            {
+                session.addDeliveredMessage(deliveryTag);
+            }
+            else
+            {
+                session.addUnacknowledgedMessage(deliveryTag);
+            }
+        }
+
+        assertEquals("Unexpected highest delivery tag", messages.length, session.getHighestDeliveryTag().get());
+        assertFalse("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
+        assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+        assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+        // verify messages were not dispatched
+        for (UnprocessedMessage message: messages )
+        {
+            verify(message, never()).dispatch(session);
+        }
+
+        listener = new MockReceiveConnectionListener(_connection, 1,
+                new ExchangeDeclareOkBody(), // first producer resubscribe
+                new ExchangeDeclareOkBody(), // second producer resubscribe
+                new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("1")),  // first consumer resubscribe
+                new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2"))); // second consumer resubscribe
+        _connection.setConnectionListener(listener);
+
+        session.resubscribe();
+
+        assertTrue("Not all expected commands have been sent on session resubscribe", listener.responsesEmpty());
+
+        assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get());
+        assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
+        assertTrue("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
+        assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty());
+        assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
+
+        // verify dispatcher queue is drained
+        for (UnprocessedMessage message: messages )
+        {
+            verify(message).dispatch(session);
+        }
+    }
+
+    private UnprocessedMessage createMockMessage(long deliveryTag, int consumerTag)
+    {
+        UnprocessedMessage message = mock(UnprocessedMessage.class);
+        when(message.getConsumerTag()).thenReturn(consumerTag);
+        when(message.getDeliveryTag()).thenReturn(deliveryTag);
+        return message;
+    }
+
+    static class MockReceiveConnectionListener extends ConnectionListenerSupport
+    {
+        private final AMQConnection _connection;
+        private final List<AMQBody> _responses;
+        private final int _channelId;
+
+        MockReceiveConnectionListener(AMQConnection connection, int channelId, AMQBody... response)
+        {
+            _connection = connection;
+            _responses = new ArrayList<>(Arrays.asList(response));
+            _channelId = channelId;
+        }
+
+        @Override
+        public void bytesSent(long count)
+        {
+            try
+            {
+                AMQBody response = _responses.remove(0);
+                _connection.getProtocolHandler().methodBodyReceived(_channelId, response);
+            }
+            catch (QpidException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public boolean responsesEmpty()
+        {
+            return _responses.isEmpty();
+        }
+    }
 }

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java Fri Jul 31 08:34:36 2015
@@ -27,7 +27,15 @@ import org.apache.qpid.client.AMQSession
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.server.management.plugin.HttpManagement;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Plugin;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.systest.rest.RestTestHelper;
 import org.apache.qpid.test.utils.FailoverBaseCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.test.utils.TestUtils;
 import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +56,7 @@ import javax.jms.TextMessage;
 import javax.jms.TransactionRolledBackException;
 import javax.naming.NamingException;
 
+import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -1024,6 +1033,184 @@ public class FailoverBehaviourTest exten
         }
     }
 
+    public void testFailoverWhenConnectionStopped() throws Exception
+    {
+        // not needed
+        _connection.close();
+
+        // not needed
+        stopBroker(getFailingPort());
+
+        // stop broker and add http management
+        stopBroker();
+        configureHttpManagement();
+        startBroker();
+
+        _connection  = createConnectionWithFailover();
+        init(Session.SESSION_TRANSACTED, true);
+
+        // populate broker with initial messages
+        final int testMessageNumber = 10;
+        produceMessages(TEST_MESSAGE_FORMAT, testMessageNumber, false);
+        _producerSession.commit();
+
+        final CountDownLatch stopFlag = new CountDownLatch(1);
+        final CountDownLatch consumerBlocker = new CountDownLatch(1);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch messageCounter = new CountDownLatch(testMessageNumber);
+        _consumer.setMessageListener(new MessageListener()
+        {
+            @Override
+            public void onMessage(Message message)
+            {
+                if (consumerBlocker.getCount() == 1)
+                {
+                    try
+                    {
+                        consumerBlocker.await();
+
+                        _LOGGER.debug("Stopping connection from dispatcher thread");
+                        _connection.stop();
+                        _LOGGER.debug("Connection stopped from dispatcher thread");
+                        stopFlag.countDown();
+                    }
+                    catch (Exception e)
+                    {
+                        exception.set(e);
+                    }
+                }
+
+                try
+                {
+                    _consumerSession.commit();
+                    messageCounter.countDown();
+                }
+                catch (Exception e)
+                {
+                    exception.set(e);
+                }
+            }
+        });
+
+        int unacknowledgedMessageNumber = getUnacknowledgedMessageNumber(testMessageNumber);
+
+        assertEquals("Unexpected number of unacknowledged messages", testMessageNumber, unacknowledgedMessageNumber);
+
+        // stop blocking dispatcher thread
+        consumerBlocker.countDown();
+
+        boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
+        _LOGGER.debug("Thread dump:" + TestUtils.dumpThreads());
+        assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
+                stopResult);
+        assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
+        closeConnectionViaManagement();
+
+        // wait for failover to complete
+        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+        assertFailoverException();
+
+        // publish more messages when connection stopped
+        produceMessages(TEST_MESSAGE_FORMAT, 2, false);
+        _producerSession.commit();
+
+        _connection.start();
+
+        assertTrue("Not all messages were delivered. Remaining message number " + messageCounter.getCount(), messageCounter.await(11000, TimeUnit.MILLISECONDS));
+        _connection.close();
+    }
+
+    private int getUnacknowledgedMessageNumber(int testMessageNumber) throws IOException, InterruptedException
+    {
+        int unacknowledgedMessageNumber = 0;
+        int i =0;
+        do
+        {
+            unacknowledgedMessageNumber = getUnacknowledgedMessageNumber();
+            if (unacknowledgedMessageNumber != testMessageNumber)
+            {
+                Thread.sleep(50);
+            }
+            else
+            {
+                break;
+            }
+        }
+        while (i++ < 20);
+        return unacknowledgedMessageNumber;
+    }
+
+    private void configureHttpManagement()
+    {
+        TestBrokerConfiguration config = getBrokerConfiguration();
+        config.addHttpManagementConfiguration();
+        String initialConfiguration = System.getProperty("virtualhostnode.context.blueprint");
+        if (initialConfiguration != null)
+        {
+            config.setObjectAttribute(VirtualHostNode.class, "test", VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, initialConfiguration);
+        }
+        config.setObjectAttribute(AuthenticationProvider.class, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+                "secureOnlyMechanisms",
+                "{}");
+
+
+        // set password authentication provider on http port for the tests
+        config.setObjectAttribute(Port.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT, Port.AUTHENTICATION_PROVIDER,
+                TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+        config.setObjectAttribute(Plugin.class, TestBrokerConfiguration.ENTRY_NAME_HTTP_MANAGEMENT, HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, true);
+        config.setSaved(false);
+    }
+
+    private void closeConnectionViaManagement() throws IOException
+    {
+        RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0)));
+        try
+        {
+            restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
+            List<Map<String, Object>> connections = restTestHelper.getJsonAsList("virtualhost/test/test/getConnections");
+            assertEquals("Unexpected number of connections", 1, connections.size());
+            Map<String, Object> connection = connections.get(0);
+            String connectionName = (String) connection.get(org.apache.qpid.server.model.Connection.NAME);
+            restTestHelper.submitRequest("connection/" + TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT + "/" + restTestHelper.encodeAsUTF( connectionName ), "DELETE", 200);
+        }
+        finally
+        {
+            restTestHelper.tearDown();
+        }
+    }
+
+    private int getUnacknowledgedMessageNumber() throws IOException
+    {
+        RestTestHelper restTestHelper = new RestTestHelper(getHttpManagementPort(getPort(0)));
+        try
+        {
+            restTestHelper.setUsernameAndPassword("webadmin", "webadmin");
+            List<Map<String, Object>> sessions = restTestHelper.getJsonAsList("session");
+            for(Map<String, Object> session: sessions )
+            {
+                List<Map<String, Object>> consumers =  (List<Map<String, Object>>)session.get("consumers");
+                if (consumers != null)
+                {
+                    Map<String, Object> consumer = consumers.get(0);
+                    Map<String, Object> stat = (Map<String, Object>)consumer.get("statistics");
+                    if (stat != null)
+                    {
+                        Number unacknowledgedMessages = (Number)stat.get("unacknowledgedMessages");
+                        if (unacknowledgedMessages != null)
+                        {
+                            return unacknowledgedMessages.intValue();
+                        }
+                    }
+                }
+            }
+            return 0;
+        }
+        finally
+        {
+            restTestHelper.tearDown();
+        }
+    }
+
     private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
     {
         final Map<String, Object> arguments = new HashMap<String, Object>();

Modified: qpid/java/trunk/test-profiles/cpp.excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/cpp.excludes?rev=1693542&r1=1693541&r2=1693542&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/cpp.excludes (original)
+++ qpid/java/trunk/test-profiles/cpp.excludes Fri Jul 31 08:34:36 2015
@@ -29,3 +29,6 @@ org.apache.qpid.test.client.message.JMSD
 
 //BDB System Tests
 org.apache.qpid.server.store.berkeleydb.*
+
+// test relies on Java Broker REST interfaces
+org.apache.qpid.client.failover.FailoverBehaviourTest.testFailoverWhenConnectionStopped



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org