You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/21 10:11:13 UTC
[2/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP
0-x client specific test failover to client suite
QPID-6933: [System Tests] Move AMQP 0-x client specific test failover to client suite
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/588c65f7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/588c65f7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/588c65f7
Branch: refs/heads/master
Commit: 588c65f77406318c1884cf0aed37bf74f1f495ae
Parents: 25f11ba
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 21 08:48:15 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 21 08:48:15 2018 +0000
----------------------------------------------------------------------
.../AddressBasedFailoverBehaviourTest.java | 34 -
.../client/failover/FailoverBehaviourTest.java | 1630 ------------------
.../failover/MultipleBrokersFailoverTest.java | 272 ---
.../server/failover/FailoverMethodTest.java | 271 ---
.../qpid/test/client/failover/FailoverTest.java | 325 ----
.../qpid/test/utils/FailoverBaseCase.java | 163 --
test-profiles/CPPExcludes | 10 -
test-profiles/ExternalBrokerTests | 9 -
test-profiles/Java010Excludes | 14 -
test-profiles/Java10Excludes | 13 -
test-profiles/JavaPre010Excludes | 3 -
test-profiles/JavaTransientExcludes | 4 -
12 files changed, 2748 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
deleted file mode 100644
index 99fcbc5..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
-{
- @Override
- protected Destination createDestination(Session session) throws JMSException
- {
- return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
deleted file mode 100644
index dd04d6d..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ /dev/null
@@ -1,1630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.client.failover;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TransactionRolledBackException;
-import javax.naming.NamingException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BrokerDetails;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * Test suite to test all possible failover corner cases
- */
-public class FailoverBehaviourTest extends FailoverBaseCase implements ExceptionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);
-
- private static final String TEST_MESSAGE_FORMAT = "test message {0}";
-
- /** Indicates whether tests are run against clustered broker */
- private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
-
- /** Default number of messages to send before failover */
- private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
-
- /** Actual number of messages to send before failover */
- protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
-
- /** Test connection */
- protected Connection _connection;
-
- /**
- * Consumer session
- */
- private Session _consumerSession;
-
- /**
- * Test destination
- */
- private Destination _destination;
-
- /**
- * Consumer
- */
- private MessageConsumer _consumer;
-
- /**
- * Producer session
- */
- private Session _producerSession;
-
- /**
- * Producer
- */
- private MessageProducer _producer;
-
- /**
- * Holds exception sent into {@link ExceptionListener} on failover
- */
- private JMSException _exceptionListenerException;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- _connection = getConnection();
- _connection.setExceptionListener(this);
- ((AMQConnection) _connection).setConnectionListener(this);
- }
-
- /**
- * Test whether MessageProducer can successfully publish messages after
- * failover and rollback transaction
- */
- public void testMessageProducingAndRollbackAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- causeFailure();
-
- assertFailoverException();
- // producer should be able to send messages after failover
- _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
- // rollback after failover
- _producerSession.rollback();
-
- // tests whether sending and committing is working after failover
- produceMessages();
- _producerSession.commit();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test whether {@link TransactionRolledBackException} is thrown on commit
- * of dirty transacted session after failover.
- * <p>
- * Verifies that the second commit, issued after failover, is successful.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- causeFailure();
-
- assertFailoverException();
-
- // producer should be able to send messages after failover
- _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
- try
- {
- _producerSession.commit();
- fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
- }
- catch (JMSException t)
- {
- assertTrue("Expected TransactionRolledBackException but thrown " + t,
- t instanceof TransactionRolledBackException);
- }
-
- // simulate process of user replaying the transaction
- produceMessages("replayed test message {0}", _messageNumber, false);
-
- // no exception should be thrown
- _producerSession.commit();
-
- // only messages sent after rollback should be received
- consumeMessages("replayed test message {0}", _messageNumber);
-
- // no exception should be thrown
- _consumerSession.commit();
- }
-
- /**
- * Tests JMSException is not thrown on commit with a clean session after
- * failover
- */
- public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- causeFailure();
-
- assertFailoverException();
-
- // should not throw an exception for a clean session
- _producerSession.commit();
-
- // tests whether sending and committing is working after failover
- produceMessages();
- _producerSession.commit();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Tests {@link TransactionRolledBackException} is thrown on commit of dirty
- * transacted session after failover.
- * <p>
- * Verifies that the second commit, issued after failover, is successful.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- _producerSession.commit();
-
- // receive messages but do not commit
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- try
- {
- // should throw TransactionRolledBackException
- _consumerSession.commit();
- fail("TransactionRolledBackException is expected on commit after failover");
- }
- catch (Exception t)
- {
- assertTrue("Expected TransactionRolledBackException but thrown " + t,
- t instanceof TransactionRolledBackException);
- }
-
- resendMessagesIfNecessary();
-
- // consume messages successfully
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Tests JMSException is not thrown on commit with a clean session after failover
- */
- public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
- _consumerSession.commit();
-
- causeFailure();
-
- assertFailoverException();
-
- // should not throw an exception with a clean consumer session
- _consumerSession.commit();
- }
-
- /**
- * Test that TransactionRolledBackException is thrown on commit of
- * dirty session in asynchronous consumer after failover.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
- throws Exception
- {
- init(Session.SESSION_TRANSACTED, false);
- FailoverTestMessageListener ml = new FailoverTestMessageListener();
- _consumer.setMessageListener(ml);
-
- _connection.start();
-
- produceMessages();
- _producerSession.commit();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- int counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- }
- ml.reset();
-
- causeFailure();
- assertFailoverException();
-
-
- try
- {
- _consumerSession.commit();
- fail("TransactionRolledBackException should be thrown!");
- }
- catch (TransactionRolledBackException e)
- {
- // that is what is expected
- }
-
- resendMessagesIfNecessary();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- }
-
- // commit again. It should be successful
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#rollback()} does not throw exception after failover
- * and that we are able to consume messages.
- */
- public void testRollbackAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.rollback();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#rollback()} does not throw exception after receiving further messages
- * after failover, and we can receive published messages after rollback.
- */
- public void testRollbackAfterReceivingAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- resendMessagesIfNecessary();
-
- consumeMessages();
-
- _consumerSession.rollback();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#recover()} does not throw an exception after failover
- * and that we can consume messages after recover.
- */
- public void testRecoverAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.recover();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and acknowledgment is working after recover
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that receiving more messages after failover and then calling
- * {@link Session#recover()} does not throw an exception
- * and that we can consume messages after recover.
- */
- public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- // publishing should work after failover
- resendMessagesIfNecessary();
-
- // consume messages again on a dirty session
- consumeMessages();
-
- // recover should successfully restore session
- _consumerSession.recover();
-
- // tests whether receiving and acknowledgment is working after recover
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that first call to {@link Message#acknowledge()} after failover
- * throws a JMSException if the session is dirty.
- */
- public void testAcknowledgeAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- Message lastMessage = consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- try
- {
- // an implicit recover is performed when acknowledge throws an exception due to failover
- lastMessage.acknowledge();
- fail("JMSException should be thrown");
- }
- catch (JMSException t)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and acknowledgment is working after recover
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that calling acknowledge before failover leaves the session
- * clean for use after failover.
- */
- public void testAcknowledgeBeforeFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages and acknowledge them
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
-
- causeFailure();
-
- assertFailoverException();
-
- produceMessages();
-
- // tests whether receiving and acknowledgment is working after recover
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that receiving of messages after failover prior to calling
- * {@link Message#acknowledge()} still results in acknowledge throwing an exception.
- */
- public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- resendMessagesIfNecessary();
-
- // consume again on dirty session
- Message lastMessage = consumeMessages();
- try
- {
- // an implicit recover is performed when acknowledge throws an exception due to failover
- lastMessage.acknowledge();
- fail("JMSException should be thrown");
- }
- catch (JMSException t)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- // tests whether receiving and acknowledgment is working on a clean session
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer
- * and we can consume messages after acknowledge.
- */
- public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, false);
- FailoverTestMessageListener ml = new FailoverTestMessageListener();
- _consumer.setMessageListener(ml);
- _connection.start();
-
- produceMessages();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- int counter = 0;
- Message currentMessage = null;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- currentMessage = message;
- }
- ml.reset();
-
- causeFailure();
- assertFailoverException();
-
-
- try
- {
- currentMessage.acknowledge();
- fail("JMSException should be thrown!");
- }
- catch (JMSException e)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- resendMessagesIfNecessary();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- currentMessage = message;
- }
-
- // acknowledge again. It should be successful
- currentMessage.acknowledge();
- }
-
- /**
- * Test whether {@link Session#recover()} works as expected after failover
- * in AUTO_ACKNOWLEDGE mode.
- */
- public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
- {
- init(Session.AUTO_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // receive first message in order to start a dispatcher thread
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.recover();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving is working after recover
- consumeMessages();
- }
-
- public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishAutoAcknowledgedWhileFailover() throws Exception
- {
- publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishClientAcknowledgedWhileFailover() throws Exception
- {
- Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
- receivedMessage.acknowledge();
- }
-
- public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
- {
- publishWhileFailingOver(Session.SESSION_TRANSACTED);
- _consumerSession.commit();
- }
-
- public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
-
- }
-
- public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.SESSION_TRANSACTED);
- }
-
- public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
- {
- doFailoverWhilstPublishingInFlight(true);
- }
-
- public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
- {
- doFailoverWhilstPublishingInFlight(false);
- }
-
- private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws Exception
- {
- init(Session.SESSION_TRANSACTED, false);
-
- final int numberOfMessages = 200;
-
- final CountDownLatch halfWay = new CountDownLatch(1);
- final CountDownLatch allDone = new CountDownLatch(1);
- final AtomicReference<Exception> exception = new AtomicReference<>();
-
- Runnable producerRunnable = new Runnable()
- {
- @Override
- public void run()
- {
- Thread.currentThread().setName("ProducingThread");
-
- try
- {
- for(int i=0; i< numberOfMessages; i++)
- {
- boolean success = false;
- while(!success)
- {
- try
- {
- Message message = _producerSession.createMessage();
- message.setIntProperty("msgNum", i);
- _producer.send(message);
- _producerSession.commit();
- success = true;
- }
- catch (javax.jms.IllegalStateException e)
- {
- // fail - failover should not leave a JMS object in an illegal state
- throw e;
- }
- catch (JMSException e)
- {
- // OK we will be failing over
- LOGGER.debug("Got JMS exception, probably just failing over", e);
- }
- }
-
- if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
- {
- halfWay.countDown();
- }
- }
-
- allDone.countDown();
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- }
- };
-
- Thread producerThread = new Thread(producerRunnable);
- producerThread.start();
-
- assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
-
- if (hardKill)
- {
- LOGGER.debug("Killing the Broker");
- killDefaultBroker();
- }
- else
- {
- LOGGER.debug("Stopping the Broker");
- stopDefaultBroker();
- }
-
- if (exception.get() != null)
- {
- LOGGER.error("Unexpected exception from producer thread", exception.get());
- }
- assertNull("Producer thread should not have got an exception", exception.get());
-
- assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
-
- producerThread.join(30000);
-
- // Extra work to prove the session still okay
- assertNotNull(_producerSession.createTemporaryQueue());
- }
-
-
- private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
- {
- setDelayedFailoverPolicy(5);
- init(autoAcknowledge, true);
-
- String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
- Message message = _producerSession.createTextMessage(text);
-
- failDefaultBroker();
-
- if(!_failoverStarted.await(5, TimeUnit.SECONDS))
- {
- fail("Did not receieve notification failover had started");
- }
-
- _producer.send(message);
-
- if (_producerSession.getTransacted())
- {
- _producerSession.commit();
- }
-
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
- return receivedMessage;
- }
-
- private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
- {
- setDelayedFailoverPolicy(5);
- init(autoAcknowledge, true);
-
- String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
- Message message = _producerSession.createTextMessage(text);
-
- AMQConnection connection = (AMQConnection)_connection;
-
- // holding failover mutex should prevent the failover from
- // proceeding before we try to send the message
- synchronized(connection.getFailoverMutex())
- {
- failDefaultBroker();
-
- // wait to make sure that connection is lost
- while(!connection.isFailingOver())
- {
- Thread.sleep(25l);
- }
-
- try
- {
- _producer.send(message);
- fail("Sending should fail because connection was lost and failover has not yet completed");
- }
- catch(JMSException e)
- {
- // JMSException is expected
- }
- }
- // wait for failover completion, thus ensuring it actually
- // got started, before allowing the test to tear down
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- }
-
- /**
- * This test only tests 0-8/0-9/0-9-1 failover timeout
- */
- public void testFailoverHandlerTimeoutExpires() throws Exception
- {
- _connection.close();
- setTestSystemProperty("qpid.failover_method_timeout", "10000");
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover();
-
- // holding failover mutex should prevent the failover from proceeding
- synchronized(connection.getFailoverMutex())
- {
- killDefaultBroker();
- startDefaultBroker();
-
- // sleep interval exceeds failover timeout interval
- Thread.sleep(11000l);
- }
-
- // allows the failover thread to proceed
- Thread.yield();
- assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
- assertTrue("Failover should not succeed due to timeout", connection.isClosed());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- public void testFailoverHandlerTimeoutReconnected() throws Exception
- {
- _connection.close();
- setTestSystemProperty("qpid.failover_method_timeout", "10000");
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover();
-
- // holding failover mutex should prevent the failover from proceeding
- synchronized(connection.getFailoverMutex())
- {
- killDefaultBroker();
- startDefaultBroker();
- }
-
- // allows the failover thread to proceed
- Thread.yield();
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- assertFalse("Failover should restore connectivity", connection.isClosed());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- /**
- * Tests that the producer flow control flag is reset when failover occurs while
- * the producers are being blocked by the broker.
- *
- * Uses Apache Qpid Broker-J specific queue configuration to enable PSFC.
- */
- public void testFlowControlFlagResetOnFailover() throws Exception
- {
- // we do not need the connection failing to second broker
- _connection.close();
-
- // make sure that failover timeout is bigger than flow control timeout
- setTestSystemProperty("qpid.failover_method_timeout", "60000");
- setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
-
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));
-
- final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
- final AtomicInteger counter = new AtomicInteger();
- // try to send 5 messages (should block after 4)
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- MessageProducer producer = producerSession.createProducer(queue);
- for (int i=0; i < 5; i++)
- {
- Message next = createNextMessage(producerSession, i);
- producer.send(next);
- producerSession.commit();
- counter.incrementAndGet();
- }
- }
- catch(Exception e)
- {
- // ignore
- }
- }
- }).start();
-
- long limit= 30000l;
- long start = System.currentTimeMillis();
-
- // wait until session is blocked
- while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
- {
- Thread.sleep(100l);
- }
-
- assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
-
- final int currentCounter = counter.get();
- assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3);
-
- killDefaultBroker();
- startDefaultBroker();
-
- // allows the failover thread to proceed
- Thread.yield();
- awaitForFailoverCompletion(60000l);
-
- assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- /**
- * Checks that delivery to a message listener resumes after failover when the
- * connection was stopped from inside the listener just before the broker failed.
- * <p>
- * The first delivery stops the connection and fails the default broker; every later
- * delivery commits the consumer session and is counted.
- *
- * @throws Exception on any unexpected failure
- */
- public void testFailoverWhenConnectionStopped() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- // released once the listener has attempted to stop the connection
- final CountDownLatch stopFlag = new CountDownLatch(1);
- // holds the most recent exception raised inside the listener, if any
- final AtomicReference<Exception> exception = new AtomicReference<>();
- final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
- final AtomicInteger counter = new AtomicInteger();
-
- _consumer.setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(Message message)
- {
- // first delivery: stop the connection, then bring the broker down
- if (stopFlag.getCount() == 1)
- {
- try
- {
- LOGGER.debug("Stopping connection from dispatcher thread");
- _connection.stop();
- LOGGER.debug("Connection stopped from dispatcher thread");
-
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- finally
- {
- // the broker is failed even if stopping the connection threw
- stopFlag.countDown();
-
- failDefaultBroker();
- }
-
- }
- else
- {
- // later deliveries: commit each message and record it
- try
- {
- _consumerSession.commit();
- counter.incrementAndGet();
- expectedMessageLatch.countDown();
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- }
- }
- });
-
-
- boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
- assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
- stopResult);
- assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
-
- // wait for failover to complete
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- assertFailoverException();
-
- resendMessagesIfNecessary();
- _producerSession.commit();
-
- // delivery to the listener can only resume once the connection is started again
- _connection.start();
-
- assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
-
- // short pause so that any unexpected extra deliveries would show up in the counter
- Thread.sleep(500l);
- assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
-
- _connection.close();
- }
-
- /**
- * Closes a connection while failover is in flight and checks that the failover
- * policy still reports failover as allowed afterwards.
- *
- * @throws Exception on any unexpected failure
- */
- public void testConnectionCloseInterruptsFailover() throws Exception
- {
- // the shared connection is not used by this test; a dedicated one is created below
- _connection.close();
-
- // NOTE(review): failoverCompleted is set by the listener but never asserted in this test
- final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
- final CountDownLatch failoverBegun = new CountDownLatch(1);
-
- AMQConnection connection = createConnectionWithFailover();
- // replaces the listener installed by createConnectionWithFailover()
- connection.setConnectionListener(new ConnectionListener()
- {
- @Override
- public void bytesSent(final long count)
- {
- }
-
- @Override
- public void bytesReceived(final long count)
- {
- }
-
- @Override
- public boolean preFailover(final boolean redirect)
- {
- failoverBegun.countDown();
- LOGGER.info("Failover started");
- return true;
- }
-
- @Override
- public boolean preResubscribe()
- {
- return true;
- }
-
- @Override
- public void failoverComplete()
- {
- failoverCompleted.set(true);
- }
- });
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- assertNotNull("Session should be created", session);
- // the broker is not restarted in this test
- killDefaultBroker();
-
- boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS);
- assertTrue("Failover did not begin with a reasonable time", failingOver);
-
- // Failover will now be in flight
- connection.close();
- assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed());
- }
-
- /**
- * Creates a queue with the given flow-control arguments, then declares and binds
- * the destination described by a {@code direct://amq.direct} binding URL for it.
- *
- * @param session session used to create the queue; must be an {@link AMQSession}
- * @param queueName name of the queue, also used as the last segment of the binding URL
- * @param capacity value of the {@code x-qpid-capacity} queue argument
- * @param resumeCapacity value of the {@code x-qpid-flow-resume-capacity} queue argument
- * @return the bound queue destination
- * @throws Exception if the queue cannot be created or bound
- */
- private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
- {
- final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) session;
-
- final Map<String, Object> queueArguments = new HashMap<String, Object>();
- queueArguments.put("x-qpid-capacity", capacity);
- queueArguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
- amqSession.createQueue(queueName, false, true, false, queueArguments);
-
- final String bindingUrl = "direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
- + "'&autodelete='" + false + "'";
- final Queue boundQueue = session.createQueue(bindingUrl);
- amqSession.declareAndBind((AMQDestination) boundQueue);
- return boundQueue;
- }
-
- private AMQConnection createConnectionWithFailover() throws NamingException, JMSException, URLSyntaxException
- {
- return createConnectionWithFailover(null);
- }
-
- /**
- * Creates a connection to the default broker on localhost using the
- * {@code singlebroker} failover method (200 retries, 1000 connectdelay, cyclecount 2)
- * and registers this test as its connection listener.
- *
- * @param connectionOptions extra options appended to the connection URL as
- * {@code &key='value'}; may be {@code null}
- * @return the new connection, created with the admin/admin credentials
- */
- private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
- {
- String retries = "200";
- String connectdelay = "1000";
- String cycleCount = "2";
-
- String newUrlFormat="amqp://username:password@clientid/test?brokerlist=" +
- "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
-
- // a builder avoids re-copying the URL for every appended option
- StringBuilder newUrl = new StringBuilder(String.format(newUrlFormat, "localhost", getDefaultAmqpPort(),
- retries, connectdelay, cycleCount));
-
- if (connectionOptions != null)
- {
- for (Map.Entry<String,String> option: connectionOptions.entrySet())
- {
- newUrl.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
- }
- }
- ConnectionFactory connectionFactory = new AMQConnectionFactory(newUrl.toString());
- AMQConnection connection = (AMQConnection) connectionFactory.createConnection("admin", "admin");
- connection.setConnectionListener(this);
- return connection;
- }
-
- /**
- * Tests {@link Session#close()} for session with given acknowledge mode
- * to ensure that close works after failover.
- * <p>
- * One message is received but deliberately left uncommitted/unacknowledged
- * before the failure is caused.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws JMSException if any JMS operation, including the final close, fails
- */
- private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
- {
- init(acknowledgeMode, true);
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
-
- // intentionally receive message but do not commit or acknowledge it in
- // case of transacted or CLIENT_ACK session
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- // fails the broker and waits for failover to complete
- causeFailure();
-
- assertFailoverException();
-
- // for transacted/client_ack session
- // no exception should be thrown but transaction should be automatically
- // rolled back
- _consumerSession.close();
- }
-
- /**
- * A helper method to instantiate producer and consumer sessions, producer
- * and consumer on a freshly created destination.
- * <p>
- * The consumer is created before the connection is (optionally) started; the
- * producer session and producer are created afterwards.
- *
- * @param acknowledgeMode
- * acknowledge mode; {@link Session#SESSION_TRANSACTED} makes both sessions transacted
- * @param startConnection
- * indicates whether connection should be started
- * @throws JMSException if a session, consumer or producer cannot be created
- */
- private void init(int acknowledgeMode, boolean startConnection) throws JMSException
- {
- boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED;
-
- _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = createDestination(_consumerSession);
- _consumer = _consumerSession.createConsumer(_destination);
-
- if (startConnection)
- {
- _connection.start();
- }
-
- _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _producer = _producerSession.createProducer(_destination);
- }
-
- protected Destination createDestination(Session session) throws JMSException
- {
- return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
- }
-
- /**
- * Resends messages if reconnected to a non-clustered broker
- *
- * @throws JMSException
- */
- private void resendMessagesIfNecessary() throws JMSException
- {
- if (!CLUSTERED)
- {
- // assert that a new broker does not have messages on a queue
- if (_consumer.getMessageListener() == null)
- {
- Message message = _consumer.receive(100l);
- assertNull("Received a message after failover with non-clustered broker!", message);
- }
- // re-sending messages if reconnected to a non-clustered broker
- produceMessages(true);
- }
- }
-
- /**
- * Publishes the default number of messages with the default text pattern to the
- * test destination, using the shared producer.
- *
- * @throws JMSException if sending fails
- */
- private void produceMessages() throws JMSException
- {
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- }
-
- /**
- * Produces a default number of messages with default text content into test
- * queue.
- *
- * @param seperateProducer whether to send through a newly created producer
- * instead of the shared one
- * @throws JMSException if sending fails
- */
- private void produceMessages(boolean seperateProducer) throws JMSException
- {
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer);
- }
-
- /**
- * Consumes a default number of messages and asserts their content.
- *
- * @return last consumed message
- * @throws JMSException
- */
- private Message consumeMessages() throws JMSException
- {
- return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
- }
-
- /**
- * Produces given number of text messages with content matching given
- * content pattern
- *
- * @param messagePattern message content pattern
- * @param messageNumber number of messages to send
- * @param standaloneProducer whether to use the existing producer or a new one.
- * @throws JMSException
- */
- private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
- {
- Session producerSession;
- MessageProducer producer;
-
- if(standaloneProducer)
- {
- producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
- producer = producerSession.createProducer(_destination);
- }
- else
- {
- producerSession = _producerSession;
- producer = _producer;
- }
-
- for (int i = 0; i < messageNumber; i++)
- {
- String text = MessageFormat.format(messagePattern, i);
- Message message = producerSession.createTextMessage(text);
- producer.send(message);
- LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID());
- }
-
- if(standaloneProducer)
- {
- producerSession.commit();
- }
- }
-
- /**
- * Receives the given number of messages one by one, waiting for each, and
- * verifies that message {@code i} carries the text produced by the pattern for
- * index {@code i}.
- *
- * @param messagePattern
- * messages content pattern
- * @param messageNumber
- * number of messages to receive
- * @return last consumed message, or {@code null} if {@code messageNumber} is not positive
- * @throws JMSException if receiving fails
- */
- private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
- {
- Message lastReceived = null;
- int index = 0;
- while (index < messageNumber)
- {
- lastReceived = _consumer.receive(1000l);
- assertReceivedMessage(lastReceived, messagePattern, index);
- index++;
- }
- return lastReceived;
- }
-
- /**
- * Asserts that the received message is a non-null {@link TextMessage} whose text
- * equals the pattern formatted with the given index.
- *
- * @param receivedMessage
- * received message
- * @param messagePattern
- * messages content pattern
- * @param messageIndex
- * message index
- * @throws JMSException if the JMS message id cannot be read for logging
- */
- private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
- {
- assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
- assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
- + receivedMessage, receivedMessage instanceof TextMessage);
- String expectedText = MessageFormat.format(messagePattern, messageIndex);
- String receivedText = null;
- try
- {
- receivedText = ((TextMessage) receivedMessage).getText();
- }
- catch (JMSException e)
- {
- // reported as a test failure rather than an error
- fail("JMSException occured while getting message text:" + e.getMessage());
- }
- LOGGER.debug("Test message number {} consumed with text = {}, and JMSMessageID = {}", messageIndex, receivedText, receivedMessage.getJMSMessageID());
- assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]",
- expectedText, receivedText);
- }
-
- /**
- * Causes failover and waits till connection is re-established.
- */
- private void causeFailure()
- {
- causeFailure(DEFAULT_FAILOVER_TIME * 2);
- }
-
- /**
- * Causes failover by failing the default broker and waits until failover
- * completion is signalled within the given time interval.
- *
- * @param delay
- * time interval, in milliseconds, to wait for connection re-establishment
- */
- private void causeFailure(long delay)
- {
- failDefaultBroker();
-
- awaitForFailoverCompletion(delay);
- }
-
- /**
- * Blocks until failover completion is signalled and fails the test if that does
- * not happen in time or the waiting thread is interrupted.
- *
- * @param delay maximum time to wait, in milliseconds
- */
- private void awaitForFailoverCompletion(long delay)
- {
- LOGGER.info("Awaiting {} ms for failover completion..", delay);
- try
- {
- if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not complete");
- }
- }
- catch (InterruptedException e)
- {
- // restore the interrupt status so code further up the stack can still see it
- Thread.currentThread().interrupt();
- fail("Test was interrupted:" + e.getMessage());
- }
- }
-
- /**
- * Placeholder for asserting the exception reported on failover; currently performs
- * no checks.
- */
- private void assertFailoverException()
- {
- // TODO: assert exception is received (once implemented)
- // along with error code and/or expected exception type
- }
-
-
- /**
- * Stores the reported exception in {@code _exceptionListenerException}, replacing
- * any previously stored one.
- *
- * @param e the exception passed to this listener
- */
- @Override
- public void onException(JMSException e)
- {
- _exceptionListenerException = e;
- }
-
- /**
- * Failover policy that pauses for a configurable number of seconds before
- * reporting the connection as attained, in order to test whether JMS methods
- * block while failover is in progress.
- */
- private static class DelayingFailoverPolicy extends FailoverPolicy
- {
-
- // never counted down within this class, so awaiting it simply waits out the delay
- private final CountDownLatch _suspendLatch;
- // delay in seconds
- private final long _delay;
-
- /**
- * @param connection connection whose URL the policy is built from
- * @param delay time, in seconds, to pause before the connection is reported as attained
- */
- public DelayingFailoverPolicy(AMQConnection connection, long delay)
- {
- super(connection.getConnectionURL(), connection);
- _suspendLatch = new CountDownLatch(1);
- _delay = delay;
- }
-
- @Override
- public void attainedConnection()
- {
- try
- {
- _suspendLatch.await(_delay, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- // deliberately ignored: an interrupt only shortens the artificial delay
- }
- super.attainedConnection();
- }
-
- }
-
-
- /**
- * Message listener that records every delivered message and releases a latch
- * each time the number of deliveries reaches a multiple of {@code _messageNumber}.
- */
- private class FailoverTestMessageListener implements MessageListener
- {
- // message counter
- private final AtomicInteger _counter = new AtomicInteger();
-
- private final List<Message> _receivedMessage = new ArrayList<>();
-
- // replaced by reset(), hence volatile rather than final
- private volatile CountDownLatch _endLatch;
-
- public FailoverTestMessageListener() throws JMSException
- {
- _endLatch = new CountDownLatch(1);
- }
-
- @Override
- public void onMessage(Message message)
- {
- _receivedMessage.add(message);
- if (_counter.incrementAndGet() % _messageNumber == 0)
- {
- _endLatch.countDown();
- }
- }
-
- /**
- * Discards the recorded messages, installs a fresh latch and zeroes the counter.
- */
- public void reset()
- {
- _receivedMessage.clear();
- _endLatch = new CountDownLatch(1);
- _counter.set(0);
- }
-
- /**
- * @return the live list of recorded messages (not a copy)
- */
- public List<Message> getReceivedMessages()
- {
- return _receivedMessage;
- }
-
- /**
- * Waits up to {@code _messageNumber} seconds for the end latch.
- *
- * @return a {@link Boolean}: {@code true} if the latch was released in time
- * @throws InterruptedException if interrupted while waiting
- */
- public Object awaitForEnd() throws InterruptedException
- {
- return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS);
- }
-
- public int getMessageCounter()
- {
- return _counter.get();
- }
- }
-
- /**
- * Tests {@link Session#close()} for session with given acknowledge mode
- * to ensure that it blocks until failover implementation restores connection.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws Exception on JMS failure or if interrupted while waiting for failover
- */
- private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
- {
- initDelayedFailover(acknowledgeMode);
-
- // intentionally receive message but not commit or acknowledge it in
- // case of transacted or CLIENT_ACK session
- Message receivedMessage = _consumer.receive(1000L);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- failDefaultBroker();
-
- // wait until failover is started; if it never starts, the close below would
- // not be exercised during failover, so fail here with a precise message
- assertTrue("Failover did not start within 5 seconds", _failoverStarted.await(5, TimeUnit.SECONDS));
-
- // test whether session#close blocks while failover is in progress
- _consumerSession.close();
-
- // a zero-wait success here shows close() returned only after failover completed
- assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
- assertFailoverException();
- }
-
- /**
- * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
- * <p>
- * The consumer created by {@code init} is closed before the connection is started,
- * and the published messages are committed (transacted session) or synced
- * (otherwise) before the browser is created.
- *
- * @param acknowledgeMode session acknowledge mode
- * @return queue browser
- * @throws JMSException if a JMS operation fails
- * @throws QpidException declared for the session sync performed on non-transacted sessions
- */
- private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
- {
- init(acknowledgeMode, false);
- _consumer.close();
- _connection.start();
-
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
- else
- {
- ((AMQSession<?, ?>) _producerSession).sync();
- }
-
- return _consumerSession.createBrowser((Queue) _destination);
- }
-
- /**
- * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
- * to ensure that it blocks until failover implementation restores connection.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws Exception on JMS failure or if interrupted while waiting for failover
- */
- private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
- {
- QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
-
- @SuppressWarnings("unchecked")
- Enumeration<Message> messages = browser.getEnumeration();
- Message receivedMessage = messages.nextElement();
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- failDefaultBroker();
-
- // wait until failover is started; if it never starts, the close below would
- // not be exercised during failover, so fail here with a precise message
- assertTrue("Failover did not start within 5 seconds", _failoverStarted.await(5, TimeUnit.SECONDS));
-
- browser.close();
-
- assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
- assertFailoverException();
- }
-
- private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
- {
- DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
- init(acknowledgeMode, true);
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
- return failoverPolicy;
- }
-
- private DelayingFailoverPolicy setDelayedFailoverPolicy()
- {
- return setDelayedFailoverPolicy(2);
- }
-
- private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
- {
- AMQConnection amqConnection = (AMQConnection) _connection;
- DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
- ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
- return failoverPolicy;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
deleted file mode 100644
index f78b9d2..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.test.utils.BrokerHolder;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/*
- * We need to create 4 brokers:
- * 1st broker will be running in test JVM and will not have a failover host (only the tcp connection will be established, the amqp connection will be closed)
- * 2nd broker will be spawned in a separate JVM and should have a failover host (amqp connection should be established)
- * 3rd broker will be spawned in a separate JVM and should not have a failover host (only the tcp connection will be established, the amqp connection will be closed)
- * 4th broker will be spawned in a separate JVM and should have a failover host (amqp connection should be established)
- */
-/**
- * Starts {@code NUMBER_OF_BROKERS} brokers, of which only the second and the last
- * host the {@code failover} virtual host, and checks that a round-robin connection
- * fails over from the second broker to the last one when the second broker goes away.
- */
-public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MultipleBrokersFailoverTest.class);
-
- private static final String FAILOVER_VIRTUAL_HOST = "failover";
- private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover";
- private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
- private static final int FAILOVER_RETRIES = 0;
- private static final int FAILOVER_CONNECTDELAY = 0;
- private static final long FAILOVER_AWAIT_TIME = FailoverBaseCase.DEFAULT_FAILOVER_TIME;
- private static final int NUMBER_OF_BROKERS = 4;
-
- private BrokerHolder[] _brokerHolders;
- private String _connectionURL;
- private Connection _connection;
- private CountDownLatch _failoverComplete;
- private CountDownLatch _failoverStarted;
- private Session _consumerSession;
- private Destination _destination;
- private MessageConsumer _consumer;
- private Session _producerSession;
- private MessageProducer _producer;
-
- @Override
- public void startDefaultBroker()
- {
- // do not start the default broker for this test
- }
-
- /**
- * Starts all brokers, builds the round-robin connection URL over them and opens
- * the connection with this test registered as its connection listener.
- */
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _brokerHolders = new BrokerHolder[NUMBER_OF_BROKERS];
-
- // the test should connect to the second broker first and fail over to the fourth broker
- // after an unsuccessful attempt to establish the connection to the third broker
- for (int i = 0; i < NUMBER_OF_BROKERS; i++)
- {
- final boolean hostsFailoverVirtualHost = i == 1 || i == NUMBER_OF_BROKERS - 1;
- final String host = hostsFailoverVirtualHost ? FAILOVER_VIRTUAL_HOST : NON_FAILOVER_VIRTUAL_HOST;
-
- BrokerHolder brokerHolder = createSpawnedBroker();
- createTestVirtualHostNode(brokerHolder, host, true);
- brokerHolder.start();
-
- _brokerHolders[i] = brokerHolder;
- }
-
- _connectionURL = generateUrlString(NUMBER_OF_BROKERS);
-
- _connection = getConnection(_connectionURL);
- ((AMQConnection) _connection).setConnectionListener(this);
- _failoverComplete = new CountDownLatch(1);
- _failoverStarted = new CountDownLatch(1);
- }
-
- /**
- * Builds a connection URL for the failover virtual host with round-robin failover
- * (cyclecount 1) whose broker list contains the first {@code numBrokers} brokers,
- * separated by semicolons.
- *
- * @param numBrokers number of brokers, taken in array order, to list in the URL
- * @return the connection URL
- */
- private String generateUrlString(int numBrokers)
- {
- String baseString = "amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST
- + "?&failover='roundrobin?cyclecount='1''&brokerlist='";
- // only used by this thread, so the unsynchronized builder is sufficient
- StringBuilder buffer = new StringBuilder(baseString);
-
- for(int i = 0; i< numBrokers ; i++)
- {
- if(i != 0)
- {
- buffer.append(";");
- }
-
- String broker = String.format(BROKER_PORTION_FORMAT, _brokerHolders[i].getAmqpPort(),
- FAILOVER_CONNECTDELAY, FAILOVER_RETRIES);
- buffer.append(broker);
- }
- buffer.append("'");
-
- return buffer.toString();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- // setUp may not have populated every slot (or the array at all) if it failed
- if (_brokerHolders != null)
- {
- for (BrokerHolder broker : _brokerHolders)
- {
- if (broker != null)
- {
- stopBrokerSafely(broker);
- }
- }
- }
- }
- }
-
-
- /**
- * Kills the broker the client is connected to and expects the connection to end
- * up on the last broker with send/receive still working.
- */
- public void testFailoverOnBrokerKill() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
- assertSendReceive(0);
-
- _brokerHolders[1].kill();
-
- awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
- assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
- assertSendReceive(2);
- assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
- }
-
- /**
- * Shuts down the broker the client is connected to and expects the connection to
- * end up on the last broker with send/receive still working.
- */
- public void testFailoverOnBrokerStop() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
- assertSendReceive(0);
-
- _brokerHolders[1].shutdown();
-
- awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
- assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
- assertSendReceive(1);
- assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
- }
-
- /**
- * Asserts that the connection's active broker uses the given port.
- *
- * @param brokerPort expected port
- */
- private void assertConnectionPort(int brokerPort)
- {
- int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort();
- assertEquals("Unexpected broker port", brokerPort, connectionPort);
- }
-
- /**
- * Sends one message and receives it back, committing each session that is transacted.
- *
- * @param index index passed to {@code createNextMessage} and used in failure messages
- * @throws JMSException if sending, receiving or committing fails
- */
- private void assertSendReceive(int index) throws JMSException
- {
- Message message = createNextMessage(_producerSession, index);
- _producer.send(message);
- if (_producerSession.getTransacted())
- {
- _producerSession.commit();
- }
- Message receivedMessage = _consumer.receive(1000L);
- assertReceivedMessage(receivedMessage, index);
- if (_consumerSession.getTransacted())
- {
- _consumerSession.commit();
- }
- }
-
- /**
- * Fails the test unless failover completion is signalled within the given time.
- *
- * @param delay maximum time to wait, in milliseconds
- * @throws Exception if interrupted while waiting
- */
- private void awaitForFailoverCompletion(long delay) throws Exception
- {
- LOGGER.info("Awaiting Failover completion..");
- if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not complete within " + delay + "ms.");
- }
- }
-
- /**
- * Asserts that a message was received and that it is a {@link TextMessage}.
- *
- * @param receivedMessage received message
- * @param messageIndex index used in the failure messages
- */
- private void assertReceivedMessage(Message receivedMessage, int messageIndex)
- {
- assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
- assertTrue(
- "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage,
- receivedMessage instanceof TextMessage);
- }
-
- /**
- * Creates the consumer session and consumer on the test queue, optionally starts
- * the connection, then creates the producer session and producer.
- *
- * @param acknowledgeMode acknowledge mode; {@link Session#SESSION_TRANSACTED} makes both sessions transacted
- * @param startConnection whether to start the connection
- * @throws Exception if a JMS object cannot be created
- */
- private void init(int acknowledgeMode, boolean startConnection) throws Exception
- {
- boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED;
-
- _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName());
- _consumer = _consumerSession.createConsumer(_destination);
-
- if (startConnection)
- {
- _connection.start();
- }
-
- _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _producer = _producerSession.createProducer(_destination);
-
- }
-
- @Override
- public void bytesSent(long count)
- {
- }
-
- @Override
- public void bytesReceived(long count)
- {
- }
-
- @Override
- public boolean preFailover(boolean redirect)
- {
- _failoverStarted.countDown();
- return true;
- }
-
- @Override
- public boolean preResubscribe()
- {
- return true;
- }
-
- @Override
- public void failoverComplete()
- {
- _failoverComplete.countDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
deleted file mode 100644
index 1916664..0000000
--- a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.SystemUtils;
-
-public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
- private CountDownLatch _failoverComplete = new CountDownLatch(1);
- private final int _freePortWithNoBroker = findFreePort();
-
- /**
- * Test that the round robin method has the correct delays.
- * The first connection will work but the localhost connection should fail but the duration it takes
- * to report the failure is what is being tested.
- *
- */
- public void testFailoverRoundRobinDelay() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- //note: The first broker has no connect delay and the default 1 retry
- // while the tcp:localhost broker has 3 retries with a 2s connect delay
- String connectionString = "amqp://guest:guest@/test?brokerlist=" +
- "'tcp://localhost:" + getDefaultAmqpPort() +
- ";tcp://localhost:" + _freePortWithNoBroker + "?connectdelay='2000',retries='3''";
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- try
- {
- long start = System.currentTimeMillis();
- AMQConnection connection = new AMQConnection(url);
-
- connection.setExceptionListener(this);
-
- LOGGER.debug("Stopping broker");
- stopDefaultBroker();
- LOGGER.debug("Stopped broker");
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe",
- 0, _failoverComplete.getCount());
-
- long end = System.currentTimeMillis();
-
- long duration = (end - start);
-
- //Failover should take more that 6 seconds.
- // 3 Retries
- // so VM Broker NoDelay 0 (Connect) NoDelay 0
- // then TCP NoDelay 0 Delay 1 Delay 2 Delay 3
- // so 3 delays of 2s in total for connection
- // as this is a tcp connection it will take 1second per connection to fail
- // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
-
- // Ensure we actually had the delay
- assertTrue("Failover took less than 6 seconds", duration > 6000);
-
- // Ensure we don't have delays before initial connection and reconnection.
- // We allow 1 second for initial connection and failover logic on top of 6s of sleep.
- assertTrue("Failover took more than 11 seconds:(" + duration + ")", duration < 11000);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testFailoverSingleDelay() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='2000',retries='3''";
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- try
- {
- long start = System.currentTimeMillis();
- AMQConnection connection = new AMQConnection(url);
-
- connection.setExceptionListener(this);
-
- LOGGER.debug("Stopping broker");
- stopDefaultBroker();
- LOGGER.debug("Stopped broker");
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe",
- 0, _failoverComplete.getCount());
-
- long end = System.currentTimeMillis();
-
- long duration = (end - start);
-
- //Failover should take more that 6 seconds.
- // 3 Retries
- // so NoDelay 0 (Connect) NoDelay 0 Delay 1 Delay 2 Delay 3
- // so 3 delays of 2s in total for connection
- // so max time is 6 seconds of delay + 1 second of runtime. == 7 seconds
-
- // Ensure we actually had the delay
- assertTrue("Failover took less than 6 seconds", duration > 6000);
-
- // Ensure we don't have delays before initial connection and reconnection.
- // We allow 3 second for initial connection and failover logic on top of 6s of sleep.
- assertTrue("Failover took more than 9 seconds:(" + duration + ")", duration < 9000);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- }
-
-
- /**
- * Test that setting 'nofailover' as the failover policy does not result in
- * delays or connection attempts when the initial connection is lost.
- *
- * Test validates that there is a connection delay as required on initial
- * connection.
- */
- public void testNoFailover() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- int CONNECT_DELAY = 2000;
- String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='" + CONNECT_DELAY + "'," +
- "retries='3'',failover='nofailover'";
-
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- Thread brokerStart = null;
- try
- {
- //Kill initial broker
- stopDefaultBroker();
-
- //Create a thread to start the broker asynchronously
- brokerStart = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- //Wait before starting broker
- // The wait should allow at least 1 retries to fail before broker is ready
- Thread.sleep(750);
- startDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception whilst starting broker", e);
- }
- }
- });
-
- brokerStart.start();
- long start = System.currentTimeMillis();
- //Start the connection so it will use the retries
- AMQConnection connection = new AMQConnection(url);
-
- long end = System.currentTimeMillis();
- long duration = (end - start);
-
- // Check that we actually had a delay in connection
- assertTrue("Initial connection should be longer than 1 delay : " + CONNECT_DELAY + " <:(" + duration + ")", duration > CONNECT_DELAY);
-
-
- connection.setExceptionListener(this);
-
- //Ensure we collect the brokerStart thread
- brokerStart.join();
- brokerStart = null;
-
- start = System.currentTimeMillis();
-
- //Kill connection
- stopDefaultBroker();
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe", 0, _failoverComplete.getCount());
-
- end = System.currentTimeMillis();
-
- duration = (end - start);
-
- // Notification of the connection failure should be very quick as we are denying the ability to failover.
- // It may not be as quick for Java profile tests so lets just make sure it is less than the connectiondelay
- // Occasionally it takes 1s so we have to set CONNECT_DELAY to be higher to take that in to account.
- assertTrue("Notification of the connection failure took was : " + CONNECT_DELAY + " >:(" + duration + ")", duration < CONNECT_DELAY);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- finally
- {
- // Guard against the case where the broker took too long to start
- // and the initial connection failed to be formed.
- if (brokerStart != null)
- {
- brokerStart.join();
- }
- }
- }
-
- @Override
- public void onException(JMSException e)
- {
- if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
- {
- LOGGER.debug("Received AMQDisconnectedException");
- _failoverComplete.countDown();
- }
- else
- {
- LOGGER.error("Unexpected underlying exception", e.getLinkedException());
- }
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org