You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/21 10:11:12 UTC
[1/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP
0-x client specific test failover to client suite
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 25f11ba27 -> b4ba6158f
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/test/client/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/failover/FailoverTest.java b/systests/src/test/java/org/apache/qpid/test/client/failover/FailoverTest.java
deleted file mode 100644
index ba0f4a8..0000000
--- a/systests/src/test/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.test.client.failover;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.BrokerHolder;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-
-public class FailoverTest extends FailoverBaseCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverTest.class);
-
- private static final int DEFAULT_NUM_MESSAGES = 10;
- private static final int DEFAULT_SEED = 20080921;
- protected int numMessages = 0;
- protected Connection connection;
- private Session producerSession;
- private Queue queue;
- private MessageProducer producer;
- private Session consumerSession;
- private MessageConsumer consumer;
-
- private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
- private int seed;
- private Random rand;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- numMessages = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUM_MESSAGES);
- seed = Integer.getInteger("profile.failoverRandomSeed", DEFAULT_SEED);
- rand = new Random(seed);
-
- connection = getConnection();
- ((AMQConnection) connection).setConnectionListener(this);
- connection.start();
- }
-
- private void init(boolean transacted, int mode) throws Exception
- {
- consumerSession = connection.createSession(transacted, mode);
- queue = consumerSession.createQueue(getName()+System.currentTimeMillis());
- consumer = consumerSession.createConsumer(queue);
-
- producerSession = connection.createSession(transacted, mode);
- producer = producerSession.createProducer(queue);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- connection.close();
- }
- catch (Exception e)
- {
-
- }
-
- try
- {
- _alternativeBroker.shutdown();
- }
- finally
- {
- super.tearDown();
- }
- }
-
- private void consumeMessages(int startIndex,int endIndex, boolean transacted) throws JMSException
- {
- Message msg;
- LOGGER.debug("**************** Receive (Start: " + startIndex + ", End:" + endIndex + ")***********************");
-
- for (int i = startIndex; i < endIndex; i++)
- {
- msg = consumer.receive(1000);
- assertNotNull("Message " + i + " was null!", msg);
-
- LOGGER.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
- LOGGER.debug("Received : " + ((TextMessage) msg).getText());
- LOGGER.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
-
- assertEquals("Invalid message order","message " + i, ((TextMessage) msg).getText());
-
- }
- LOGGER.debug("***********************************************************");
-
- if (transacted)
- {
- consumerSession.commit();
- }
- }
-
- private void sendMessages(int startIndex,int endIndex, boolean transacted) throws Exception
- {
- LOGGER.debug("**************** Send (Start: " + startIndex + ", End:" + endIndex + ")***********************");
-
- for (int i = startIndex; i < endIndex; i++)
- {
- producer.send(producerSession.createTextMessage("message " + i));
-
- LOGGER.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
- LOGGER.debug("Sending message"+i);
- LOGGER.debug("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
- }
-
- LOGGER.debug("***********************************************************");
-
- if (transacted)
- {
- producerSession.commit();
- }
- else
- {
- ((AMQSession<?, ?>)producerSession).sync();
- }
- }
-
- public void testP2PFailover() throws Exception
- {
- testP2PFailover(numMessages, true, true, false);
- }
-
- public void testP2PFailoverWithMessagesLeftToConsumeAndProduce() throws Exception
- {
- if (CLUSTERED)
- {
- testP2PFailover(numMessages, false, false, false);
- }
- }
-
- public void testP2PFailoverWithMessagesLeftToConsume() throws Exception
- {
- if (CLUSTERED)
- {
- testP2PFailover(numMessages, false, true, false);
- }
- }
-
- public void testP2PFailoverTransacted() throws Exception
- {
- testP2PFailover(numMessages, true, true, true);
- }
-
- public void testP2PFailoverTransactedWithMessagesLeftToConsumeAndProduce() throws Exception
- {
- // Currently the cluster does not support transactions that span a failover
- if (CLUSTERED)
- {
- testP2PFailover(numMessages, false, false, false);
- }
- }
- private void testP2PFailover(int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws Exception
- {
- init(transacted, Session.AUTO_ACKNOWLEDGE);
- runP2PFailover(getDefaultBroker(), totalMessages,consumeAll, produceAll , transacted);
- }
-
- private void runP2PFailover(BrokerHolder broker, int totalMessages, boolean consumeAll, boolean produceAll , boolean transacted) throws Exception
- {
- int toProduce = totalMessages;
-
- LOGGER.debug("===================================================================");
- LOGGER.debug("Total messages used for the test " + totalMessages + " messages");
- LOGGER.debug("===================================================================");
-
- if (!produceAll)
- {
- toProduce = totalMessages - rand.nextInt(totalMessages);
- }
-
- LOGGER.debug("==================");
- LOGGER.debug("Sending " + toProduce + " messages");
- LOGGER.debug("==================");
-
- sendMessages(0, toProduce, transacted);
-
- // Consume some messages
- int toConsume = toProduce;
- if (!consumeAll)
- {
- toConsume = toProduce - rand.nextInt(toProduce);
- }
-
- consumeMessages(0, toConsume, transacted);
-
- LOGGER.debug("==================");
- LOGGER.debug("Consuming " + toConsume + " messages");
- LOGGER.debug("==================");
-
- LOGGER.info("Failing over");
-
- causeFailure(broker, DEFAULT_FAILOVER_TIME);
-
- // Check that you produce and consume the rest of messages.
- LOGGER.debug("==================");
- LOGGER.debug("Sending " + (totalMessages-toProduce) + " messages");
- LOGGER.debug("==================");
-
- sendMessages(toProduce, totalMessages, transacted);
- consumeMessages(toConsume, totalMessages, transacted);
-
- LOGGER.debug("==================");
- LOGGER.debug("Consuming " + (totalMessages-toConsume) + " messages");
- LOGGER.debug("==================");
- }
-
- private void causeFailure(BrokerHolder broker, long delay)
- {
-
- failBroker(broker);
-
- LOGGER.info("Awaiting Failover completion");
- try
- {
- if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
- {
- fail("failover did not complete");
- }
- }
- catch (InterruptedException e)
- {
- //evil ignore IE.
- }
- }
-
- public void testClientAckFailover() throws Exception
- {
- init(false, Session.CLIENT_ACKNOWLEDGE);
- sendMessages(0,1, false);
- Message msg = consumer.receive();
- assertNotNull("Expected msgs not received", msg);
-
- causeFailure(getDefaultBroker(), DEFAULT_FAILOVER_TIME);
-
- Exception failure = null;
- try
- {
- msg.acknowledge();
- }
- catch (Exception e)
- {
- failure = e;
- }
- assertNotNull("Exception should be thrown", failure);
- }
-
- /**
- * The idea is to run a failover test in a loop by failing over
- * to the other broker each time.
- */
- public void testFailoverInALoop() throws Exception
- {
- if (!CLUSTERED)
- {
- return;
- }
-
- BrokerHolder currentBroker = getDefaultBroker();
- int iterations = Integer.getInteger("profile.failoverIterations", 3);
- LOGGER.debug("LQ: iterations {}", iterations);
- boolean useDefaultBroker = true;
- init(false, Session.AUTO_ACKNOWLEDGE);
- for (int i=0; i < iterations; i++)
- {
- LOGGER.debug("===================================================================");
- LOGGER.debug("Failover In a loop : iteration number " + i);
- LOGGER.debug("===================================================================");
-
- runP2PFailover(currentBroker, numMessages, false, false, false);
- restartBroker(currentBroker);
- if (useDefaultBroker)
- {
- currentBroker = _alternativeBroker;
- useDefaultBroker = false;
- }
- else
- {
- currentBroker = getDefaultBroker();
- useDefaultBroker = true;
- }
- }
- //To prevent any failover logic being initiated when we shutdown the brokers.
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
deleted file mode 100644
index b781058..0000000
--- a/systests/src/test/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.naming.NamingException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.jms.ConnectionListener;
-
-public class FailoverBaseCase extends QpidBrokerTestCase implements ConnectionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBaseCase.class);
-
- public static final long DEFAULT_FAILOVER_TIME = Long.getLong("FailoverBaseCase.defaultFailoverTime", 10000L);
-
- protected CountDownLatch _failoverStarted;
- protected CountDownLatch _failoverComplete;
- protected BrokerHolder _alternativeBroker;
- protected int _port;
- protected int _alternativePort;
- private final List<Connection> _connections = new ArrayList<>();
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _failoverComplete = new CountDownLatch(1);
- _failoverStarted = new CountDownLatch(1);
-
- _alternativeBroker = createSpawnedBroker();
- _alternativeBroker.start();
- _alternativePort = _alternativeBroker.getAmqpPort();
-
- _port = getDefaultBroker().getAmqpPort();
- setTestSystemProperty("test.port.alt", String.valueOf(_alternativePort));
- setTestSystemProperty("test.port", String.valueOf(_port));
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- for(Connection c : _connections)
- {
- try
- {
- c.close();
- }
- catch (JMSException e)
- {
- }
- }
- }
- finally
- {
- super.tearDown();
- }
- }
-
- /**
- * We are using failover factories
- *
- * @return a connection
- * @throws Exception
- */
- @Override
- public ConnectionFactory getConnectionFactory() throws NamingException
- {
- LOGGER.info("get ConnectionFactory");
- return getConnectionBuilder().setFailover(true)
- .setTls(Boolean.getBoolean("profile.use_ssl"))
- .buildConnectionFactory();
- }
-
- @Override
- public javax.jms.Connection getConnection() throws JMSException, NamingException
- {
- final Connection connection = getConnectionFactory().createConnection(GUEST_USERNAME, GUEST_PASSWORD);
- _connections.add(connection);
- return connection;
- }
-
- public void failDefaultBroker()
- {
- failBroker(getDefaultBroker());
- }
-
- public void failBroker(BrokerHolder broker)
- {
- try
- {
- //TODO: use killBroker instead
- broker.shutdown();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void restartBroker(BrokerHolder broker) throws Exception
- {
- broker.restart();
- }
-
- @Override
- public void bytesSent(long count)
- {
- }
-
- @Override
- public void bytesReceived(long count)
- {
- }
-
- @Override
- public boolean preFailover(boolean redirect)
- {
- _failoverStarted.countDown();
- return true;
- }
-
- @Override
- public boolean preResubscribe()
- {
- return true;
- }
-
- @Override
- public void failoverComplete()
- {
- _failoverComplete.countDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index ba925a1..5636a05 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -60,11 +60,6 @@ org.apache.qpid.test.client.ProducerFlowControlTest#*
//QPID-3986 : Flow control invoked on total store disk usage
org.apache.qpid.server.store.StoreOverfullTest#*
-// 0-8/0-9/0-9-1 and/or Qpid Broker-J specific failover tests related to the above Producer Flow Control mechanisms
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutExpires
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOnFailover
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover
// Excluded because plugins from Qpid Broker-J are not used in CPP broker
org.apache.qpid.server.virtualhost.plugin.*
@@ -83,9 +78,6 @@ org.apache.qpid.server.plugins.PluginTest#*
org.apache.qpid.server.SupportedProtocolVersionsTest#*
org.apache.qpid.server.stats.StatisticsReportingTest#*
-// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occur.
-org.apache.qpid.server.failover.FailoverMethodTest#*
-
// passwd script is a Qpid Broker-J specific command line tool
org.apache.qpid.scripts.QpidPasswdTest#*
@@ -98,8 +90,6 @@ org.apache.qpid.systest.disttest.endtoend.*
org.apache.qpid.systest.rest.*
org.apache.qpid.systest.rest.acl.*
-// Exclude failover tests requiring virtual host functionality
-org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
// CPP Broker does not timeout connections with no activity like the Qpid Broker-J
org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/ExternalBrokerTests
----------------------------------------------------------------------
diff --git a/test-profiles/ExternalBrokerTests b/test-profiles/ExternalBrokerTests
index 1abe0e6..969b927 100644
--- a/test-profiles/ExternalBrokerTests
+++ b/test-profiles/ExternalBrokerTests
@@ -17,12 +17,3 @@
// under the License.
//
-#These tests use external spawned brokers
-
-#=============================
-# In the java-mms.0-10 profile
-#=============================
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-org.apache.qpid.client.failover.FailoverBehaviourTest#*
-org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
-org.apache.qpid.test.client.failover.FailoverTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index dace6ea..30176f1 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -21,9 +21,6 @@
org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable
-// Exclude tests of connection URL failover method property
-org.apache.qpid.server.failover.FailoverMethodTest#*
-
// 0-10 and 0-9 connections don't generate the exact same logging due to protocol differences
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
@@ -32,19 +29,8 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowSt
org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
-// These tests test the behaviour of 0-8..-0-9-1 specific system property qpid.failover_method_timeout
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutExpires
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOnFailover
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutExpires
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowControlFlagResetOnFailover
-
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
-// QPID-6722: Race client side means that session close can end in exception when failover is in progress.
-org.apache.qpid.client.failover.FailoverBehaviourTest#testConnectionCloseInterruptsFailover
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testConnectionCloseInterruptsFailover
# Exclude the JMS 2.0 test suite
org.apache.qpid.systests.jms_2_0.*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index d52d70e..4b385c1 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -20,12 +20,6 @@
// Exclude client test of initial context factory, as the 1.0 profile uses the 1.0 context factory
org.apache.qpid.jndi.PropertiesFileInitialContextFactoryTest#*
-// Exclude Address based tests
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-
-// Exclude tests of connection URL failover method property
-org.apache.qpid.server.failover.FailoverMethodTest#*
-
// Uses an 0-x client API to acknowledge up to a particular message rather than the most recent
org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier
// Tests the effect of setting the prefetch value
@@ -50,13 +44,6 @@ org.apache.qpid.transport.ProtocolNegotiationTest#testProtocolNegotiationFromUns
// Tests are tests of the 0-x client behaviour
org.apache.qpid.test.client.ProducerFlowControlTest#*
-// Failover tests are tests of the 0-x client behaviour
-org.apache.qpid.client.failover.FailoverBehaviourTest#*
-org.apache.qpid.client.failover.MultipleBrokersFailoverTest#*
-org.apache.qpid.test.client.failover.FailoverTest#*
-// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occur.
-org.apache.qpid.server.failover.FailoverMethodTest#*
-
// Message encryption not currently supported by the 1.0 client
org.apache.qpid.systest.messageencryption.MessageEncryptionTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index c89f812..538a93e 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -24,9 +24,6 @@
// These tests require a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently
org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
-// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
-org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-
// Those tests are written against the 0.10 path
org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/test-profiles/JavaTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes
index d4052de..f397a12 100644
--- a/test-profiles/JavaTransientExcludes
+++ b/test-profiles/JavaTransientExcludes
@@ -34,7 +34,3 @@ org.apache.qpid.systest.rest.VirtualHostRestTest#testMutateStateOfVirtualHostWit
org.apache.qpid.systest.rest.VirtualHostNodeRestTest#testCreateAndDeleteVirtualHostNode
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOnFailover
-org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
-org.apache.qpid.server.failover.FailoverMethodTest#testNoFailover
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP
0-x client specific test failover to client suite
Posted by kw...@apache.org.
QPID-6933: [System Tests] Move AMQP 0-x client specific test failover to client suite
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/588c65f7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/588c65f7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/588c65f7
Branch: refs/heads/master
Commit: 588c65f77406318c1884cf0aed37bf74f1f495ae
Parents: 25f11ba
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 21 08:48:15 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 21 08:48:15 2018 +0000
----------------------------------------------------------------------
.../AddressBasedFailoverBehaviourTest.java | 34 -
.../client/failover/FailoverBehaviourTest.java | 1630 ------------------
.../failover/MultipleBrokersFailoverTest.java | 272 ---
.../server/failover/FailoverMethodTest.java | 271 ---
.../qpid/test/client/failover/FailoverTest.java | 325 ----
.../qpid/test/utils/FailoverBaseCase.java | 163 --
test-profiles/CPPExcludes | 10 -
test-profiles/ExternalBrokerTests | 9 -
test-profiles/Java010Excludes | 14 -
test-profiles/Java10Excludes | 13 -
test-profiles/JavaPre010Excludes | 3 -
test-profiles/JavaTransientExcludes | 4 -
12 files changed, 2748 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
deleted file mode 100644
index 99fcbc5..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/AddressBasedFailoverBehaviourTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public class AddressBasedFailoverBehaviourTest extends FailoverBehaviourTest
-{
- @Override
- protected Destination createDestination(Session session) throws JMSException
- {
- return session.createQueue("ADDR:" +getTestQueueName() + "_" + System.currentTimeMillis() + "; {create: always}");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
deleted file mode 100644
index dd04d6d..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ /dev/null
@@ -1,1630 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.client.failover;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TransactionRolledBackException;
-import javax.naming.NamingException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BrokerDetails;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * Test suite to test all possible failover corner cases
- */
-public class FailoverBehaviourTest extends FailoverBaseCase implements ExceptionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);
-
- private static final String TEST_MESSAGE_FORMAT = "test message {0}";
-
- /** Indicates whether tests are run against clustered broker */
- private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
-
- /** Default number of messages to send before failover */
- private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
-
- /** Actual number of messages to send before failover */
- protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
-
- /** Test connection */
- protected Connection _connection;
-
- /**
- * Consumer session
- */
- private Session _consumerSession;
-
- /**
- * Test destination
- */
- private Destination _destination;
-
- /**
- * Consumer
- */
- private MessageConsumer _consumer;
-
- /**
- * Producer session
- */
- private Session _producerSession;
-
- /**
- * Producer
- */
- private MessageProducer _producer;
-
- /**
- * Holds exception sent into {@link ExceptionListener} on failover
- */
- private JMSException _exceptionListenerException;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- _connection = getConnection();
- _connection.setExceptionListener(this);
- ((AMQConnection) _connection).setConnectionListener(this);
- }
-
- /**
- * Test whether MessageProducer can successfully publish messages after
- * failover and rollback transaction
- */
- public void testMessageProducingAndRollbackAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- causeFailure();
-
- assertFailoverException();
- // producer should be able to send messages after failover
- _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
- // rollback after failover
- _producerSession.rollback();
-
- // tests whether sending and committing is working after failover
- produceMessages();
- _producerSession.commit();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test whether {@link TransactionRolledBackException} is thrown on commit
- * of dirty transacted session after failover.
- * <p>
- * Verifies whether second after failover commit is successful.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- causeFailure();
-
- assertFailoverException();
-
- // producer should be able to send messages after failover
- _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
-
- try
- {
- _producerSession.commit();
- fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
- }
- catch (JMSException t)
- {
- assertTrue("Expected TransactionRolledBackException but thrown " + t,
- t instanceof TransactionRolledBackException);
- }
-
- // simulate process of user replaying the transaction
- produceMessages("replayed test message {0}", _messageNumber, false);
-
- // no exception should be thrown
- _producerSession.commit();
-
- // only messages sent after rollback should be received
- consumeMessages("replayed test message {0}", _messageNumber);
-
- // no exception should be thrown
- _consumerSession.commit();
- }
-
- /**
- * Tests JMSException is not thrown on commit with a clean session after
- * failover
- */
- public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- causeFailure();
-
- assertFailoverException();
-
- // should not throw an exception for a clean session
- _producerSession.commit();
-
- // tests whether sending and committing is working after failover
- produceMessages();
- _producerSession.commit();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Tests {@link TransactionRolledBackException} is thrown on commit of dirty
- * transacted session after failover.
- * <p>
- * Verifies whether second after failover commit is successful.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- _producerSession.commit();
-
- // receive messages but do not commit
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- try
- {
- // should throw TransactionRolledBackException
- _consumerSession.commit();
- fail("TransactionRolledBackException is expected on commit after failover");
- }
- catch (Exception t)
- {
- assertTrue("Expected TransactionRolledBackException but thrown " + t,
- t instanceof TransactionRolledBackException);
- }
-
- resendMessagesIfNecessary();
-
- // consume messages successfully
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Tests JMSException is not thrown on commit with a clean session after failover
- */
- public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
- _consumerSession.commit();
-
- causeFailure();
-
- assertFailoverException();
-
- // should not throw an exception with a clean consumer session
- _consumerSession.commit();
- }
-
- /**
- * Test that TransactionRolledBackException is thrown on commit of
- * dirty session in asynchronous consumer after failover.
- */
- public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
- throws Exception
- {
- init(Session.SESSION_TRANSACTED, false);
- FailoverTestMessageListener ml = new FailoverTestMessageListener();
- _consumer.setMessageListener(ml);
-
- _connection.start();
-
- produceMessages();
- _producerSession.commit();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- int counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- }
- ml.reset();
-
- causeFailure();
- assertFailoverException();
-
-
- try
- {
- _consumerSession.commit();
- fail("TransactionRolledBackException should be thrown!");
- }
- catch (TransactionRolledBackException e)
- {
- // that is what is expected
- }
-
- resendMessagesIfNecessary();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- }
-
- // commit again. It should be successful
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#rollback()} does not throw exception after failover
- * and that we are able to consume messages.
- */
- public void testRollbackAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.rollback();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#rollback()} does not throw exception after receiving further messages
- * after failover, and we can receive published messages after rollback.
- */
- public void testRollbackAfterReceivingAfterFailover() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- resendMessagesIfNecessary();
-
- consumeMessages();
-
- _consumerSession.rollback();
-
- // tests whether receiving and committing is working after failover
- consumeMessages();
- _consumerSession.commit();
- }
-
- /**
- * Test that {@link Session#recover()} does not throw an exception after failover
- * and that we can consume messages after recover.
- */
- public void testRecoverAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.recover();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and acknowledgment is working after recover
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that receiving more messages after failover and then calling
- * {@link Session#recover()} does not throw an exception
- * and that we can consume messages after recover.
- */
- public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
-
- causeFailure();
-
- assertFailoverException();
-
- // publishing should work after failover
- resendMessagesIfNecessary();
-
- // consume messages again on a dirty session
- consumeMessages();
-
- // recover should successfully restore session
- _consumerSession.recover();
-
- // tests whether receiving and acknowledgment is working after recover
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that first call to {@link Message#acknowledge()} after failover
- * throws a JMSEXception if session is dirty.
- */
- public void testAcknowledgeAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- Message lastMessage = consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- try
- {
- // an implicit recover performed when acknowledge throws an exception due to failover
- lastMessage.acknowledge();
- fail("JMSException should be thrown");
- }
- catch (JMSException t)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- resendMessagesIfNecessary();
-
- // tests whether receiving and acknowledgment is working after recover
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that calling acknowledge before failover leaves the session
- * clean for use after failover.
- */
- public void testAcknowledgeBeforeFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages and acknowledge them
- Message lastMessage = consumeMessages();
- lastMessage.acknowledge();
-
- causeFailure();
-
- assertFailoverException();
-
- produceMessages();
-
- // tests whether receiving and acknowledgment is working after recover
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Test that receiving of messages after failover prior to calling
- * {@link Message#acknowledge()} still results in acknowledge throwing an exception.
- */
- public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // consume messages but do not acknowledge them
- consumeMessages();
- causeFailure();
-
- assertFailoverException();
-
- resendMessagesIfNecessary();
-
- // consume again on dirty session
- Message lastMessage = consumeMessages();
- try
- {
- // an implicit recover performed when acknowledge throws an exception due to failover
- lastMessage.acknowledge();
- fail("JMSException should be thrown");
- }
- catch (JMSException t)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- // tests whether receiving and acknowledgment is working on a clean session
- lastMessage = consumeMessages();
- lastMessage.acknowledge();
- }
-
- /**
- * Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer
- * and we can consume messages after acknowledge.
- */
- public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
- {
- init(Session.CLIENT_ACKNOWLEDGE, false);
- FailoverTestMessageListener ml = new FailoverTestMessageListener();
- _consumer.setMessageListener(ml);
- _connection.start();
-
- produceMessages();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- int counter = 0;
- Message currentMessage = null;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- currentMessage = message;
- }
- ml.reset();
-
- causeFailure();
- assertFailoverException();
-
-
- try
- {
- currentMessage.acknowledge();
- fail("JMSException should be thrown!");
- }
- catch (JMSException e)
- {
- // TODO: assert error code and/or expected exception type
- }
-
- resendMessagesIfNecessary();
-
- // wait for message receiving
- ml.awaitForEnd();
-
- assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
-
- // assert messages
- counter = 0;
- for (Message message : ml.getReceivedMessages())
- {
- assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
- currentMessage = message;
- }
-
- // acknowledge again. It should be successful
- currentMessage.acknowledge();
- }
-
- /**
- * Test whether {@link Session#recover()} works as expected after failover
- * in AA mode.
- */
- public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
- {
- init(Session.AUTO_ACKNOWLEDGE, true);
-
- produceMessages();
-
- // receive first message in order to start a dispatcher thread
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- causeFailure();
-
- assertFailoverException();
-
- _consumerSession.recover();
-
- resendMessagesIfNecessary();
-
- // tests whether receiving is working after recover
- consumeMessages();
- }
-
- public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
- {
- sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishAutoAcknowledgedWhileFailover() throws Exception
- {
- publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishClientAcknowledgedWhileFailover() throws Exception
- {
- Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
- receivedMessage.acknowledge();
- }
-
- public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
- {
- publishWhileFailingOver(Session.SESSION_TRANSACTED);
- _consumerSession.commit();
- }
-
- public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
-
- }
-
- public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
- {
- publishWithFailoverMutex(Session.SESSION_TRANSACTED);
- }
-
- public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
- {
- sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
- }
-
- public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
- }
-
- public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
- {
- browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
- }
-
- public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
- {
- doFailoverWhilstPublishingInFlight(true);
- }
-
- public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
- {
- doFailoverWhilstPublishingInFlight(false);
- }
-
- private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws Exception
- {
- init(Session.SESSION_TRANSACTED, false);
-
- final int numberOfMessages = 200;
-
- final CountDownLatch halfWay = new CountDownLatch(1);
- final CountDownLatch allDone = new CountDownLatch(1);
- final AtomicReference<Exception> exception = new AtomicReference<>();
-
- Runnable producerRunnable = new Runnable()
- {
- @Override
- public void run()
- {
- Thread.currentThread().setName("ProducingThread");
-
- try
- {
- for(int i=0; i< numberOfMessages; i++)
- {
- boolean success = false;
- while(!success)
- {
- try
- {
- Message message = _producerSession.createMessage();
- message.setIntProperty("msgNum", i);
- _producer.send(message);
- _producerSession.commit();
- success = true;
- }
- catch (javax.jms.IllegalStateException e)
- {
- // fail - failover should not leave a JMS object in an illegal state
- throw e;
- }
- catch (JMSException e)
- {
- // OK we will be failing over
- LOGGER.debug("Got JMS exception, probably just failing over", e);
- }
- }
-
- if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
- {
- halfWay.countDown();
- }
- }
-
- allDone.countDown();
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- }
- };
-
- Thread producerThread = new Thread(producerRunnable);
- producerThread.start();
-
- assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
-
- if (hardKill)
- {
- LOGGER.debug("Killing the Broker");
- killDefaultBroker();
- }
- else
- {
- LOGGER.debug("Stopping the Broker");
- stopDefaultBroker();
- }
-
- if (exception.get() != null)
- {
- LOGGER.error("Unexpected exception from producer thread", exception.get());
- }
- assertNull("Producer thread should not have got an exception", exception.get());
-
- assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
-
- producerThread.join(30000);
-
- // Extra work to prove the session still okay
- assertNotNull(_producerSession.createTemporaryQueue());
- }
-
-
- private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
- {
- setDelayedFailoverPolicy(5);
- init(autoAcknowledge, true);
-
- String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
- Message message = _producerSession.createTextMessage(text);
-
- failDefaultBroker();
-
- if(!_failoverStarted.await(5, TimeUnit.SECONDS))
- {
- fail("Did not receieve notification failover had started");
- }
-
- _producer.send(message);
-
- if (_producerSession.getTransacted())
- {
- _producerSession.commit();
- }
-
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
- return receivedMessage;
- }
-
- private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
- {
- setDelayedFailoverPolicy(5);
- init(autoAcknowledge, true);
-
- String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
- Message message = _producerSession.createTextMessage(text);
-
- AMQConnection connection = (AMQConnection)_connection;
-
- // holding failover mutex should prevent the failover from
- // proceeding before we try to send the message
- synchronized(connection.getFailoverMutex())
- {
- failDefaultBroker();
-
- // wait to make sure that connection is lost
- while(!connection.isFailingOver())
- {
- Thread.sleep(25l);
- }
-
- try
- {
- _producer.send(message);
- fail("Sending should fail because connection was lost and failover has not yet completed");
- }
- catch(JMSException e)
- {
- // JMSException is expected
- }
- }
- // wait for failover completion, thus ensuring it actually
- //got started, before allowing the test to tear down
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- }
-
- /**
- * This test only tests 0-8/0-9/0-9-1 failover timeout
- */
- public void testFailoverHandlerTimeoutExpires() throws Exception
- {
- _connection.close();
- setTestSystemProperty("qpid.failover_method_timeout", "10000");
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover();
-
- // holding failover mutex should prevent the failover from proceeding
- synchronized(connection.getFailoverMutex())
- {
- killDefaultBroker();
- startDefaultBroker();
-
- // sleep interval exceeds failover timeout interval
- Thread.sleep(11000l);
- }
-
- // allows the failover thread to proceed
- Thread.yield();
- assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
- assertTrue("Failover should not succeed due to timeout", connection.isClosed());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- public void testFailoverHandlerTimeoutReconnected() throws Exception
- {
- _connection.close();
- setTestSystemProperty("qpid.failover_method_timeout", "10000");
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover();
-
- // holding failover mutex should prevent the failover from proceeding
- synchronized(connection.getFailoverMutex())
- {
- killDefaultBroker();
- startDefaultBroker();
- }
-
- // allows the failover thread to proceed
- Thread.yield();
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- assertFalse("Failover should restore connectivity", connection.isClosed());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- /**
- * Tests that the producer flow control flag is reset when failover occurs while
- * the producers are being blocked by the broker.
- *
- * Uses Apache Qpid Broker-J specific queue configuration to enabled PSFC.
- */
- public void testFlowControlFlagResetOnFailover() throws Exception
- {
- // we do not need the connection failing to second broker
- _connection.close();
-
- // make sure that failover timeout is bigger than flow control timeout
- setTestSystemProperty("qpid.failover_method_timeout", "60000");
- setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
-
- AMQConnection connection = null;
- try
- {
- connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));
-
- final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
- final AtomicInteger counter = new AtomicInteger();
- // try to send 5 messages (should block after 4)
- new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- MessageProducer producer = producerSession.createProducer(queue);
- for (int i=0; i < 5; i++)
- {
- Message next = createNextMessage(producerSession, i);
- producer.send(next);
- producerSession.commit();
- counter.incrementAndGet();
- }
- }
- catch(Exception e)
- {
- // ignore
- }
- }
- }).start();
-
- long limit= 30000l;
- long start = System.currentTimeMillis();
-
- // wait until session is blocked
- while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
- {
- Thread.sleep(100l);
- }
-
- assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
-
- final int currentCounter = counter.get();
- assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3);
-
- killDefaultBroker();
- startDefaultBroker();
-
- // allows the failover thread to proceed
- Thread.yield();
- awaitForFailoverCompletion(60000l);
-
- assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
- }
- finally
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- }
-
- public void testFailoverWhenConnectionStopped() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
-
- produceMessages();
- _producerSession.commit();
-
- final CountDownLatch stopFlag = new CountDownLatch(1);
- final AtomicReference<Exception> exception = new AtomicReference<>();
- final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
- final AtomicInteger counter = new AtomicInteger();
-
- _consumer.setMessageListener(new MessageListener()
- {
- @Override
- public void onMessage(Message message)
- {
- if (stopFlag.getCount() == 1)
- {
- try
- {
- LOGGER.debug("Stopping connection from dispatcher thread");
- _connection.stop();
- LOGGER.debug("Connection stopped from dispatcher thread");
-
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- finally
- {
- stopFlag.countDown();
-
- failDefaultBroker();
- }
-
- }
- else
- {
- try
- {
- _consumerSession.commit();
- counter.incrementAndGet();
- expectedMessageLatch.countDown();
- }
- catch (Exception e)
- {
- exception.set(e);
- }
- }
- }
- });
-
-
- boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
- assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
- stopResult);
- assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
-
- // wait for failover to complete
- awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
- assertFailoverException();
-
- resendMessagesIfNecessary();
- _producerSession.commit();
-
- _connection.start();
-
- assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
-
- Thread.sleep(500l);
- assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
-
- _connection.close();
- }
-
- public void testConnectionCloseInterruptsFailover() throws Exception
- {
- _connection.close();
-
- final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
- final CountDownLatch failoverBegun = new CountDownLatch(1);
-
- AMQConnection connection = createConnectionWithFailover();
- connection.setConnectionListener(new ConnectionListener()
- {
- @Override
- public void bytesSent(final long count)
- {
- }
-
- @Override
- public void bytesReceived(final long count)
- {
- }
-
- @Override
- public boolean preFailover(final boolean redirect)
- {
- failoverBegun.countDown();
- LOGGER.info("Failover started");
- return true;
- }
-
- @Override
- public boolean preResubscribe()
- {
- return true;
- }
-
- @Override
- public void failoverComplete()
- {
- failoverCompleted.set(true);
- }
- });
-
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- assertNotNull("Session should be created", session);
- killDefaultBroker();
-
- boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS);
- assertTrue("Failover did not begin with a reasonable time", failingOver);
-
- // Failover will now be in flight
- connection.close();
- assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed());
- }
-
- private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
- {
- final Map<String, Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity", capacity);
- arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
- ((AMQSession<?, ?>) session).createQueue(queueName, false, true, false, arguments);
- Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
- + "'&autodelete='" + false + "'");
- ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
- return queue;
- }
-
- private AMQConnection createConnectionWithFailover() throws NamingException, JMSException, URLSyntaxException
- {
- return createConnectionWithFailover(null);
- }
-
- private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
- {
- String retries = "200";
- String connectdelay = "1000";
- String cycleCount = "2";
-
- String newUrlFormat="amqp://username:password@clientid/test?brokerlist=" +
- "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
-
- String newUrl = String.format(newUrlFormat, "localhost", getDefaultAmqpPort(),
- retries, connectdelay, cycleCount);
-
- if (connectionOptions != null)
- {
- for (Map.Entry<String,String> option: connectionOptions.entrySet())
- {
- newUrl+= "&" + option.getKey() + "='" + option.getValue() + "'";
- }
- }
- ConnectionFactory connectionFactory = new AMQConnectionFactory(newUrl);
- AMQConnection connection = (AMQConnection) connectionFactory.createConnection("admin", "admin");
- connection.setConnectionListener(this);
- return connection;
- }
-
- /**
- * Tests {@link Session#close()} for session with given acknowledge mode
- * to ensure that close works after failover.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws JMSException
- */
- private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
- {
- init(acknowledgeMode, true);
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
-
- // intentionally receive message but do not commit or acknowledge it in
- // case of transacted or CLIENT_ACK session
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- causeFailure();
-
- assertFailoverException();
-
- // for transacted/client_ack session
- // no exception should be thrown but transaction should be automatically
- // rolled back
- _consumerSession.close();
- }
-
- /**
- * A helper method to instantiate produce and consumer sessions, producer
- * and consumer.
- *
- * @param acknowledgeMode
- * acknowledge mode
- * @param startConnection
- * indicates whether connection should be started
- * @throws JMSException
- */
- private void init(int acknowledgeMode, boolean startConnection) throws JMSException
- {
- boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
-
- _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = createDestination(_consumerSession);
- _consumer = _consumerSession.createConsumer(_destination);
-
- if (startConnection)
- {
- _connection.start();
- }
-
- _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _producer = _producerSession.createProducer(_destination);
-
- }
-
- protected Destination createDestination(Session session) throws JMSException
- {
- return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
- }
-
- /**
- * Resends messages if reconnected to a non-clustered broker
- *
- * @throws JMSException
- */
- private void resendMessagesIfNecessary() throws JMSException
- {
- if (!CLUSTERED)
- {
- // assert that a new broker does not have messages on a queue
- if (_consumer.getMessageListener() == null)
- {
- Message message = _consumer.receive(100l);
- assertNull("Received a message after failover with non-clustered broker!", message);
- }
- // re-sending messages if reconnected to a non-clustered broker
- produceMessages(true);
- }
- }
-
- /**
- * Produces a default number of messages with default text content into test
- * queue
- *
- * @throws JMSException
- */
- private void produceMessages() throws JMSException
- {
- produceMessages(false);
- }
-
- private void produceMessages(boolean seperateProducer) throws JMSException
- {
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer);
- }
-
- /**
- * Consumes a default number of messages and asserts their content.
- *
- * @return last consumed message
- * @throws JMSException
- */
- private Message consumeMessages() throws JMSException
- {
- return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
- }
-
- /**
- * Produces given number of text messages with content matching given
- * content pattern
- *
- * @param messagePattern message content pattern
- * @param messageNumber number of messages to send
- * @param standaloneProducer whether to use the existing producer or a new one.
- * @throws JMSException
- */
- private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
- {
- Session producerSession;
- MessageProducer producer;
-
- if(standaloneProducer)
- {
- producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
- producer = producerSession.createProducer(_destination);
- }
- else
- {
- producerSession = _producerSession;
- producer = _producer;
- }
-
- for (int i = 0; i < messageNumber; i++)
- {
- String text = MessageFormat.format(messagePattern, i);
- Message message = producerSession.createTextMessage(text);
- producer.send(message);
- LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID());
- }
-
- if(standaloneProducer)
- {
- producerSession.commit();
- }
- }
-
- /**
- * Consumes given number of text messages and asserts that their content
- * matches given pattern
- *
- * @param messagePattern
- * messages content pattern
- * @param messageNumber
- * message number to received
- * @return last consumed message
- * @throws JMSException
- */
- private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
- {
- Message receivedMesssage = null;
- for (int i = 0; i < messageNumber; i++)
- {
- receivedMesssage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMesssage, messagePattern, i);
- }
- return receivedMesssage;
- }
-
- /**
- * Asserts received message
- *
- * @param receivedMessage
- * received message
- * @param messagePattern
- * messages content pattern
- * @param messageIndex
- * message index
- */
- private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
- {
- assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
- assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
- + receivedMessage, receivedMessage instanceof TextMessage);
- String expectedText = MessageFormat.format(messagePattern, messageIndex);
- String receivedText = null;
- try
- {
- receivedText = ((TextMessage) receivedMessage).getText();
- }
- catch (JMSException e)
- {
- fail("JMSException occured while getting message text:" + e.getMessage());
- }
- LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + receivedText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID());
- assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]",
- expectedText, receivedText);
- }
-
- /**
- * Causes failover and waits till connection is re-established.
- */
- private void causeFailure()
- {
- causeFailure(DEFAULT_FAILOVER_TIME * 2);
- }
-
- /**
- * Causes failover by stopping broker and waits till
- * connection is re-established during given time interval.
- *
- * @param delay
- * time interval to wait for connection re-establishement
- */
- private void causeFailure(long delay)
- {
- failDefaultBroker();
-
- awaitForFailoverCompletion(delay);
- }
-
- private void awaitForFailoverCompletion(long delay)
- {
- LOGGER.info("Awaiting {} ms for failover completion..", delay);
- try
- {
- if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not complete");
- }
- }
- catch (InterruptedException e)
- {
- fail("Test was interrupted:" + e.getMessage());
- }
- }
-
- private void assertFailoverException()
- {
- // TODO: assert exception is received (once implemented)
- // along with error code and/or expected exception type
- }
-
-
- @Override
- public void onException(JMSException e)
- {
- _exceptionListenerException = e;
- }
-
- /**
- * Causes 1 second delay before reconnect in order to test whether JMS
- * methods block while failover is in progress
- */
- private static class DelayingFailoverPolicy extends FailoverPolicy
- {
-
- private CountDownLatch _suspendLatch;
- private long _delay;
-
- public DelayingFailoverPolicy(AMQConnection connection, long delay)
- {
- super(connection.getConnectionURL(), connection);
- _suspendLatch = new CountDownLatch(1);
- _delay = delay;
- }
-
- @Override
- public void attainedConnection()
- {
- try
- {
- _suspendLatch.await(_delay, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- // continue
- }
- super.attainedConnection();
- }
-
- }
-
-
- private class FailoverTestMessageListener implements MessageListener
- {
- // message counter
- private AtomicInteger _counter = new AtomicInteger();
-
- private List<Message> _receivedMessage = new ArrayList<Message>();
-
- private volatile CountDownLatch _endLatch;
-
- public FailoverTestMessageListener() throws JMSException
- {
- _endLatch = new CountDownLatch(1);
- }
-
- @Override
- public void onMessage(Message message)
- {
- _receivedMessage.add(message);
- if (_counter.incrementAndGet() % _messageNumber == 0)
- {
- _endLatch.countDown();
- }
- }
-
- public void reset()
- {
- _receivedMessage.clear();
- _endLatch = new CountDownLatch(1);
- _counter.set(0);
- }
-
- public List<Message> getReceivedMessages()
- {
- return _receivedMessage;
- }
-
- public Object awaitForEnd() throws InterruptedException
- {
- return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS);
- }
-
- public int getMessageCounter()
- {
- return _counter.get();
- }
- }
-
- /**
- * Tests {@link Session#close()} for session with given acknowledge mode
- * to ensure that it blocks until failover implementation restores connection.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws JMSException
- */
- private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
- {
- initDelayedFailover(acknowledgeMode);
-
- // intentionally receive message but not commit or acknowledge it in
- // case of transacted or CLIENT_ACK session
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- failDefaultBroker();
-
- // wait until failover is started
- _failoverStarted.await(5, TimeUnit.SECONDS);
-
- // test whether session#close blocks while failover is in progress
- _consumerSession.close();
-
- assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
- assertFailoverException();
- }
-
- /**
- * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
- *
- * @param acknowledgeMode session acknowledge mode
- * @return queue browser
- * @throws JMSException
- */
- private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
- {
- init(acknowledgeMode, false);
- _consumer.close();
- _connection.start();
-
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
- else
- {
- ((AMQSession)_producerSession).sync();
- }
-
- QueueBrowser browser = _consumerSession.createBrowser((Queue) _destination);
- return browser;
- }
-
- /**
- * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
- * to ensure that it blocks until failover implementation restores connection.
- *
- * @param acknowledgeMode session acknowledge mode
- * @throws JMSException
- */
- private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
- {
- QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
-
- @SuppressWarnings("unchecked")
- Enumeration<Message> messages = browser.getEnumeration();
- Message receivedMessage = (Message) messages.nextElement();
- assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
-
- failDefaultBroker();
-
- // wait until failover is started
- _failoverStarted.await(5, TimeUnit.SECONDS);
-
- browser.close();
-
- assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
-
- assertFailoverException();
- }
-
- private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
- {
- DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
- init(acknowledgeMode, true);
- produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
- if (acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _producerSession.commit();
- }
- return failoverPolicy;
- }
-
- private DelayingFailoverPolicy setDelayedFailoverPolicy()
- {
- return setDelayedFailoverPolicy(2);
- }
-
- private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
- {
- AMQConnection amqConnection = (AMQConnection) _connection;
- DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
- ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
- return failoverPolicy;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
deleted file mode 100644
index f78b9d2..0000000
--- a/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.test.utils.BrokerHolder;
-import org.apache.qpid.test.utils.FailoverBaseCase;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/*
- * we need to create 4 brokers:
- * 1st broker will be running in test JVM and will not have failover host (only tcp connection will established, amqp connection will be closed)
- * 2d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established)
- * 3d broker will be spawn in separate JVM and should not have a failover host (only tcp connection will established, amqp connection will be closed)
- * 4d broker will be spawn in separate JVM and should have a failover host (amqp connection should be established)
- */
-public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(MultipleBrokersFailoverTest.class);
-
- private static final String FAILOVER_VIRTUAL_HOST = "failover";
- private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover";
- private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
- private static final int FAILOVER_RETRIES = 0;
- private static final int FAILOVER_CONNECTDELAY = 0;
- private static final long FAILOVER_AWAIT_TIME = FailoverBaseCase.DEFAULT_FAILOVER_TIME;
- private static final int NUMBER_OF_BROKERS = 4;
-
- private BrokerHolder[] _brokerHolders;
- private String _connectionURL;
- private Connection _connection;
- private CountDownLatch _failoverComplete;
- private CountDownLatch _failoverStarted;
- private Session _consumerSession;
- private Destination _destination;
- private MessageConsumer _consumer;
- private Session _producerSession;
- private MessageProducer _producer;
-
- @Override
- public void startDefaultBroker()
- {
- // do not start the default broker for this test
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _brokerHolders = new BrokerHolder[NUMBER_OF_BROKERS];
-
- // the test should connect to the second broker first and fail over to the forth broker
- // after unsuccessful try to establish the connection to the 3d broker
- for (int i = 0; i < NUMBER_OF_BROKERS; i++)
- {
- String host = null;
- if (i == 1 || i == NUMBER_OF_BROKERS - 1)
- {
- host = FAILOVER_VIRTUAL_HOST;
- }
- else
- {
- host = NON_FAILOVER_VIRTUAL_HOST;
- }
-
- BrokerHolder brokerHolder = createSpawnedBroker();
- createTestVirtualHostNode(brokerHolder, host, true);
- brokerHolder.start();
-
- _brokerHolders[i] = brokerHolder;
- }
-
- _connectionURL = generateUrlString(NUMBER_OF_BROKERS);
-
- _connection = getConnection(_connectionURL);
- ((AMQConnection) _connection).setConnectionListener(this);
- _failoverComplete = new CountDownLatch(1);
- _failoverStarted = new CountDownLatch(1);
- }
-
- private String generateUrlString(int numBrokers)
- {
- String baseString = "amqp://guest:guest@test/" + FAILOVER_VIRTUAL_HOST
- + "?&failover='roundrobin?cyclecount='1''&brokerlist='";
- StringBuffer buffer = new StringBuffer(baseString);
-
- for(int i = 0; i< numBrokers ; i++)
- {
- if(i != 0)
- {
- buffer.append(";");
- }
-
- String broker = String.format(BROKER_PORTION_FORMAT, _brokerHolders[i].getAmqpPort(),
- FAILOVER_CONNECTDELAY, FAILOVER_RETRIES);
- buffer.append(broker);
- }
- buffer.append("'");
-
- return buffer.toString();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- super.tearDown();
- }
- finally
- {
- for (BrokerHolder broker : _brokerHolders)
- {
- stopBrokerSafely(broker);
- }
- }
- }
-
-
- public void testFailoverOnBrokerKill() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
- assertSendReceive(0);
-
- _brokerHolders[1].kill();
-
- awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
- assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
- assertSendReceive(2);
- assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
- }
-
- public void testFailoverOnBrokerStop() throws Exception
- {
- init(Session.SESSION_TRANSACTED, true);
- assertConnectionPort(_brokerHolders[1].getAmqpPort());
-
- assertSendReceive(0);
-
- _brokerHolders[1].shutdown();
-
- awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
- assertEquals("Failover did not start within " + FAILOVER_AWAIT_TIME + "ms.", 0, _failoverStarted.getCount());
-
- assertSendReceive(1);
- assertConnectionPort(_brokerHolders[NUMBER_OF_BROKERS - 1].getAmqpPort());
- }
-
- private void assertConnectionPort(int brokerPort)
- {
- int connectionPort = ((AMQConnection)_connection).getActiveBrokerDetails().getPort();
- assertEquals("Unexpected broker port", brokerPort, connectionPort);
- }
-
- private void assertSendReceive(int index) throws JMSException
- {
- Message message = createNextMessage(_producerSession, index);
- _producer.send(message);
- if (_producerSession.getTransacted())
- {
- _producerSession.commit();
- }
- Message receivedMessage = _consumer.receive(1000l);
- assertReceivedMessage(receivedMessage, index);
- if (_consumerSession.getTransacted())
- {
- _consumerSession.commit();
- }
- }
-
- private void awaitForFailoverCompletion(long delay) throws Exception
- {
- LOGGER.info("Awaiting Failover completion..");
- if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
- {
- fail("Failover did not complete within " + delay + "ms.");
- }
- }
-
- private void assertReceivedMessage(Message receivedMessage, int messageIndex)
- {
- assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
- assertTrue(
- "Failure to receive message [" + messageIndex + "], expected TextMessage but received " + receivedMessage,
- receivedMessage instanceof TextMessage);
- }
-
- private void init(int acknowledgeMode, boolean startConnection) throws Exception
- {
- boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
-
- _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _destination = _consumerSession.createQueue(getTestQueueName());
- _consumer = _consumerSession.createConsumer(_destination);
-
- if (startConnection)
- {
- _connection.start();
- }
-
- _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
- _producer = _producerSession.createProducer(_destination);
-
- }
-
- @Override
- public void bytesSent(long count)
- {
- }
-
- @Override
- public void bytesReceived(long count)
- {
- }
-
- @Override
- public boolean preFailover(boolean redirect)
- {
- _failoverStarted.countDown();
- return true;
- }
-
- @Override
- public boolean preResubscribe()
- {
- return true;
- }
-
- @Override
- public void failoverComplete()
- {
- _failoverComplete.countDown();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/588c65f7/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java b/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
deleted file mode 100644
index 1916664..0000000
--- a/systests/src/test/java/org/apache/qpid/server/failover/FailoverMethodTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.failover;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQConnectionClosedException;
-import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.util.SystemUtils;
-
-public class FailoverMethodTest extends QpidBrokerTestCase implements ExceptionListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
- private CountDownLatch _failoverComplete = new CountDownLatch(1);
- private final int _freePortWithNoBroker = findFreePort();
-
- /**
- * Test that the round robin method has the correct delays.
- * The first connection will work but the localhost connection should fail but the duration it takes
- * to report the failure is what is being tested.
- *
- */
- public void testFailoverRoundRobinDelay() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- //note: The first broker has no connect delay and the default 1 retry
- // while the tcp:localhost broker has 3 retries with a 2s connect delay
- String connectionString = "amqp://guest:guest@/test?brokerlist=" +
- "'tcp://localhost:" + getDefaultAmqpPort() +
- ";tcp://localhost:" + _freePortWithNoBroker + "?connectdelay='2000',retries='3''";
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- try
- {
- long start = System.currentTimeMillis();
- AMQConnection connection = new AMQConnection(url);
-
- connection.setExceptionListener(this);
-
- LOGGER.debug("Stopping broker");
- stopDefaultBroker();
- LOGGER.debug("Stopped broker");
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe",
- 0, _failoverComplete.getCount());
-
- long end = System.currentTimeMillis();
-
- long duration = (end - start);
-
- //Failover should take more that 6 seconds.
- // 3 Retries
- // so VM Broker NoDelay 0 (Connect) NoDelay 0
- // then TCP NoDelay 0 Delay 1 Delay 2 Delay 3
- // so 3 delays of 2s in total for connection
- // as this is a tcp connection it will take 1second per connection to fail
- // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
-
- // Ensure we actually had the delay
- assertTrue("Failover took less than 6 seconds", duration > 6000);
-
- // Ensure we don't have delays before initial connection and reconnection.
- // We allow 1 second for initial connection and failover logic on top of 6s of sleep.
- assertTrue("Failover took more than 11 seconds:(" + duration + ")", duration < 11000);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testFailoverSingleDelay() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='2000',retries='3''";
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- try
- {
- long start = System.currentTimeMillis();
- AMQConnection connection = new AMQConnection(url);
-
- connection.setExceptionListener(this);
-
- LOGGER.debug("Stopping broker");
- stopDefaultBroker();
- LOGGER.debug("Stopped broker");
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe",
- 0, _failoverComplete.getCount());
-
- long end = System.currentTimeMillis();
-
- long duration = (end - start);
-
- //Failover should take more that 6 seconds.
- // 3 Retries
- // so NoDelay 0 (Connect) NoDelay 0 Delay 1 Delay 2 Delay 3
- // so 3 delays of 2s in total for connection
- // so max time is 6 seconds of delay + 1 second of runtime. == 7 seconds
-
- // Ensure we actually had the delay
- assertTrue("Failover took less than 6 seconds", duration > 6000);
-
- // Ensure we don't have delays before initial connection and reconnection.
- // We allow 3 second for initial connection and failover logic on top of 6s of sleep.
- assertTrue("Failover took more than 9 seconds:(" + duration + ")", duration < 9000);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- }
-
-
- /**
- * Test that setting 'nofailover' as the failover policy does not result in
- * delays or connection attempts when the initial connection is lost.
- *
- * Test validates that there is a connection delay as required on initial
- * connection.
- */
- public void testNoFailover() throws Exception
- {
- if (SystemUtils.isWindows())
- {
- //TODO Test requires redevelopment - timings/behaviour on windows mean it fails
- return;
- }
-
- int CONNECT_DELAY = 2000;
- String connectionString = "amqp://guest:guest@/test?brokerlist='tcp://localhost:" + getDefaultAmqpPort() + "?connectdelay='" + CONNECT_DELAY + "'," +
- "retries='3'',failover='nofailover'";
-
-
- AMQConnectionURL url = new AMQConnectionURL(connectionString);
-
- Thread brokerStart = null;
- try
- {
- //Kill initial broker
- stopDefaultBroker();
-
- //Create a thread to start the broker asynchronously
- brokerStart = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- //Wait before starting broker
- // The wait should allow at least 1 retries to fail before broker is ready
- Thread.sleep(750);
- startDefaultBroker();
- }
- catch (Exception e)
- {
- LOGGER.error("Exception whilst starting broker", e);
- }
- }
- });
-
- brokerStart.start();
- long start = System.currentTimeMillis();
- //Start the connection so it will use the retries
- AMQConnection connection = new AMQConnection(url);
-
- long end = System.currentTimeMillis();
- long duration = (end - start);
-
- // Check that we actually had a delay in connection
- assertTrue("Initial connection should be longer than 1 delay : " + CONNECT_DELAY + " <:(" + duration + ")", duration > CONNECT_DELAY);
-
-
- connection.setExceptionListener(this);
-
- //Ensure we collect the brokerStart thread
- brokerStart.join();
- brokerStart = null;
-
- start = System.currentTimeMillis();
-
- //Kill connection
- stopDefaultBroker();
-
- _failoverComplete.await(30, TimeUnit.SECONDS);
- assertEquals("failoverLatch was not decremented in given timeframe", 0, _failoverComplete.getCount());
-
- end = System.currentTimeMillis();
-
- duration = (end - start);
-
- // Notification of the connection failure should be very quick as we are denying the ability to failover.
- // It may not be as quick for Java profile tests so lets just make sure it is less than the connectiondelay
- // Occasionally it takes 1s so we have to set CONNECT_DELAY to be higher to take that in to account.
- assertTrue("Notification of the connection failure took was : " + CONNECT_DELAY + " >:(" + duration + ")", duration < CONNECT_DELAY);
- }
- catch (QpidException e)
- {
- fail(e.getMessage());
- }
- finally
- {
- // Guard against the case where the broker took too long to start
- // and the initial connection failed to be formed.
- if (brokerStart != null)
- {
- brokerStart.join();
- }
- }
- }
-
- @Override
- public void onException(JMSException e)
- {
- if (e.getLinkedException() instanceof AMQDisconnectedException || e.getLinkedException() instanceof AMQConnectionClosedException)
- {
- LOGGER.debug("Received AMQDisconnectedException");
- _failoverComplete.countDown();
- }
- else
- {
- LOGGER.error("Unexpected underlying exception", e.getLinkedException());
- }
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP
0-x client specific JMSDestinationTest to client suite
Posted by kw...@apache.org.
QPID-6933: [System Tests] Move AMQP 0-x client specific JMSDestinationTest to client suite
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b6934dfc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b6934dfc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b6934dfc
Branch: refs/heads/master
Commit: b6934dfcbc07c9f2ddd9c79e5a07c1d2245df3f4
Parents: 799247d
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 21 09:47:09 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 21 09:47:09 2018 +0000
----------------------------------------------------------------------
.../test/client/message/JMSDestinationTest.java | 172 -------------------
test-profiles/Java10Excludes | 7 -
test-profiles/JavaPre010Excludes | 3 -
test-profiles/cpp.excludes | 2 -
4 files changed, 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b6934dfc/systests/src/test/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/client/message/JMSDestinationTest.java b/systests/src/test/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
deleted file mode 100644
index 1fcc494..0000000
--- a/systests/src/test/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.client.message;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * From the API Docs getJMSDestination:
- *
- * When a message is received, its JMSDestination value must be equivalent to
- * the value assigned when it was sent.
- */
-public class JMSDestinationTest extends QpidBrokerTestCase
-{
-
- private Connection _connection;
- private Session _session;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- _connection = getConnection();
-
- _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- /**
- * Test a message received without the JMS_QPID_DESTTYPE can be resent
- * and correctly have the property set.
- *
- * To do this we need to create a 0-10 connection and send a message
- * which is then received by a 0-8/9 client.
- *
- * @throws Exception
- */
- public void testReceiveResend() throws Exception
- {
- // Create a 0-10 Connection and send message
- setSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
-
- Connection connection010 = getConnection();
-
- Session session010 = connection010.createSession(true, Session.SESSION_TRANSACTED);
-
- // Create queue for testing
- Queue queue = session010.createQueue(getTestQueueName());
-
- // Ensure queue exists
- session010.createConsumer(queue).close();
-
- sendMessage(session010, queue, 1);
-
- // Close the 010 connection
- connection010.close();
-
- // Create a 0-8 Connection and receive message
- setSystemProperty(ClientProperties.AMQP_VERSION, "0-8");
-
- Connection connection08 = getConnection();
-
- Session session08 = connection08.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer consumer = session08.createConsumer(queue);
-
- connection08.start();
-
- Message message = consumer.receive(1000);
-
- assertNotNull("Didn't receive 0-10 message.", message);
-
- // Validate that JMS_QPID_DESTTYPE is not set
- try
- {
- message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString());
- fail("JMS_QPID_DESTTYPE should not be set, so should throw NumberFormatException");
- }
- catch (NumberFormatException nfe)
- {
-
- }
-
- // Resend message back to queue and validate that
- // a) getJMSDestination works without the JMS_QPID_DESTTYPE
- // b) we can actually send without a BufferOverFlow.
-
- MessageProducer producer = session08.createProducer(queue);
- producer.send(message);
-
- message = consumer.receive(1000);
-
- assertNotNull("Didn't receive recent 0-8 message.", message);
-
- // Validate that JMS_QPID_DESTTYPE is not set
- assertEquals("JMS_QPID_DESTTYPE should be set to a Queue", AMQDestination.QUEUE_TYPE,
- message.getIntProperty(CustomJMSXProperty.JMS_QPID_DESTTYPE.toString()));
-
- }
-
- public void testQueueWithBindingUrlUsingCustomExchange() throws Exception
- {
- String exchangeName = "exch_" + getTestQueueName();
- String queueName = "queue_" + getTestQueueName();
-
- String address = String.format("direct://%s/%s/%s?routingkey='%s'", exchangeName, queueName, queueName, queueName);
- sendReceive(address);
- }
-
- public void testQueueWithBindingUrlUsingAmqDirectExchange() throws Exception
- {
- String queueName = getTestQueueName();
- String address = String.format("direct://amq.direct/%s/%s?routingkey='%s'", queueName, queueName, queueName);
- sendReceive(address);
- }
-
- public void testQueueWithBindingUrlUsingDefaultExchange() throws Exception
- {
- String queueName = getTestQueueName();
- String address = String.format("direct:///%s/%s?routingkey='%s'", queueName, queueName, queueName);
- sendReceive(address);
- }
-
- private void sendReceive(String address) throws JMSException, Exception
- {
- Destination dest = _session.createQueue(address);
- MessageConsumer consumer = _session.createConsumer(dest);
-
- _connection.start();
-
- sendMessage(_session, dest, 1);
-
- Message receivedMessage = consumer.receive(10000);
-
- assertNotNull("Message should not be null", receivedMessage);
-
- Destination receivedDestination = receivedMessage.getJMSDestination();
-
- assertNotNull("JMSDestination should not be null", receivedDestination);
- assertEquals("JMSDestination should match that sent", address, receivedDestination.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b6934dfc/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 99db5ce..aa97f78 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -48,13 +48,6 @@ org.apache.qpid.systest.rest.MessageContentCompressionRestTest#*
// Tests the interaction between the Broker's supported protocols and what the 0-x client agrees to
org.apache.qpid.server.SupportedProtocolVersionsTest#*
-// test of 0-10 client specific behaviour
-org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
-// BURL specific tests
-org.apache.qpid.test.client.message.JMSDestinationTest#testQueueWithBindingUrlUsingCustomExchange
-org.apache.qpid.test.client.message.JMSDestinationTest#testQueueWithBindingUrlUsingAmqDirectExchange
-org.apache.qpid.test.client.message.JMSDestinationTest#testQueueWithBindingUrlUsingDefaultExchange
-
// Durable topic subscriptions will be reimplemented with the shared topic subscriptions (QPID-7569)
org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionCreateDurable
org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionCreateDurableWithArguments
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b6934dfc/test-profiles/JavaPre010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaPre010Excludes b/test-profiles/JavaPre010Excludes
index 538a93e..7d3925e 100644
--- a/test-profiles/JavaPre010Excludes
+++ b/test-profiles/JavaPre010Excludes
@@ -21,9 +21,6 @@
//Exclude the following from brokers using the 0-8/0-9/0-9-1 protocols
//======================================================================
-// These tests requires a broker capable of 0-8/0-9/0-9-1 and 0-10 concurrently
-org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
-
// Those tests are written against the 0.10 path
org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b6934dfc/test-profiles/cpp.excludes
----------------------------------------------------------------------
diff --git a/test-profiles/cpp.excludes b/test-profiles/cpp.excludes
index d41d8fe..0c6c80e 100644
--- a/test-profiles/cpp.excludes
+++ b/test-profiles/cpp.excludes
@@ -21,8 +21,6 @@
//Exclude the following tests when running all cpp test profilies
//======================================================================
-// This test requires a broker capable of 0-8/9 and 0-10
-org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
//BDB System Tests
org.apache.qpid.server.store.berkeleydb.*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/5] qpid-broker-j git commit: QPID-6933: [System Tests] Remove
DurableSubscriptionTest - redundant/poorly focused
Posted by kw...@apache.org.
QPID-6933: [System Tests] Remove DurableSubscriptionTest - redundant/poorly focused
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b4ba6158
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b4ba6158
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b4ba6158
Branch: refs/heads/master
Commit: b4ba6158f3fc9f86b07cf1dd24f2e778fbc182fc
Parents: b6934df
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 21 09:55:01 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 21 09:55:01 2018 +0000
----------------------------------------------------------------------
.../unit/topic/DurableSubscriptionTest.java | 586 -------------------
test-profiles/CPPExcludes | 1 -
test-profiles/CPPNoPrefetchExcludes | 2 -
test-profiles/Java010Excludes | 1 -
test-profiles/Java10BrokenTestsExcludes | 2 -
5 files changed, 592 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4ba6158/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
deleted file mode 100644
index cd9bd7d..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.topic;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSubscriber;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQNoRouteException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * The tests in the suite only test 0-x client specific behaviour.
- * The tests should be moved into client or removed
- */
-public class DurableSubscriptionTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
- private static final String MY_TOPIC = "MyTopic";
-
- private static final String MY_SUBSCRIPTION = "MySubscription";
-
- /**
- * Specifically uses a subscriber with a selector because QPID-4731 found that selectors
- * can prevent queue removal.
- */
- public void testUnsubscribeWhenUsingSelectorMakesTopicUnreachable() throws Exception
- {
- setTestClientSystemProperty("qpid.default_mandatory_topic","true");
-
- // set up subscription
- AMQConnection connection = (AMQConnection) getConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(connection, MY_TOPIC);
- MessageProducer producer = session.createProducer(topic);
-
- TopicSubscriber subscriber = session.createDurableSubscriber(topic, MY_SUBSCRIPTION, "1 = 1", false);
- StoringExceptionListener exceptionListener = new StoringExceptionListener();
- connection.setExceptionListener(exceptionListener);
-
- // send message and verify it was consumed
- producer.send(session.createTextMessage("message1"));
- assertNotNull("Message should have been successfully received", subscriber.receive(getReceiveTimeout()));
- assertEquals(null, exceptionListener.getException());
- session.unsubscribe(MY_SUBSCRIPTION);
-
- // send another message and verify that the connection exception listener was fired.
- StoringExceptionListener exceptionListener2 = new StoringExceptionListener();
- connection.setExceptionListener(exceptionListener2);
-
- producer.send(session.createTextMessage("message that should be unroutable"));
- ((AMQSession<?, ?>) session).sync();
-
- JMSException exception = exceptionListener2.awaitException();
- assertNotNull("Expected exception as message should no longer be routable", exception);
-
- Throwable linkedException = exception.getLinkedException();
- assertNotNull("The linked exception of " + exception + " should be the 'no route' exception", linkedException);
- assertEquals(AMQNoRouteException.class, linkedException.getClass());
- }
-
- private final class StoringExceptionListener implements ExceptionListener
- {
- private volatile JMSException _exception;
- private CountDownLatch _latch = new CountDownLatch(1);
-
- @Override
- public void onException(JMSException exception)
- {
- _exception = exception;
- LOGGER.info("Exception listener received: " + exception);
- _latch.countDown();
- }
-
- public JMSException awaitException() throws InterruptedException
- {
- _latch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
- return _exception;
- }
-
- public JMSException getException()
- {
- return _exception;
- }
- }
-
- public void testDurabilityNOACK() throws Exception
- {
- durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false);
- }
-
- public void testDurabilityAUTOACK() throws Exception
- {
- durabilityImpl(Session.AUTO_ACKNOWLEDGE, false);
- }
-
- public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception
- {
- if(!isBrokerStorePersistent())
- {
- LOGGER.warn("The broker store is not persistent, skipping this test");
- return;
- }
-
- durabilityImpl(Session.AUTO_ACKNOWLEDGE, true);
- }
-
- public void testDurabilityNOACKSessionPerConnection() throws Exception
- {
- durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
- }
-
- public void testDurabilityAUTOACKSessionPerConnection() throws Exception
- {
- durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
- }
-
- private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception
- {
- TopicConnection con = (TopicConnection) getConnection();
- Topic topic = createTopic(con, MY_TOPIC);
- Session session1 = con.createSession(false, ackMode);
- MessageConsumer consumer1 = session1.createConsumer(topic);
-
- Session sessionProd = con.createSession(false, ackMode);
- MessageProducer producer = sessionProd.createProducer(topic);
-
- Session session2 = con.createSession(false, ackMode);
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
-
- con.start();
-
- //send message A and check both consumers receive
- producer.send(session1.createTextMessage("A"));
-
- Message msg;
- LOGGER.info("Receive message on consumer 1 :expecting A");
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer1.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- LOGGER.info("Receive message on consumer 2 :expecting A");
- msg = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("A", ((TextMessage) msg).getText());
- msg = consumer2.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
- producer.send(session1.createTextMessage("B"));
-
- LOGGER.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Consumer 1 should get message 'B'.", msg);
- assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer1.", msg);
-
- consumer2.close();
- session2.close();
-
- //Send message C, then connect consumer 3 to durable subscription and get
- //message B if not using NO_ACK, then receive C with consumer 1 and 3
- producer.send(session1.createTextMessage("C"));
-
- Session session3 = con.createSession(false, ackMode);
- MessageConsumer consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION);
-
- if(ackMode == AMQSession.NO_ACKNOWLEDGE)
- {
- //Do nothing if NO_ACK was used, as prefetch means the message was dropped
- //when we didn't call receive() to get it before closing consumer 2
- }
- else
- {
- LOGGER.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(getReceiveTimeout());
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
- }
-
- LOGGER.info("Receive message on consumer 1 :expecting C");
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Consumer 1 should get message 'C'.", msg);
- assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer1.", msg);
-
- LOGGER.info("Receive message on consumer 3 :expecting C");
- msg = consumer3.receive(getReceiveTimeout());
- assertNotNull("Consumer 3 should get message 'C'.", msg);
- assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer3.", msg);
-
- consumer1.close();
- consumer3.close();
-
- session3.unsubscribe(MY_SUBSCRIPTION);
-
- con.close();
-
- if(restartBroker)
- {
- try
- {
- restartDefaultBroker();
- }
- catch (Exception e)
- {
- fail("Error restarting the broker");
- }
- }
- }
-
- private void durabilityImplSessionPerConnection(int ackMode) throws Exception
- {
- Message msg;
- // Create producer.
- TopicConnection con0 = (TopicConnection) getConnection();
- con0.start();
- Session session0 = con0.createSession(false, ackMode);
-
- Topic topic = createTopic(con0, MY_TOPIC);
-
- Session sessionProd = con0.createSession(false, ackMode);
- MessageProducer producer = sessionProd.createProducer(topic);
-
- // Create consumer 1.
- Connection con1 = getConnection();
- con1.start();
- Session session1 = con1.createSession(false, ackMode);
-
- MessageConsumer consumer1 = session1.createConsumer(topic);
-
- // Create consumer 2.
- Connection con2 = getConnection();
- con2.start();
- Session session2 = con2.createSession(false, ackMode);
-
- TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, MY_SUBSCRIPTION);
-
- // Send message and check that both consumers get it and only it.
- producer.send(session0.createTextMessage("A"));
-
- msg = consumer1.receive(getReceiveTimeout());
- assertNotNull("Message should be available", msg);
- assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
- msg = consumer1.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer1.", msg);
-
- msg = consumer2.receive(getReceiveTimeout());
- assertNotNull("Message should have been received",msg);
- assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer2.", msg);
-
- // Send message and receive on consumer 1.
- producer.send(session0.createTextMessage("B"));
-
- LOGGER.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(getReceiveTimeout());
- assertEquals("B", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- // Detach the durable subscriber.
- consumer2.close();
- session2.close();
- con2.close();
-
- // Send message C and receive on consumer 1
- producer.send(session0.createTextMessage("C"));
-
- LOGGER.info("Receive message on consumer 1 :expecting C");
- msg = consumer1.receive(getReceiveTimeout());
- assertEquals("C", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 1 :expecting null");
- msg = consumer1.receive(getShortReceiveTimeout());
- assertEquals(null, msg);
-
- // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
- // and also gets message C sent after it was disconnected.
- AMQConnection con3 = (AMQConnection) getConnection();
- con3.start();
- Session session3 = con3.createSession(false, ackMode);
-
- TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, MY_SUBSCRIPTION);
-
- if(ackMode == AMQSession.NO_ACKNOWLEDGE)
- {
- //Do nothing if NO_ACK was used, as prefetch means the message was dropped
- //when we didn't call receive() to get it before closing consumer 2
- }
- else
- {
- LOGGER.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(getReceiveTimeout());
- assertNotNull(msg);
- assertEquals("B", ((TextMessage) msg).getText());
- }
-
- LOGGER.info("Receive message on consumer 3 :expecting C");
- msg = consumer3.receive(getReceiveTimeout());
- assertNotNull("Consumer 3 should get message 'C'.", msg);
- assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
- LOGGER.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(getShortReceiveTimeout());
- assertNull("There should be no more messages for consumption on consumer3.", msg);
-
- consumer1.close();
- consumer3.close();
-
- session3.unsubscribe(MY_SUBSCRIPTION);
-
- con0.close();
- con1.close();
- con3.close();
- }
-
- /**
- * This tests the fix for QPID-1085
- * Creates a durable subscriber with an invalid selector, checks that the
- * exception is thrown correctly and that the subscription is not created.
- * @throws Exception
- */
- public void testDurableWithInvalidSelector() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "MyTestDurableWithInvalidSelectorTopic");
- MessageProducer producer = session.createProducer(topic);
- producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
- try
- {
- TopicSubscriber deadSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub",
- "=TEST 'test", true);
- assertNull("Subscriber should not have been created", deadSubscriber);
- }
- catch (JMSException e)
- {
- assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
- }
- TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
- assertNotNull("Subscriber should have been created", liveSubscriber);
-
- producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
-
- Message msg = liveSubscriber.receive(getReceiveTimeout());
- assertNotNull ("Message should have been received", msg);
- assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
- assertNull("Should not receive subsequent message", liveSubscriber.receive(getShortReceiveTimeout()));
- liveSubscriber.close();
- session.unsubscribe("testDurableWithInvalidSelectorSub");
- }
-
- /**
- * This tests the fix for QPID-1085
- * Creates a durable subscriber with an invalid destination, checks that the
- * exception is thrown correctly and that the subscription is not created.
- * @throws Exception
- */
- public void testDurableWithInvalidDestination() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "testDurableWithInvalidDestinationTopic");
- try
- {
- TopicSubscriber deadSubscriber = session.createDurableSubscriber(null, "testDurableWithInvalidDestinationsub");
- assertNull("Subscriber should not have been created", deadSubscriber);
- }
- catch (InvalidDestinationException e)
- {
- // This was expected
- }
- MessageProducer producer = session.createProducer(topic);
- producer.send(session.createTextMessage("testDurableWithInvalidSelector1"));
-
- TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidDestinationsub");
- assertNotNull("Subscriber should have been created", liveSubscriber);
-
- producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
- Message msg = liveSubscriber.receive(getReceiveTimeout());
- assertNotNull ("Message should have been received", msg);
- assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
- assertNull("Should not receive subsequent message", liveSubscriber.receive(getShortReceiveTimeout()));
-
- session.unsubscribe("testDurableWithInvalidDestinationsub");
- }
-
- /**
- * <ul>
- * <li>create and register a durable subscriber with a message selector
- * <li>create another durable subscriber with a different selector and same name
- * <li>check first subscriber is now closed
- * <li>create a publisher and send messages
- * <li>check messages are received correctly
- * </ul>
- * <p>
- * QPID-2418
- *
- * TODO: it seems that client behaves in not jms spec compliant:
- * the client allows subscription recreation with a new selector whilst an active subscriber is connected
- */
- public void testResubscribeWithChangedSelectorNoClose() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "testResubscribeWithChangedSelectorNoClose");
-
- // Create durable subscriber that matches A
- TopicSubscriber subA = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorNoClose",
- "Match = True", false);
-
- // Reconnect with new selector that matches B
- TopicSubscriber subB = session.createDurableSubscriber(topic,
- "testResubscribeWithChangedSelectorNoClose",
- "Match = false", false);
-
- // First subscription has been closed
- try
- {
- subA.receive(getShortReceiveTimeout());
- fail("First subscription was not closed");
- }
- catch (Exception e)
- {
- LOGGER.error("Receive error",e);
- }
-
- conn.stop();
-
- // Send 1 matching message and 1 non-matching message
- MessageProducer producer = session.createProducer(topic);
- TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("Match", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("Match", false);
- producer.send(msg);
-
- // should be 1 or 2 messages on queue now
- // (1 for the Apache Qpid Broker-J due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
- AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "testResubscribeWithChangedSelectorNoClose");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
-
- conn.start();
-
- Message rMsg = subB.receive(getReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelectorAndRestart2",
- ((TextMessage) rMsg).getText());
-
- rMsg = subB.receive(getShortReceiveTimeout());
- assertNull(rMsg);
-
- // Check queue has no messages
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
-
- conn.close();
- }
-
- /**
- * <ul>
- * <li>create and register a durable subscriber with no message selector
- * <li>create another durable subscriber with a selector and same name
- * <li>check first subscriber is now closed
- * <li>create a publisher and send messages
- * <li>check messages are received correctly
- * </ul>
- * <p>
- * QPID-2418
- *
- * TODO: it seems that client behaves in not jms spec compliant:
- * the client allows subscription recreation with a new selector whilst active subscriber is connected
- */
- public void testDurSubAddMessageSelectorNoClose() throws Exception
- {
- Connection conn = getConnection();
- conn.start();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = createTopic(conn, "subscriptionName");
-
- // create and register a durable subscriber with no message selector
- TopicSubscriber subOne = session.createDurableSubscriber(topic, "subscriptionName", null, false);
-
- // now create a durable subscriber with a selector
- TopicSubscriber subTwo = session.createDurableSubscriber(topic, "subscriptionName", "testprop = TRUE", false);
-
- // First subscription has been closed
- try
- {
- subOne.receive(getShortReceiveTimeout());
- fail("First subscription was not closed");
- }
- catch (Exception e)
- {
- LOGGER.error("Receive error",e);
- }
-
- conn.stop();
-
- // Send 1 matching message and 1 non-matching message
- MessageProducer producer = session.createProducer(topic);
- TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
- msg.setBooleanProperty("testprop", true);
- producer.send(msg);
- msg = session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
- msg.setBooleanProperty("testprop", false);
- producer.send(msg);
-
- // should be 1 or 2 messages on queue now
- // (1 for the Apache Qpid Broker-J due to use of server side selectors, and 2 for the cpp broker due to client side selectors only)
- AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" + "subscriptionName");
- assertEquals("Queue depth is wrong", isJavaBroker() ? 1 : 2, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
-
- conn.start();
-
- Message rMsg = subTwo.receive(getReceiveTimeout());
- assertNotNull(rMsg);
- assertEquals("Content was wrong",
- "testResubscribeWithChangedSelectorAndRestart1",
- ((TextMessage) rMsg).getText());
-
- rMsg = subTwo.receive(getShortReceiveTimeout());
- assertNull(rMsg);
-
- // Check queue has no messages
- assertEquals("Queue should be empty", 0, ((AMQSession<?, ?>) session).getQueueDepth(queue, true));
-
- conn.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4ba6158/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 5636a05..68acca2 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -28,7 +28,6 @@ org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#*
// QPID-1262, QPID-1119 : This test fails occasionally due to potential protocol issue.
org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#*
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable
// QPID-1727, QPID-1726 : c++ broker does not support flow to disk on transient queues. Also it requires a persistent store impl. for Apache
org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4ba6158/test-profiles/CPPNoPrefetchExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPNoPrefetchExcludes b/test-profiles/CPPNoPrefetchExcludes
index ddfba4d..969b927 100644
--- a/test-profiles/CPPNoPrefetchExcludes
+++ b/test-profiles/CPPNoPrefetchExcludes
@@ -17,5 +17,3 @@
// under the License.
//
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurabilityNOACK
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurabilityNOACKSessionPerConnection
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4ba6158/test-profiles/Java010Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java010Excludes b/test-profiles/Java010Excludes
index 30176f1..327c146 100755
--- a/test-profiles/Java010Excludes
+++ b/test-profiles/Java010Excludes
@@ -19,7 +19,6 @@
// Those tests are testing 0.8..-0-9-1 specific semantics
org.apache.qpid.systest.rest.BrokerRestTest#testSetCloseOnNoRoute
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable
// 0-10 and 0-9 connections don't generate the exact same logging due to protocol differences
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4ba6158/test-profiles/Java10BrokenTestsExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10BrokenTestsExcludes b/test-profiles/Java10BrokenTestsExcludes
index 6edd58b..5686325 100644
--- a/test-profiles/Java10BrokenTestsExcludes
+++ b/test-profiles/Java10BrokenTestsExcludes
@@ -30,6 +30,4 @@ org.apache.qpid.systest.rest.PublishMessageRestTest#testPublishListMessage
// this test fails - likely a client bug with the modification racing the send
org.apache.qpid.test.unit.basic.BytesMessageTest#testModificationAfterSend
-// Test uses AMQP 0-x ack modes and assumes the name of the queues backing subscriptions
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/5] qpid-broker-j git commit: QPID-6933: [System Tests] Move AMQP
0-x client specific test testing acknowledgeThis to client suite
Posted by kw...@apache.org.
QPID-6933: [System Tests] Move AMQP 0-x client specific test testing acknowledgeThis to client suite
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/799247db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/799247db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/799247db
Branch: refs/heads/master
Commit: 799247db22b5bb77001f09299ca3da698e2f9b5d
Parents: 588c65f
Author: Keith Wall <kw...@apache.org>
Authored: Sun Jan 21 09:19:48 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Sun Jan 21 09:19:48 2018 +0000
----------------------------------------------------------------------
.../apache/qpid/test/unit/ack/RecoverTest.java | 275 -------------------
test-profiles/Java10Excludes | 5 -
2 files changed, 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/799247db/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
deleted file mode 100644
index 06200ea..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.ack;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * Legacy JMS client specific tests
- */
-public class RecoverTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(RecoverTest.class);
-
- private static final int SENT_COUNT = 4;
-
- private volatile Exception _error;
- private long _timeout;
- private Connection _connection;
- private Session _consumerSession;
- private MessageConsumer _consumer;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _error = null;
- _timeout = getReceiveTimeout();
- }
-
- private void initTest() throws Exception
- {
- _connection = getConnection();
-
- _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = createTestQueue(_consumerSession);
-
- _consumer = _consumerSession.createConsumer(queue);
-
- LOGGER.info("Sending four messages");
- sendMessage(_connection.createSession(false, Session.AUTO_ACKNOWLEDGE), queue, SENT_COUNT);
- LOGGER.info("Starting connection");
- _connection.start();
- }
-
- private Message validateNextMessages(int nextCount, int startIndex) throws JMSException
- {
- Message message = null;
-
- for (int index = 0; index < nextCount; index++)
- {
- message = _consumer.receive(_timeout);
- assertEquals(startIndex + index, message.getIntProperty(INDEX));
- }
- return message;
- }
-
- private void validateRemainingMessages(int remaining) throws JMSException
- {
- int index = SENT_COUNT - remaining;
-
- Message message = null;
- while (index != SENT_COUNT)
- {
- message = _consumer.receive(_timeout);
- assertNotNull(message);
- int expected = index++;
- assertEquals("Message has unexpected index", expected, message.getIntProperty(INDEX));
- }
-
- if (message != null)
- {
- LOGGER.info("Received redelivery of three messages. Acknowledging last message");
- message.acknowledge();
- }
-
- LOGGER.info("Calling acknowledge with no outstanding messages");
- // all acked so no messages to be delivered
- _consumerSession.recover();
-
- message = _consumer.receiveNoWait();
- assertNull(message);
- LOGGER.info("No messages redelivered as is expected");
- }
-
- public void testRecoverResendsMsgsAckOnEarlier() throws Exception
- {
- initTest();
-
- Message message = validateNextMessages(2, 0);
- message.acknowledge();
- LOGGER.info("Received 2 messages, acknowledge() first message, should acknowledge both");
-
- _consumer.receive();
- _consumer.receive();
- LOGGER.info("Received all four messages. Calling recover with two outstanding messages");
- // no ack for the last two messages (indices 2 and 3), so after recover() I expect those two messages to be redelivered
- _consumerSession.recover();
-
- Message message2 = _consumer.receive(_timeout);
- assertNotNull(message2);
- assertEquals(2, message2.getIntProperty(INDEX));
-
- Message message3 = _consumer.receive(_timeout);
- assertNotNull(message3);
- assertEquals(3, message3.getIntProperty(INDEX));
-
- LOGGER.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message) message2).acknowledgeThis();
-
- LOGGER.info("Calling recover");
- // only message 2 was acknowledged (via acknowledgeThis), so message 3 is still outstanding and should be redelivered
- _consumerSession.recover();
-
- message3 = _consumer.receive(_timeout);
- assertNotNull(message3);
- assertEquals(3, message3.getIntProperty(INDEX));
- ((org.apache.qpid.jms.Message) message3).acknowledgeThis();
-
- // all acked so no messages to be delivered
- validateRemainingMessages(0);
- }
-
- /**
- * Goal : Same as testOrderingWithSyncConsumer
- * Test strategy :
- * Same as testOrderingWithSyncConsumer but using a
- * Message Listener instead of a sync receive().
- */
- public void testOrderingWithAsyncConsumer() throws Exception
- {
- Connection con = getConnection();
- final Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Destination topic = createTopic(con, "myTopic");
- MessageConsumer cons = session.createConsumer(topic);
-
- sendMessage(session,topic,8);
- con.start();
-
- final Object lock = new Object();
- final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final'
-
- cons.setMessageListener(new MessageListener()
- {
- private int messageSeen = 0;
- private int expectedIndex = 0;
-
- @Override
- public void onMessage(Message message)
- {
- try
- {
- int actualIndex = message.getIntProperty(INDEX);
- assertEquals("Received Message Out Of Order", expectedIndex, actualIndex);
-
- //don't ack the message until we receive it 5 times
- if( messageSeen < 5 )
- {
- LOGGER.debug("Ignoring message " + actualIndex + " and calling recover");
- session.recover();
- messageSeen++;
- }
- else
- {
- messageSeen = 0;
- expectedIndex++;
- message.acknowledge();
- LOGGER.debug("Acknowledging message " + actualIndex);
- if (expectedIndex == 8)
- {
- pass.set(true);
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- }
- }
- catch (JMSException e)
- {
- _error = e;
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- }
- });
-
- synchronized(lock)
- {
- // Based on historical data, on average the test takes about 6 secs to complete.
- lock.wait(8000);
- }
-
- assertNull("Unexpected exception thrown by async listener", _error);
-
- if (!pass.get())
- {
- fail("Test did not complete on time. Please check the logs");
- }
- }
-
- /**
- * This test ensures that after exhausting credit (prefetch), a {@link Session#recover()} successfully
- * restores credit and allows the same messages to be re-received.
- */
- public void testRecoverSessionAfterCreditExhausted() throws Exception
- {
- final int maxPrefetch = 5;
-
- // We send more messages than prefetch size. This ensures that if the 0-10 client were to
- // complete the message commands before the rollback command is sent, the broker would
- // send additional messages utilising the release credit. This problem would manifest itself
- // as an incorrect message (or no message at all) being received at the end of the test.
-
- final int numMessages = maxPrefetch * 2;
-
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch));
-
- Connection con = getConnection();
- final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Destination dest = session.createQueue(getTestQueueName());
- MessageConsumer cons = session.createConsumer(dest);
-
- sendMessage(session, dest, numMessages);
- con.start();
-
- for (int i=0; i< maxPrefetch; i++)
- {
- final Message message = cons.receive(_timeout);
- assertNotNull("Received:" + i, message);
- assertEquals("Unexpected message received", i, message.getIntProperty(INDEX));
- }
-
- LOGGER.info("Recovering");
- session.recover();
-
- Message result = cons.receive(_timeout);
- // Expect the first message
- assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/799247db/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 4b385c1..99db5ce 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -20,11 +20,6 @@
// Exclude client test of initial context factory, as the 1.0 profile uses the 1.0 context factory
org.apache.qpid.jndi.PropertiesFileInitialContextFactoryTest#*
-// Uses an 0-x client API to acknowledge up to a particular message rather than the most recent
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier
-// Tests the effect of setting the prefetch value
-org.apache.qpid.test.unit.ack.RecoverTest#testRecoverSessionAfterCreditExhausted
-
// The binding logging tests focus on the behaviour of the old client with regard to creating (and binding) queues on
// the creation of consumers.
org.apache.qpid.server.logging.BindingLoggingTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org