You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:17 UTC
[08/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
index f80b09a..564fd86 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
@@ -20,7 +20,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,21 +34,23 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
/**
* TestCase showing the message-destroying described in AMQ-1925
*/
-public class AMQ1925Test extends TestCase implements ExceptionListener {
+public class AMQ1925Test extends OpenwireArtemisBaseTest implements ExceptionListener {
private static final Logger log = Logger.getLogger(AMQ1925Test.class);
@@ -57,7 +58,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
private static final String PROPERTY_MSG_NUMBER = "NUMBER";
private static final int MESSAGE_COUNT = 10000;
- private BrokerService bs;
+ private EmbeddedJMS bs;
private URI tcpUri;
private ActiveMQConnectionFactory cf;
@@ -74,17 +75,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
- @Override
public void run() {
try {
starter.await();
// Simulate broker failure & restart
bs.stop();
- bs = new BrokerService();
- bs.setPersistent(true);
- bs.setUseJmx(true);
- bs.addConnector(tcpUri);
+ bs = createNewServer();
bs.start();
restarted.set(true);
@@ -97,21 +94,21 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500);
- assertNotNull("No Message " + i + " found", message);
+ Assert.assertNotNull("No Message " + i + " found", message);
if (i < 10)
- assertFalse("Timing problem, restarted too soon", restarted.get());
+ Assert.assertFalse("Timing problem, restarted too soon", restarted.get());
if (i == 10) {
starter.countDown();
}
if (i > MESSAGE_COUNT - 100) {
- assertTrue("Timing problem, restarted too late", restarted.get());
+ Assert.assertTrue("Timing problem, restarted too late", restarted.get());
}
- assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
session.commit();
}
- assertNull(consumer.receive(500));
+ Assert.assertNull(consumer.receive(500));
consumer.close();
session.close();
@@ -133,17 +130,13 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
final CountDownLatch starter = new CountDownLatch(1);
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
- @Override
public void run() {
try {
starter.await();
// Simulate broker failure & restart
bs.stop();
- bs = new BrokerService();
- bs.setPersistent(true);
- bs.setUseJmx(true);
- bs.addConnector(tcpUri);
+ bs = createNewServer();
bs.start();
restarted.set(true);
@@ -172,12 +165,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
}
if (i < 10)
- assertFalse("Timing problem, restarted too soon", restarted.get());
+ Assert.assertFalse("Timing problem, restarted too soon", restarted.get());
if (i == 10) {
starter.countDown();
}
if (i > MESSAGE_COUNT - 50) {
- assertTrue("Timing problem, restarted too late", restarted.get());
+ Assert.assertTrue("Timing problem, restarted too late", restarted.get());
}
if (message1 != null) {
@@ -189,8 +182,8 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
session2.commit();
}
}
- assertNull(consumer1.receive(500));
- assertNull(consumer2.receive(500));
+ Assert.assertNull(consumer1.receive(500));
+ Assert.assertNull(consumer2.receive(500));
consumer1.close();
session1.close();
@@ -203,7 +196,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
foundMissingMessages = tryToFetchMissingMessages();
}
for (int i = 0; i < MESSAGE_COUNT; i++) {
- assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i));
+ Assert.assertTrue("Message-Nr " + i + " not found (" + results.size() + " total, " + foundMissingMessages + " have been found 'lingering' in the queue)", results.contains(i));
}
assertQueueEmpty();
}
@@ -231,6 +224,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
return count;
}
+ @Test
public void testAMQ1925_TXBegin() throws Exception {
Connection connection = cf.createConnection();
connection.start();
@@ -239,40 +233,45 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
boolean restartDone = false;
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message message = consumer.receive(5000);
- assertNotNull(message);
+ try {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message message = consumer.receive(5000);
+ Assert.assertNotNull(message);
- if (i == 222 && !restartDone) {
- // Simulate broker failure & restart
- bs.stop();
- bs = new BrokerService();
- bs.setPersistent(true);
- bs.setUseJmx(true);
- bs.addConnector(tcpUri);
- bs.start();
- restartDone = true;
- }
+ if (i == 222 && !restartDone) {
+ // Simulate broker failure & restart
+ bs.stop();
+ bs = createNewServer();
+ bs.start();
+ restartDone = true;
+ }
- assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
- try {
- session.commit();
- }
- catch (TransactionRolledBackException expectedOnOccasion) {
- log.info("got rollback: " + expectedOnOccasion);
- i--;
+ Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ try {
+ session.commit();
+ }
+ catch (TransactionRolledBackException expectedOnOccasion) {
+ log.info("got rollback: " + expectedOnOccasion);
+ i--;
+ }
}
+ Assert.assertNull(consumer.receive(500));
+ }
+ catch (Exception eee) {
+ log.error("got exception", eee);
+ throw eee;
+ }
+ finally {
+ consumer.close();
+ session.close();
+ connection.close();
}
- assertNull(consumer.receive(500));
-
- consumer.close();
- session.close();
- connection.close();
assertQueueEmpty();
- assertNull("no exception on connection listener: " + exception, exception);
+ Assert.assertNull("no exception on connection listener: " + exception, exception);
}
+ @Test
public void testAMQ1925_TXCommited() throws Exception {
Connection connection = cf.createConnection();
connection.start();
@@ -281,22 +280,19 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(5000);
- assertNotNull(message);
+ Assert.assertNotNull(message);
- assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
+ Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
session.commit();
if (i == 222) {
// Simulate broker failure & restart
bs.stop();
- bs = new BrokerService();
- bs.setPersistent(true);
- bs.setUseJmx(true);
- bs.addConnector(tcpUri);
+ bs = createNewServer();
bs.start();
}
}
- assertNull(consumer.receive(500));
+ Assert.assertNull(consumer.receive(500));
consumer.close();
session.close();
@@ -313,7 +309,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
Message msg = consumer.receive(500);
if (msg != null) {
- fail(msg.toString());
+ Assert.fail(msg.toString());
}
consumer.close();
@@ -324,9 +320,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
}
private void assertQueueLength(int len) throws Exception, IOException {
- Set<Destination> destinations = bs.getBroker().getDestinations(new ActiveMQQueue(QUEUE_NAME));
- Queue queue = (Queue) destinations.iterator().next();
- assertEquals(len, queue.getMessageStore().getMessageCount());
+ QueueImpl queue = (QueueImpl) bs.getActiveMQServer().getPostOffice().getBinding(new SimpleString("jms.queue." + QUEUE_NAME)).getBindable();
+ if (len > queue.getMessageCount()) {
+ //we wait for a moment as the tx might still be in the afterCommit stage (async op)
+ Thread.sleep(5000);
+ }
+ Assert.assertEquals(len, queue.getMessageCount());
}
private void sendMessagesToQueue() throws Exception {
@@ -349,30 +348,40 @@ public class AMQ1925Test extends TestCase implements ExceptionListener {
assertQueueLength(MESSAGE_COUNT);
}
- @Override
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
exception = null;
- bs = new BrokerService();
- bs.setDeleteAllMessagesOnStartup(true);
- bs.setPersistent(true);
- bs.setUseJmx(true);
- TransportConnector connector = bs.addConnector("tcp://localhost:0");
+ bs = createNewServer();
bs.start();
- tcpUri = connector.getConnectUri();
+ //an auto-created queue can't survive a restart, so we create the queue explicitly
+ bs.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME);
+
+ tcpUri = new URI(newURI(0));
cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + ")");
sendMessagesToQueue();
}
- @Override
- protected void tearDown() throws Exception {
- new ServiceStopper().stop(bs);
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (bs != null) {
+ bs.stop();
+ bs = null;
+ }
+ } catch (Exception e) {
+ log.error(e);
+ }
}
- @Override
public void onException(JMSException exception) {
this.exception = exception;
}
+ private EmbeddedJMS createNewServer() throws Exception {
+ Configuration config = createConfig("localhost", 0);
+ EmbeddedJMS server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ return server;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
deleted file mode 100644
index 8cac09a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.failover;
-
-import java.io.IOException;
-import java.net.URI;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class BadConnectionTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(BadConnectionTest.class);
-
- protected Transport transport;
-
- public void testConnectingToUnavailableServer() throws Exception {
- try {
- transport.asyncRequest(new ActiveMQMessage(), null);
- fail("This should never succeed");
- }
- catch (IOException e) {
- LOG.info("Caught expected exception: " + e, e);
- }
- }
-
- protected Transport createTransport() throws Exception {
- return TransportFactory.connect(new URI("failover://(tcp://doesNotExist:1234)?useExponentialBackOff=false&maxReconnectAttempts=3&initialReconnectDelay=100"));
- }
-
- @Override
- protected void setUp() throws Exception {
- transport = createTransport();
- transport.setTransportListener(new TransportListener() {
-
- @Override
- public void onCommand(Object command) {
- }
-
- @Override
- public void onException(IOException error) {
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- });
- transport.start();
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (transport != null) {
- transport.stop();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
index 110e2fc..99e22b4 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ConnectionHangOnStartupTest.java
@@ -22,18 +22,20 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
/**
* Tests for AMQ-3719
*/
-public class ConnectionHangOnStartupTest {
+public class ConnectionHangOnStartupTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionHangOnStartupTest.class);
@@ -41,13 +43,13 @@ public class ConnectionHangOnStartupTest {
// maxReconnectDelay so that the test runs faster (because it will retry
// connection sooner)
protected String uriString = "failover://(tcp://localhost:62001?wireFormat.maxInactivityDurationInitalDelay=1,tcp://localhost:62002?wireFormat.maxInactivityDurationInitalDelay=1)?randomize=false&maxReconnectDelay=200";
- protected BrokerService master = null;
- protected AtomicReference<BrokerService> slave = new AtomicReference<>();
+ protected EmbeddedJMS master = null;
+ protected AtomicReference<EmbeddedJMS> slave = new AtomicReference<EmbeddedJMS>();
@After
public void tearDown() throws Exception {
- BrokerService brokerService = slave.get();
+ EmbeddedJMS brokerService = slave.get();
if (brokerService != null) {
brokerService.stop();
}
@@ -60,28 +62,18 @@ public class ConnectionHangOnStartupTest {
}
protected void createMaster() throws Exception {
- BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
- brokerFactory.afterPropertiesSet();
- master = brokerFactory.getBroker();
+ Configuration config = createConfig("localhost", 0, 62001);
+ master = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
master.start();
}
protected void createSlave() throws Exception {
- BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
- brokerFactory.afterPropertiesSet();
- BrokerService broker = brokerFactory.getBroker();
+ Configuration config = createConfig("localhost", 1, 62002);
+ EmbeddedJMS broker = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
broker.start();
slave.set(broker);
}
- protected String getSlaveXml() {
- return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
- }
-
- protected String getMasterXml() {
- return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
- }
-
@Test(timeout = 60000)
public void testInitialWireFormatNegotiationTimeout() throws Exception {
final AtomicReference<Connection> conn = new AtomicReference<>();
@@ -102,10 +94,10 @@ public class ConnectionHangOnStartupTest {
};
t.start();
createMaster();
+
// slave will never start unless the master dies!
//createSlave();
- conn.get().stop();
+ conn.get().close();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
index cf1d43d..0875a61 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverBackupLeakTest.java
@@ -22,58 +22,59 @@ import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.util.Wait;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
/**
* Ensures connections aren't leaked when when we use backup=true and randomize=false
*/
-public class FailoverBackupLeakTest {
+public class FailoverBackupLeakTest extends OpenwireArtemisBaseTest {
+
+ private EmbeddedJMS s1, s2;
+
+ @Before
+ public void setUp() throws Exception {
- private static BrokerService s1, s2;
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
- @BeforeClass
- public static void setUp() throws Exception {
- s1 = buildBroker("broker1");
- s2 = buildBroker("broker2");
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+ s1 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ s2 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
s1.start();
- s1.waitUntilStarted();
s2.start();
- s2.waitUntilStarted();
+
+ Assert.assertTrue(s1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(s2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
if (s2 != null) {
s2.stop();
- s2.waitUntilStopped();
}
if (s1 != null) {
s1.stop();
- s1.waitUntilStopped();
}
}
- private static String getConnectString(BrokerService service) throws Exception {
- return service.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- private static BrokerService buildBroker(String brokerName) throws Exception {
- BrokerService service = new BrokerService();
- service.setBrokerName(brokerName);
- service.setUseJmx(false);
- service.setPersistent(false);
- service.setUseShutdownHook(false);
- service.addConnector("tcp://0.0.0.0:0?transport.closeAsync=false");
- return service;
- }
-
@Test
public void backupNoRandomize() throws Exception {
check("backup=true&randomize=false");
@@ -85,9 +86,12 @@ public class FailoverBackupLeakTest {
}
private void check(String connectionProperties) throws Exception {
- String s1URL = getConnectString(s1), s2URL = getConnectString(s2);
+ String s1URL = newURI(0), s2URL = newURI(1);
String uri = "failover://(" + s1URL + "," + s2URL + ")?" + connectionProperties;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+ final int initCount1 = getConnectionCount(s1);
+ final int initCount2 = getConnectionCount(s2);
+
for (int i = 0; i < 10; i++) {
buildConnection(factory);
}
@@ -96,7 +100,7 @@ public class FailoverBackupLeakTest {
@Override
public boolean isSatisified() throws Exception {
- return getConnectionCount(s1) == 0;
+ return getConnectionCount(s1) == initCount1;
}
}));
@@ -104,16 +108,22 @@ public class FailoverBackupLeakTest {
@Override
public boolean isSatisified() throws Exception {
- return getConnectionCount(s2) == 0;
+ return getConnectionCount(s2) == initCount2;
}
}));
}
- private int getConnectionCount(BrokerService service) {
- return service.getTransportConnectors().get(0).getConnections().size();
+ private int getConnectionCount(EmbeddedJMS server) throws Exception {
+ ManagementService managementService = server.getActiveMQServer().getManagementService();
+ JMSServerControl jmsControl = (JMSServerControl) managementService.getResource("jms.server");
+ String[] ids = jmsControl.listConnectionIDs();
+ if (ids != null) {
+ return ids.length;
+ }
+ return 0;
}
- private void buildConnection(ConnectionFactory local) throws JMSException {
+ private void buildConnection(ConnectionFactory local) throws Exception {
Connection conn = null;
Session sess = null;
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
index c0c529d..74fa6aa 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
@@ -16,146 +16,134 @@
*/
package org.apache.activemq.transport.failover;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
-public class FailoverClusterTest extends TestCase {
+public class FailoverClusterTest extends OpenwireArtemisBaseTest {
private static final int NUMBER = 10;
- private static final String BROKER_BIND_ADDRESS = "tcp://0.0.0.0:0";
- private static final String BROKER_A_NAME = "BROKERA";
- private static final String BROKER_B_NAME = "BROKERB";
- private BrokerService brokerA;
- private BrokerService brokerB;
private String clientUrl;
private final List<ActiveMQConnection> connections = new ArrayList<>();
+ EmbeddedJMS server1;
+ EmbeddedJMS server2;
+
+
+ @Before
+ public void setUp() throws Exception {
+ Map<String, String> params = new HashMap<String, String>();
+
+ params.put("rebalanceClusterClients", "true");
+ params.put("updateClusterClients", "true");
+
+ Configuration config1 = createConfig("localhost", 1, params);
+ Configuration config2 = createConfig("localhost", 2, params);
+ deployClusterConfiguration(config1, 2);
+ deployClusterConfiguration(config2, 1);
+
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
+
+ clientUrl = "failover://(" + newURI(1) + "," + newURI(2) + ")";
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for (Connection c : connections) {
+ c.close();
+ }
+ server1.stop();
+ server2.stop();
+ }
+
+ @Test
public void testClusterConnectedAfterClients() throws Exception {
+ server1.start();
createClients();
- if (brokerB == null) {
- brokerB = createBrokerB(BROKER_BIND_ADDRESS);
- }
- Thread.sleep(3000);
Set<String> set = new HashSet<>();
+ server2.start();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
+ Thread.sleep(3000);
+
for (ActiveMQConnection c : connections) {
+ System.out.println("======> adding address: " + c.getTransportChannel().getRemoteAddress());
set.add(c.getTransportChannel().getRemoteAddress());
}
- assertTrue(set.size() > 1);
+ System.out.println("============final size: " + set.size());
+ Assert.assertTrue(set.size() > 1);
}
+ //this test seems the same as the above one as far as the artemis broker
+ //is concerned.
+ @Test
public void testClusterURIOptionsStrip() throws Exception {
+ server1.start();
+
createClients();
- if (brokerB == null) {
- // add in server side only url param, should not be propagated
- brokerB = createBrokerB(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
- }
+ server2.start();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
Thread.sleep(3000);
- Set<String> set = new HashSet<>();
+
+ Set<String> set = new HashSet<String>();
for (ActiveMQConnection c : connections) {
set.add(c.getTransportChannel().getRemoteAddress());
}
- assertTrue(set.size() > 1);
+ Assert.assertTrue(set.size() > 1);
}
+ @Test
public void testClusterConnectedBeforeClients() throws Exception {
- if (brokerB == null) {
- brokerB = createBrokerB(BROKER_BIND_ADDRESS);
- }
- Thread.sleep(5000);
+ server1.start();
+ server2.start();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
createClients();
- Thread.sleep(2000);
- brokerA.stop();
- Thread.sleep(2000);
+ server1.stop();
+ Thread.sleep(1000);
- URI brokerBURI = new URI(brokerB.getTransportConnectors().get(0).getPublishableConnectString());
+ URI brokerBURI = new URI(newURI(2));
for (ActiveMQConnection c : connections) {
String addr = c.getTransportChannel().getRemoteAddress();
- assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- if (brokerA == null) {
- brokerA = createBrokerA(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
- clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")";
- }
- }
-
- @Override
- protected void tearDown() throws Exception {
- for (Connection c : connections) {
- c.close();
- }
- if (brokerB != null) {
- brokerB.stop();
- brokerB = null;
+ Assert.assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
}
- if (brokerA != null) {
- brokerA.stop();
- brokerA = null;
- }
- }
-
- protected BrokerService createBrokerA(String uri) throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(false);
- configureConsumerBroker(answer, uri);
- answer.start();
- return answer;
- }
-
- protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception {
- answer.setBrokerName(BROKER_A_NAME);
- answer.setPersistent(false);
- TransportConnector connector = answer.addConnector(uri);
- connector.setRebalanceClusterClients(true);
- connector.setUpdateClusterClients(true);
- answer.setUseShutdownHook(false);
- }
-
- protected BrokerService createBrokerB(String uri) throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(false);
- configureNetwork(answer, uri);
- answer.start();
- return answer;
- }
-
- protected void configureNetwork(BrokerService answer, String uri) throws Exception {
- answer.setBrokerName(BROKER_B_NAME);
- answer.setPersistent(false);
- NetworkConnector network = answer.addNetworkConnector("static://" + brokerA.getTransportConnectors().get(0).getPublishableConnectString());
- network.setDuplex(true);
- TransportConnector connector = answer.addConnector(uri);
- connector.setRebalanceClusterClients(true);
- connector.setUpdateClusterClients(true);
- answer.setUseShutdownHook(false);
}
- @SuppressWarnings("unused")
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
for (int i = 0; i < NUMBER; i++) {
+ System.out.println("*****create connection using url: " + clientUrl);
ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+ System.out.println("got connection, starting it ...");
c.start();
+ System.out.println("******Started");
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
index 53f0689..fd9ce1f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
@@ -16,7 +16,29 @@
*/
package org.apache.activemq.transport.failover;
-import org.apache.activemq.broker.TransportConnector;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
/**
* Complex cluster test that will exercise the dynamic failover capabilities of
@@ -25,36 +47,77 @@ import org.apache.activemq.broker.TransportConnector;
* connections on the client should start with 3, then have two after the 3rd
* broker is removed and then show 3 after the 3rd broker is reintroduced.
*/
-public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
+public class FailoverComplexClusterTest extends OpenwireArtemisBaseTest {
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
- private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
- private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
- private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
- private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628";
- private static final String BROKER_A_NAME = "BROKERA";
- private static final String BROKER_B_NAME = "BROKERB";
- private static final String BROKER_C_NAME = "BROKERC";
+
+ private String clientUrl;
+ private EmbeddedJMS[] servers = new EmbeddedJMS[3];
+
+ private static final int NUMBER_OF_CLIENTS = 30;
+ private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ //default setup for most tests
+ private void commonSetup() throws Exception {
+ Map<String, String> params = new HashMap<String, String>();
+
+ params.put("rebalanceClusterClients", "true");
+ params.put("updateClusterClients", "true");
+ params.put("updateClusterClientsOnRemove", "true");
+
+ Configuration config0 = createConfig("localhost", 0, params);
+ Configuration config1 = createConfig("localhost", 1, params);
+ Configuration config2 = createConfig("localhost", 2, params);
+
+ deployClusterConfiguration(config0, 1, 2);
+ deployClusterConfiguration(config1, 0, 2);
+ deployClusterConfiguration(config2, 0, 1);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
+
+ servers[0].start();
+ servers[1].start();
+ servers[2].start();
+
+ Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+ Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+ Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ shutdownClients();
+ for (EmbeddedJMS server : servers) {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
/**
* Basic dynamic failover 3 broker test
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
-
- initSingleTcBroker("", null, null);
-
- Thread.sleep(2000);
-
+ commonSetup();
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
- Thread.sleep(2000);
+ Thread.sleep(3000);
runTests(false, null, null, null);
}
+
/**
* Tests a 3 broker configuration to ensure that the backup is random and
* supported in a cluster. useExponentialBackOff is set to false and
@@ -63,10 +126,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
-
- initSingleTcBroker("", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2&useExponentialBackOff=false&initialReconnectDelay=500");
@@ -84,10 +146,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
-
- initSingleTcBroker("?transport.closeAsync=false", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
@@ -101,10 +162,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterWithClusterFilter() throws Exception {
-
- initSingleTcBroker("?transport.closeAsync=false", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
@@ -118,10 +178,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
-
- initMultiTcCluster("", null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
@@ -136,9 +195,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testOriginalBrokerRestart() throws Exception {
- initSingleTcBroker("", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
@@ -147,16 +206,13 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToThreeBrokers();
- getBroker(BROKER_A_NAME).stop();
- getBroker(BROKER_A_NAME).waitUntilStopped();
- removeBroker(BROKER_A_NAME);
+ stopServer(0);
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
- createBrokerA(false, null, null, null);
- getBroker(BROKER_A_NAME).waitUntilStarted();
+ restartServer(0);
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
@@ -168,10 +224,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterClientDistributions() throws Exception {
-
- initSingleTcBroker("", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false&initialReconnectDelay=500");
createClients(100);
@@ -186,10 +241,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
*
* @throws Exception
*/
+ @Test
public void testThreeBrokerClusterDestinationFilter() throws Exception {
-
- initSingleTcBroker("", null, null);
-
+ commonSetup();
Thread.sleep(2000);
setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
@@ -197,28 +251,25 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
runTests(false, null, null, "Queue.TEST.FOO.>");
}
+ @Test
public void testFailOverWithUpdateClientsOnRemove() throws Exception {
// Broker A
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
- connectorA.setName("openwire");
- connectorA.setRebalanceClusterClients(true);
- connectorA.setUpdateClusterClients(true);
- connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_A_NAME).start();
-
+ Configuration config0 = createConfig(0, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
// Broker B
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
- connectorB.setName("openwire");
- connectorB.setRebalanceClusterClients(true);
- connectorB.setUpdateClusterClients(true);
- connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_B_NAME).start();
-
- getBroker(BROKER_B_NAME).waitUntilStarted();
+ Configuration config1 = createConfig(1, "?rebalanceClusterClients=true&updateClusterClients=true&updateClusterClientsOnRemove=true");
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[0].start();
+
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1].start();
+
+ servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2);
+ servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2);
+
Thread.sleep(1000);
// create client connecting only to A. It should receive broker B address whet it connects to A.
@@ -227,9 +278,9 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
Thread.sleep(5000);
// We stop broker A.
- logger.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS);
- getBroker(BROKER_A_NAME).stop();
- getBroker(BROKER_A_NAME).waitUntilStopped();
+ servers[0].stop();
+ servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1);
+
Thread.sleep(5000);
// Client should failover to B.
@@ -258,138 +309,150 @@ public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
- getBroker(BROKER_C_NAME).stop();
- getBroker(BROKER_C_NAME).waitUntilStopped();
- removeBroker(BROKER_C_NAME);
+ stopServer(2);
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
- createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
- getBroker(BROKER_C_NAME).waitUntilStarted();
+ restartServer(2);
+
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
}
- /**
- * @param multi
- * @param tcParams
- * @param clusterFilter
- * @param destinationFilter
- * @throws Exception
- * @throws InterruptedException
- */
+ public void setClientUrl(String clientUrl) {
+ this.clientUrl = clientUrl;
+ }
+
+ protected void createClients() throws Exception {
+ createClients(NUMBER_OF_CLIENTS);
+ }
+
+ protected void createClients(int numOfClients) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+ for (int i = 0; i < numOfClients; i++) {
+ ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = s.createQueue(getClass().getName());
+ MessageConsumer consumer = s.createConsumer(queue);
+ connections.add(c);
+ }
+ }
+
+ protected void shutdownClients() throws JMSException {
+ for (Connection c : connections) {
+ c.close();
+ }
+ }
+
+ protected void assertClientsConnectedToThreeBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ if (c.getTransportChannel().getRemoteAddress() != null) {
+ set.add(c.getTransportChannel().getRemoteAddress());
+ }
+ }
+ Assert.assertTrue("Only 3 connections should be found: " + set, set.size() == 3);
+ }
+
+ protected void assertClientsConnectedToTwoBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ if (c.getTransportChannel().getRemoteAddress() != null) {
+ set.add(c.getTransportChannel().getRemoteAddress());
+ }
+ }
+ Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+ }
+
+ private void stopServer(int serverID) throws Exception {
+ servers[serverID].stop();
+ for (int i = 0; i < servers.length; i++) {
+ if (i != serverID) {
+ Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length - 1));
+ }
+ }
+ }
+
+ private void restartServer(int serverID) throws Exception {
+ servers[serverID].start();
+
+ for (int i = 0; i < servers.length; i++) {
+ Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length));
+ }
+ }
+
private void runClientDistributionTests(boolean multi,
String tcParams,
String clusterFilter,
String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
- assertClientsConnectionsEvenlyDistributed(.25);
+ //if 2/3 or more of total connections connect to one node, we consider it wrong
+ //if 1/4 or less of total connects to one node, we consider it wrong
+ assertClientsConnectionsEvenlyDistributed(.25, .67);
- getBroker(BROKER_C_NAME).stop();
- getBroker(BROKER_C_NAME).waitUntilStopped();
- removeBroker(BROKER_C_NAME);
+ stopServer(2);
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
- assertClientsConnectionsEvenlyDistributed(.35);
+ //now there are only 2 nodes
+ //if 2/3 or more of total connections go to either node, we consider it wrong
+ //if 1/3 or less of total connections go to either node, we consider it wrong
+ assertClientsConnectionsEvenlyDistributed(.34, .67);
- createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
- getBroker(BROKER_C_NAME).waitUntilStarted();
+ restartServer(2);
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
- assertClientsConnectionsEvenlyDistributed(.20);
- }
-
- @Override
- protected void setUp() throws Exception {
+ //now back to 3 nodes. We assume at least the new node will
+ //have 1/10 of the total connections, and any node's connections
+ //won't exceed 50%
+ assertClientsConnectionsEvenlyDistributed(.10, .50);
}
- @Override
- protected void tearDown() throws Exception {
- shutdownClients();
- Thread.sleep(2000);
- destroyBrokerCluster();
- }
-
- private void initSingleTcBroker(String params, String clusterFilter, String destinationFilter) throws Exception {
- createBrokerA(false, params, clusterFilter, null);
- createBrokerB(false, params, clusterFilter, null);
- createBrokerC(false, params, clusterFilter, null);
- getBroker(BROKER_C_NAME).waitUntilStarted();
- }
-
- private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
- createBrokerA(true, params, clusterFilter, null);
- createBrokerB(true, params, clusterFilter, null);
- createBrokerC(true, params, clusterFilter, null);
- getBroker(BROKER_C_NAME).waitUntilStarted();
- }
-
- private void createBrokerA(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_A_NAME) == null) {
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- }
- else {
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage, double maximumPercentage) {
+ Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+ int total = 0;
+ for (ActiveMQConnection c : connections) {
+ String key = c.getTransportChannel().getRemoteAddress();
+ if (key != null) {
+ total++;
+ if (clientConnectionCounts.containsKey(key)) {
+ double count = clientConnectionCounts.get(key);
+ count += 1.0;
+ clientConnectionCounts.put(key, count);
+ }
+ else {
+ clientConnectionCounts.put(key, 1.0);
+ }
}
- getBroker(BROKER_A_NAME).start();
}
- }
-
- private void createBrokerB(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_B_NAME) == null) {
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ Set<String> keys = clientConnectionCounts.keySet();
+ List<String> errorMsgs = new ArrayList<String>();
+ for (String key : keys) {
+ double count = clientConnectionCounts.get(key);
+ double percentage = count / total;
+ if (percentage < minimumPercentage || percentage > maximumPercentage) {
+ errorMsgs.add("Connections distribution expected to be within range [ " + minimumPercentage
+ + ", " + maximumPercentage + "]. Actuall distribution was " + percentage + " for connection " + key);
}
- else {
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ if (errorMsgs.size() > 0) {
+ for (String err : errorMsgs) {
+ System.err.println(err);
+ }
+ Assert.fail("Test failed. Please see the log message for details");
}
- getBroker(BROKER_B_NAME).start();
}
}
- private void createBrokerC(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_C_NAME) == null) {
- addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
- addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- }
- else {
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- }
- getBroker(BROKER_C_NAME).start();
+ protected void assertAllConnectedTo(String url) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
}
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index e33e7ea..40cbccb 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,63 +38,55 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Test;
-public class FailoverConsumerOutstandingCommitTest {
+@RunWith(BMUnitRunner.class)
+public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerOutstandingCommitTest.class);
private static final String QUEUE_NAME = "FailoverWithOutstandingCommit";
private static final String MESSAGE_TEXT = "Test message ";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
+ private static final String url = newURI(0);
final int prefetch = 10;
- BrokerService broker;
+ private static EmbeddedJMS server;
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+ private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
@After
public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
+ if (server != null) {
+ server.stop();
}
}
- public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
- broker.start();
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
-
- // optimizedDispatche and sync dispatch ensure that the dispatch happens
- // before the commit reply that the consumer.clearDispatchList is waiting for.
- defaultEntry.setOptimizedDispatch(true);
- policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
-
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
+ public void startServer() throws Exception {
+ server = createBroker();
+ server.start();
}
@Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "set no return response",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
+ name = "stop broker before commit",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"),})
public void testFailoverConsumerDups() throws Exception {
doTestFailoverConsumerDups(true);
}
@@ -103,30 +94,9 @@ public class FailoverConsumerOutstandingCommitTest {
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
- broker = createBroker(true);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker before commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ server = createBroker();
+ server.start();
+ brokerStopLatch = new CountDownLatch(1);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -144,9 +114,9 @@ public class FailoverConsumerOutstandingCommitTest {
final CountDownLatch messagesReceived = new CountDownLatch(2);
final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+ doByteman.set(true);
testConsumer.setMessageListener(new MessageListener() {
- @Override
public void onMessage(Message message) {
LOG.info("consume one and commit");
@@ -165,8 +135,7 @@ public class FailoverConsumerOutstandingCommitTest {
});
// may block if broker shutodwn happens quickly
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("producer started");
try {
@@ -180,12 +149,14 @@ public class FailoverConsumerOutstandingCommitTest {
}
LOG.info("producer done");
}
- });
+ }.start();
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- broker.start();
+ // will be stopped by the plugin
+ brokerStopLatch.await();
+ server.stop();
+ server = createBroker();
+ doByteman.set(false);
+ server.start();
assertTrue("consumer added through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
assertTrue("another message was received after failover", messagesReceived.await(20, TimeUnit.SECONDS));
@@ -194,11 +165,39 @@ public class FailoverConsumerOutstandingCommitTest {
}
@Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"),
+
+ @BMRule(
+ name = "stop broker before commit",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return null")})
public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(false);
}
@Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "set no return response",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
+ name = "stop broker after commit",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "AT EXIT",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")})
public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
doTestFailoverConsumerOutstandingSendTx(true);
}
@@ -206,36 +205,9 @@ public class FailoverConsumerOutstandingCommitTest {
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
final boolean watchTopicAdvisories = true;
- broker = createBroker(true);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- // from the consumer perspective whether the commit completed on the broker or
- // not is irrelevant, the transaction is still in doubt in the absence of a reply
- if (doActualBrokerCommit) {
- LOG.info("doing actual broker commit...");
- super.commitTransaction(context, xid, onePhase);
- }
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker before commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ server = createBroker();
+ server.start();
+ brokerStopLatch = new CountDownLatch(1);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -254,17 +226,19 @@ public class FailoverConsumerOutstandingCommitTest {
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final CountDownLatch messagesReceived = new CountDownLatch(3);
final AtomicBoolean gotCommitException = new AtomicBoolean(false);
- final ArrayList<TextMessage> receivedMessages = new ArrayList<>();
+ final ArrayList<TextMessage> receivedMessages = new ArrayList<TextMessage>();
final MessageConsumer testConsumer = consumerSession.createConsumer(destination);
+ doByteman.set(true);
testConsumer.setMessageListener(new MessageListener() {
- @Override
public void onMessage(Message message) {
- LOG.info("consume one and commit: " + message);
+ LOG.info("consume one: " + message);
assertNotNull("got message", message);
receivedMessages.add((TextMessage) message);
try {
+ LOG.info("send one");
produceMessage(consumerSession, signalDestination, 1);
+ LOG.info("commit session");
consumerSession.commit();
}
catch (JMSException e) {
@@ -278,8 +252,7 @@ public class FailoverConsumerOutstandingCommitTest {
});
// may block if broker shutdown happens quickly
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("producer started");
try {
@@ -293,12 +266,14 @@ public class FailoverConsumerOutstandingCommitTest {
}
LOG.info("producer done");
}
- });
+ }.start();
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- broker.start();
+ brokerStopLatch.await();
+ doByteman.set(false);
+ server.stop();
+ server = createBroker();
+ server.start();
assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
assertTrue("commit failed", gotCommitException.get());
@@ -313,12 +288,13 @@ public class FailoverConsumerOutstandingCommitTest {
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText());
connection.close();
+ server.stop();
}
@Test
public void testRollbackFailoverConsumerTx() throws Exception {
- broker = createBroker(true);
- broker.start();
+ server = createBroker();
+ server.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
@@ -340,10 +316,9 @@ public class FailoverConsumerOutstandingCommitTest {
assertNotNull(msg);
// restart with outstanding delivered message
- broker.stop();
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- broker.start();
+ server.stop();
+ server = createBroker();
+ server.start();
consumerSession.rollback();
@@ -379,4 +354,29 @@ public class FailoverConsumerOutstandingCommitTest {
}
producer.close();
}
+
+ public static void holdResponse(OpenWireConnection.CommandProcessor context) {
+ if (doByteman.get()) {
+ context.getContext().setDontSendReponse(true);
+ }
+ }
+
+ public static void stopServerInTransaction() {
+ if (doByteman.get()) {
+ new Thread() {
+ public void run() {
+ LOG.info("Stopping broker in transaction...");
+ try {
+ server.stop();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ brokerStopLatch.countDown();
+ }
+ }
+ }.start();
+ }
+ }
}