You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:14 UTC
[05/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index e792228..0a127dd 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.transport.failover;
import java.io.File;
import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@@ -30,38 +32,46 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.network.NetworkConnector;
import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
-public class FailoverUpdateURIsTest extends TestCase {
+public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
private static final String QUEUE_NAME = "test.failoverupdateuris";
private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
- String firstTcpUri = "tcp://localhost:61616";
- String secondTcpUri = "tcp://localhost:61626";
+ String firstTcpUri = newURI(0);
+ String secondTcpUri = newURI(10);
Connection connection = null;
- BrokerService bs1 = null;
- BrokerService bs2 = null;
+ EmbeddedJMS server0 = null;
+ EmbeddedJMS server1 = null;
- @Override
+ @After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
- if (bs1 != null) {
- bs1.stop();
+ if (server0 != null) {
+ server0.stop();
}
- if (bs2 != null) {
- bs2.stop();
+ if (server1 != null) {
+ server1.stop();
}
}
+ @Test
public void testUpdateURIsViaFile() throws Exception {
- String targetDir = "target/" + getName();
+ String targetDir = "target/testUpdateURIsViaFile";
new File(targetDir).mkdir();
File updateFile = new File(targetDir + "/updateURIsFile.txt");
LOG.info(updateFile);
@@ -72,8 +82,9 @@ public class FailoverUpdateURIsTest extends TestCase {
out.write(firstTcpUri.getBytes());
out.close();
- bs1 = createBroker("bs1", firstTcpUri);
- bs1.start();
+ Configuration config0 = createConfig(0);
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server0.start();
// no failover uri's to start with, must be read from file...
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
@@ -86,14 +97,14 @@ public class FailoverUpdateURIsTest extends TestCase {
Message message = session.createTextMessage("Test message");
producer.send(message);
Message msg = consumer.receive(2000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
- bs1.stop();
- bs1.waitUntilStopped();
- bs1 = null;
+ server0.stop();
+ server0 = null;
- bs2 = createBroker("bs2", secondTcpUri);
- bs2.start();
+ Configuration config1 = createConfig(10);
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server1.start();
// add the transport uri for broker number 2
out = new FileOutputStream(updateFile, true);
@@ -103,25 +114,18 @@ public class FailoverUpdateURIsTest extends TestCase {
producer.send(message);
msg = consumer.receive(2000);
- assertNotNull(msg);
- }
-
- private BrokerService createBroker(String name, String tcpUri) throws Exception {
- BrokerService bs = new BrokerService();
- bs.setBrokerName(name);
- bs.setUseJmx(false);
- bs.setPersistent(false);
- bs.addConnector(tcpUri);
- return bs;
+ Assert.assertNotNull(msg);
}
+ @Test
public void testAutoUpdateURIs() throws Exception {
-
- bs1 = new BrokerService();
- bs1.setUseJmx(false);
- TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
- transportConnector.setUpdateClusterClients(true);
- bs1.start();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("updateClusterClients", "true");
+ Configuration config0 = createConfig("localhost", 0, params);
+ deployClusterConfiguration(config0, 10);
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server0.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
connection = cf.createConnection();
@@ -133,24 +137,23 @@ public class FailoverUpdateURIsTest extends TestCase {
Message message = session.createTextMessage("Test message");
producer.send(message);
Message msg = consumer.receive(4000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
- bs2 = createBroker("bs2", secondTcpUri);
- NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
- networkConnector.setDuplex(true);
- bs2.start();
- LOG.info("started brokerService 2");
- bs2.waitUntilStarted();
+ Configuration config1 = createConfig(10);
+ deployClusterConfiguration(config1, 0);
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server1.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
TimeUnit.SECONDS.sleep(4);
LOG.info("stopping brokerService 1");
- bs1.stop();
- bs1.waitUntilStopped();
- bs1 = null;
+ server0.stop();
+ server0 = null;
producer.send(message);
msg = consumer.receive(4000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
index ae637ef..a028832 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
@@ -43,4 +43,5 @@ public class FailoverUriTest extends TransportUriTest {
public static Test suite() {
return suite(FailoverUriTest.class);
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index 34e7333..dad241c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.util.Date;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
@@ -26,9 +27,13 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.TransportListener;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -36,19 +41,20 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
-public class InitalReconnectDelayTest {
+public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
- protected BrokerService broker1;
- protected BrokerService broker2;
+ protected EmbeddedJMS server1;
+ protected EmbeddedJMS server2;
+
+// protected BrokerService broker1;
+// protected BrokerService broker2;
@Test
public void testInitialReconnectDelay() throws Exception {
- String uriString = "failover://(tcp://localhost:" +
- broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
- ",tcp://localhost:" +
- broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+ String uriString = "failover://(" + newURI(1) +
+ "," + newURI(2) +
")?randomize=false&initialReconnectDelay=15000";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -67,7 +73,7 @@ public class InitalReconnectDelayTest {
//Halt the broker1...
LOG.info("Stopping the Broker1...");
start = (new Date()).getTime();
- broker1.stop();
+ server1.stop();
LOG.info("Attempting to send... failover should kick in...");
producer.send(session.createTextMessage("TEST"));
@@ -81,10 +87,8 @@ public class InitalReconnectDelayTest {
@Test
public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
- String uriString = "failover://(tcp://localhost:" +
- broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
- ",tcp://localhost:" +
- broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+ String uriString = "failover://(" + newURI(1) +
+ "," + newURI(2) +
")?randomize=false&maxReconnectAttempts=0";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -124,7 +128,7 @@ public class InitalReconnectDelayTest {
calls.set(0);
LOG.info("Stopping the Broker1...");
- broker1.stop();
+ server1.stop();
LOG.info("Attempting to send... failover should throw on disconnect");
try {
@@ -140,25 +144,19 @@ public class InitalReconnectDelayTest {
@Before
public void setUp() throws Exception {
- final String dataDir = "target/data/shared";
+ Configuration config1 = createConfig(1);
+ Configuration config2 = createConfig(2);
- broker1 = new BrokerService();
+ deployClusterConfiguration(config1, 2);
+ deployClusterConfiguration(config2, 1);
- broker1.setBrokerName("broker1");
- broker1.setDeleteAllMessagesOnStartup(true);
- broker1.setDataDirectory(dataDir);
- broker1.addConnector("tcp://localhost:0");
- broker1.setUseJmx(false);
- broker1.start();
- broker1.waitUntilStarted();
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
- broker2 = new BrokerService();
- broker2.setBrokerName("broker2");
- broker2.setDataDirectory(dataDir);
- broker2.setUseJmx(false);
- broker2.addConnector("tcp://localhost:0");
- broker2.start();
- broker2.waitUntilStarted();
+ server1.start();
+ server2.start();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
}
@@ -172,16 +170,8 @@ public class InitalReconnectDelayTest {
@After
public void tearDown() throws Exception {
-
- if (broker1.isStarted()) {
- broker1.stop();
- broker1.waitUntilStopped();
- }
-
- if (broker2.isStarted()) {
- broker2.stop();
- broker2.waitUntilStopped();
- }
+ server1.stop();
+ server2.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
index 4ba5516..83d43af 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
@@ -28,29 +28,33 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
-import junit.framework.TestCase;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
-import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReconnectTest extends TestCase {
+public class ReconnectTest extends OpenwireArtemisBaseTest {
public static final int MESSAGES_PER_ITTERATION = 10;
public static final int WORKER_COUNT = 10;
private static final Logger LOG = LoggerFactory.getLogger(ReconnectTest.class);
- private BrokerService bs;
+ private EmbeddedJMS bs;
private URI tcpUri;
private final AtomicInteger resumedCount = new AtomicInteger();
private final AtomicInteger interruptedCount = new AtomicInteger();
@@ -102,7 +106,7 @@ public class ReconnectTest extends TestCase {
}
public void start() {
- new Thread(this).start();
+ new Thread(this, name).start();
}
public void stop() {
@@ -129,13 +133,19 @@ public class ReconnectTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
while (!stop.get()) {
+
for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
- producer.send(session.createTextMessage("TEST:" + i));
+ TextMessage text = session.createTextMessage(name + " TEST:" + i);
+ text.setStringProperty("myprop", name + " TEST:" + i);
+ producer.send(text);
}
+
for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
- consumer.receive();
+ TextMessage m = (TextMessage) consumer.receive();
}
+
iterations.incrementAndGet();
}
session.close();
@@ -159,11 +169,12 @@ public class ReconnectTest extends TestCase {
public synchronized void assertNoErrors() {
if (error != null) {
error.printStackTrace();
- fail("Worker " + name + " got Exception: " + error);
+ Assert.fail("Worker " + name + " got Exception: " + error);
}
}
}
+ @Test
public void testReconnects() throws Exception {
for (int k = 1; k < 10; k++) {
@@ -181,7 +192,7 @@ public class ReconnectTest extends TestCase {
LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
Thread.sleep(1000);
}
- assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
+ Assert.assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
workers[i].assertNoErrors();
}
@@ -192,7 +203,7 @@ public class ReconnectTest extends TestCase {
workers[i].failConnection();
}
- assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
@@ -201,7 +212,7 @@ public class ReconnectTest extends TestCase {
}, TimeUnit.SECONDS.toMillis(60)));
// Wait for the connections to re-establish...
- assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
@@ -220,26 +231,25 @@ public class ReconnectTest extends TestCase {
}
}
- @Override
- protected void setUp() throws Exception {
- bs = new BrokerService();
- bs.setPersistent(false);
- bs.setUseJmx(true);
- TransportConnector connector = bs.addConnector("tcp://localhost:0");
+ @Before
+ public void setUp() throws Exception {
+ Configuration config = createConfig(0);
+ bs = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
bs.start();
- tcpUri = connector.getConnectUri();
+ tcpUri = new URI(newURI(0));
+
workers = new Worker[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) {
- workers[i] = new Worker("" + i);
+ workers[i] = new Worker("worker-" + i);
workers[i].start();
}
}
- @Override
- protected void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
for (int i = 0; i < WORKER_COUNT; i++) {
workers[i].stop();
}
- new ServiceStopper().stop(bs);
+ bs.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
index 3a55473..ed6040d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
@@ -24,15 +24,16 @@ import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.net.ServerSocketFactory;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
-public class SlowConnectionTest extends TestCase {
+public class SlowConnectionTest {
private CountDownLatch socketReadyLatch = new CountDownLatch(1);
+ @Test
public void testSlowConnection() throws Exception {
MockBroker broker = new MockBroker();
@@ -57,7 +58,7 @@ public class SlowConnectionTest extends TestCase {
}).start();
int count = 0;
- assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
int count = 0;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 5016e30..5759547 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -16,22 +16,40 @@
*/
package org.apache.activemq.transport.failover;
-public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
- private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
- private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
- private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
- private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
- private static final String BROKER_A_NAME = "BROKERA";
- private static final String BROKER_B_NAME = "BROKERB";
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
+
+ private static final int NUMBER_OF_CLIENTS = 30;
+ private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+ private EmbeddedJMS server0;
+ private EmbeddedJMS server1;
+ private String clientUrl;
+
+ @Test
public void testTwoBrokersRestart() throws Exception {
- createBrokerA(false, "", null, null);
- createBrokerB(false, "", null, null);
- getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(2000);
- setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
Thread.sleep(5000);
@@ -39,59 +57,113 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
- getBroker(BROKER_A_NAME).stop();
- getBroker(BROKER_A_NAME).waitUntilStopped();
- removeBroker(BROKER_A_NAME);
+ server0.stop();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
Thread.sleep(1000);
- assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+ assertAllConnectedTo(newURI("127.0.0.1", 1));
Thread.sleep(5000);
- createBrokerA(false, "", null, null);
- getBroker(BROKER_A_NAME).waitUntilStarted();
+ server0.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
Thread.sleep(5000);
+      // Requires updateClusterClients, updateClusterClientsOnRemove and rebalanceClusterClients to be set to true (see setUp).
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
}
- private void createBrokerA(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_A_NAME) == null) {
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- }
- else {
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- }
- getBroker(BROKER_A_NAME).start();
+
+ @Before
+ public void setUp() throws Exception {
+ HashMap<String, String> map = new HashMap<>();
+ map.put("rebalanceClusterClients", "true");
+ map.put("updateClusterClients", "true");
+ map.put("updateClusterClientsOnRemove", "true");
+ Configuration config0 = createConfig("127.0.0.1", 0, map);
+ Configuration config1 = createConfig("127.0.0.1", 1, map);
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+ clientUrl = "failover://(" + newURI("127.0.0.1", 0) + "," + newURI("127.0.0.1", 1) + ")";
+
+ server0.start();
+ server1.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for (ActiveMQConnection conn : connections) {
+ conn.close();
}
+ server0.stop();
+ server1.stop();
}
- private void createBrokerB(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_B_NAME) == null) {
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ protected void createClients() throws Exception {
+ createClients(NUMBER_OF_CLIENTS);
+ }
+
+ protected void createClients(int numOfClients) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+ for (int i = 0; i < numOfClients; i++) {
+ ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = s.createQueue(getClass().getName());
+ MessageConsumer consumer = s.createConsumer(queue);
+ connections.add(c);
+ }
+ }
+
+ protected void assertClientsConnectedToTwoBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ if (c.getTransportChannel().getRemoteAddress() != null) {
+ set.add(c.getTransportChannel().getRemoteAddress());
}
- else {
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ }
+ Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+ }
+
+ protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
+ Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+ int total = 0;
+ for (ActiveMQConnection c : connections) {
+ String key = c.getTransportChannel().getRemoteAddress();
+ if (key != null) {
+ total++;
+ if (clientConnectionCounts.containsKey(key)) {
+ double count = clientConnectionCounts.get(key);
+ count += 1.0;
+ clientConnectionCounts.put(key, count);
+ }
+ else {
+ clientConnectionCounts.put(key, 1.0);
+ }
}
- getBroker(BROKER_B_NAME).start();
+ }
+ Set<String> keys = clientConnectionCounts.keySet();
+ for (String key : keys) {
+ double count = clientConnectionCounts.get(key);
+ double percentage = count / total;
+ System.out.println(count + " of " + total + " connections for " + key + " = " + percentage);
+ Assert.assertTrue("Connections distribution expected to be >= than " + minimumPercentage + ". Actuall distribution was " + percentage + " for connection " + key, percentage >= minimumPercentage);
+ }
+ }
+
+ protected void assertAllConnectedTo(String url) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
index dc369be..a372b79 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
@@ -26,45 +26,42 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
-public class FanoutTest extends TestCase {
+public class FanoutTest extends OpenwireArtemisBaseTest {
- BrokerService broker1;
- BrokerService broker2;
+ EmbeddedJMS[] servers = new EmbeddedJMS[2];
ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true");
Connection producerConnection;
Session producerSession;
int messageCount = 100;
- @Override
+ @Before
public void setUp() throws Exception {
- broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false");
- broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false");
-
- broker1.start();
- broker2.start();
-
- broker1.waitUntilStarted();
- broker2.waitUntilStarted();
+ setUpNonClusterServers(servers);
producerConnection = producerFactory.createConnection();
producerConnection.start();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- @Override
+ @After
public void tearDown() throws Exception {
producerSession.close();
producerConnection.close();
- broker1.stop();
- broker2.stop();
+ shutDownNonClusterServers(servers);
}
+ @Test
public void testSendReceive() throws Exception {
MessageProducer prod = createProducer();
@@ -76,7 +73,6 @@ public class FanoutTest extends TestCase {
assertMessagesReceived("tcp://localhost:61616");
assertMessagesReceived("tcp://localhost:61617");
-
}
protected MessageProducer createProducer() throws Exception {
@@ -95,7 +91,7 @@ public class FanoutTest extends TestCase {
listener.assertMessagesReceived(messageCount);
consumer.close();
- consumerConnection.close();
consumerSession.close();
+ consumerConnection.close();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
index 7e52f13..2cfc136 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
@@ -18,51 +18,111 @@ package org.apache.activemq.transport.fanout;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.mock.MockTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FanoutTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
+ public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+ protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+ protected long idGenerator;
+ protected int msgIdGenerator;
+ protected int maxWait = 10000;
private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class);
- public ActiveMQDestination destination;
- public int deliveryMode;
+ private EmbeddedJMS server;
+ private EmbeddedJMS remoteServer;
+
+ private ActiveMQDestination destination;
+ private int deliveryMode;
- public static Test suite() {
- return suite(FanoutTransportBrokerTest.class);
+ @Parameterized.Parameters(name="test-{index}")
+ public static Collection<Object[]> getParams()
+ {
+ return Arrays.asList(new Object[][]{
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+ });
}
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
+ public FanoutTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+ this.deliveryMode = deliveryMode;
+ this.destination = destination;
}
- public void initCombosForTestPublisherFansout() {
- addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")});
+ @Before
+ public void setUp() throws Exception {
+ Configuration config0 = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ Configuration config1 = createConfig(1);
+ remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
+ remoteServer.start();
+
+ }
+ @After
+ public void tearDown() throws Exception {
+ for (StubConnection conn : connections) {
+ try {
+ conn.stop();
+ }
+ catch (Exception e) {
+ }
+ }
+ try {
+ remoteServer.stop();
+ }
+ catch (Exception e) {
+ }
+ try {
+ server.stop();
+ }
+ catch (Exception e) {
+ }
}
+ @Test
public void testPublisherFansout() throws Exception {
-
// Start a normal consumer on the local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -94,21 +154,28 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send the message using the fail over publisher.
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connection1));
+ Assert.assertNotNull(receiveMessage(connection1));
assertNoMessagesLeft(connection1);
- assertNotNull(receiveMessage(connection2));
+ Assert.assertNotNull(receiveMessage(connection2));
assertNoMessagesLeft(connection2);
}
+ /*
public void initCombosForTestPublisherWaitsForServerToBeUp() {
addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")});
}
+*/
+ @Test
public void testPublisherWaitsForServerToBeUp() throws Exception {
+ if (name.getMethodName().contains("test-0") || name.getMethodName().contains("test-2")) {
+ System.out.println("Discarding invalid test: " + name.getMethodName());
+ return;
+ }
// Start a normal consumer on the local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -140,19 +207,18 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send the message using the fail over publisher.
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connection1));
+ Assert.assertNotNull(receiveMessage(connection1));
assertNoMessagesLeft(connection1);
- assertNotNull(receiveMessage(connection2));
+ Assert.assertNotNull(receiveMessage(connection2));
assertNoMessagesLeft(connection2);
final CountDownLatch publishDone = new CountDownLatch(1);
// The MockTransport is on the remote connection.
// Slip in a new transport filter after the MockTransport
- MockTransport mt = connection3.getTransport().narrow(MockTransport.class);
+ MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
mt.install(new TransportFilter(mt.getNext()) {
- @Override
public void oneway(Object command) throws IOException {
LOG.info("Dropping: " + command);
// just eat it! to simulate a recent failure.
@@ -161,7 +227,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send a message (async) as this will block
new Thread() {
- @Override
public void run() {
// Send the message using the fail over publisher.
try {
@@ -175,7 +240,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
}.start();
// Assert that we block:
- assertFalse(publishDone.await(3, TimeUnit.SECONDS));
+ Assert.assertFalse(publishDone.await(3, TimeUnit.SECONDS));
// Restart the remote server. State should be re-played and the publish
// should continue.
@@ -184,26 +249,127 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
LOG.info("Broker Restarted");
// This should reconnect, and resend
- assertTrue(publishDone.await(20, TimeUnit.SECONDS));
+ Assert.assertTrue(publishDone.await(20, TimeUnit.SECONDS));
}
- @Override
protected String getLocalURI() {
return "tcp://localhost:61616";
}
- @Override
protected String getRemoteURI() {
return "tcp://localhost:61617";
}
protected StubConnection createFanoutConnection() throws Exception {
- URI fanoutURI = new URI("fanout://(static://(" + connector.getServer().getConnectURI() + "," + "mock://" + remoteConnector.getServer().getConnectURI() + "))?fanOutQueues=true");
+ URI fanoutURI = new URI("fanout://(static://(" + newURI(0) + "," + "mock://" + newURI(1) + "))?fanOutQueues=true");
Transport transport = TransportFactory.connect(fanoutURI);
StubConnection connection = new StubConnection(transport);
connections.add(connection);
return connection;
}
+
+ protected StubConnection createConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(0)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected StubConnection createRemoteConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(1)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+ ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+ return message;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ try {
+ message.setText("Test Message Payload.");
+ }
+ catch (MessageNotWriteableException e) {
+ }
+ return message;
+ }
+
+ public Message receiveMessage(StubConnection connection) throws InterruptedException {
+ return receiveMessage(connection, maxWait);
+ }
+
+ public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+ if (o == null) {
+ return null;
+ }
+ if (o instanceof MessageDispatch) {
+
+ MessageDispatch dispatch = (MessageDispatch) o;
+ if (dispatch.getMessage() == null) {
+ return null;
+ }
+ dispatch.setMessage(dispatch.getMessage().copy());
+ dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+ return dispatch.getMessage();
+ }
+ }
+ }
+
+ protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+ long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+ if (o == null) {
+ return;
+ }
+ if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+ Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+ }
+ }
+ }
+ protected void restartRemoteBroker() throws Exception {
+ remoteServer.stop();
+ Thread.sleep(2000);
+ remoteServer.start();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 619190f..01f6963 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -173,11 +173,8 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
}
public void testClientHang() throws Exception {
-
- //
// Manually create a client transport so that it does not send KeepAlive
- // packets.
- // this should simulate a client hang.
+ // packets. this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
clientTransport.setTransportListener(new TransportListener() {
@Override
@@ -205,9 +202,10 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
public void transportResumed() {
}
});
+
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
- info.setVersion(OpenWireFormat.DEFAULT_VERSION);
+ info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
info.setMaxInactivityDuration(1000);
clientTransport.oneway(info);
@@ -242,19 +240,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
* @throws URISyntaxException
*/
public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-
startClient();
- addCombinationValues("clientInactivityLimit", new Object[]{Long.valueOf(1000)});
- addCombinationValues("serverInactivityLimit", new Object[]{Long.valueOf(1000)});
- addCombinationValues("serverRunOnCommand", new Object[]{new Runnable() {
+ addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
+ addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
+ addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
@Override
public void run() {
try {
LOG.info("Sleeping");
Thread.sleep(4000);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
}
}
}});
@@ -272,5 +268,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
index 3a1c9a4..d380246 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SslBrokerServiceTest.java
@@ -124,6 +124,7 @@ public class SslBrokerServiceTest extends TransportBrokerTestSupport {
private void makeSSLConnection(SSLContext context,
String enabledSuites[],
TransportConnector connector) throws Exception, UnknownHostException, SocketException {
+ System.out.println("-----connector: " + connector);
SSLSocket sslSocket = (SSLSocket) context.getSocketFactory().createSocket("localhost", connector.getUri().getPort());
if (enabledSuites != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
index 1c34d83..38188ce 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TcpTransportBindTest.java
@@ -37,10 +37,11 @@ public class TcpTransportBindTest extends EmbeddedBrokerTestSupport {
*/
@Override
protected void setUp() throws Exception {
+ disableWrapper = true;
bindAddress = addr + "?transport.reuseAddress=true&transport.soTimeout=1000";
super.setUp();
- addr = broker.getTransportConnectors().get(0).getPublishableConnectString();
+ addr = newURI("localhost", 0);
}
public void testConnect() throws Exception {
@@ -58,7 +59,7 @@ public class TcpTransportBindTest extends EmbeddedBrokerTestSupport {
@Override
public void run() {
try {
- broker.stop();
+ artemisBroker.stop();
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index 9ae82ac..ce9aff9 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -23,7 +23,6 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,6 +165,7 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
@Override
protected void setUp() throws Exception {
+ disableWrapper = true;
bindAddress = "tcp://localhost:61616";
super.setUp();
}
@@ -183,15 +183,6 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
super.tearDown();
}
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(false);
- answer.setPersistent(isPersistent());
- answer.addConnector(bindAddress);
- return answer;
- }
-
public static Test suite() {
return suite(TransportUriTest.class);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
deleted file mode 100644
index 3791848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-
-public class VMTransportBrokerNameTest extends TestCase {
-
- private static final String MY_BROKER = "myBroker";
- final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)";
-
- public void testBrokerName() throws Exception {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
- ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
- assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
-
- // verify Broker is there with name
- ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false"));
- Connection c2 = cfbyName.createConnection();
-
- assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER));
- assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER);
- assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1);
-
- c1.close();
- c2.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
deleted file mode 100644
index 52e4b88..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import junit.framework.Test;
-
-import org.apache.activemq.transport.TransportBrokerTestSupport;
-
-public class VMTransportBrokerTest extends TransportBrokerTestSupport {
-
- @Override
- protected String getBindLocation() {
- return "vm://localhost";
- }
-
- public static Test suite() {
- return suite(VMTransportBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
deleted file mode 100644
index dbc7f29..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.IOExceptionSupport;
-
-/**
- * Used to see if the VM transport starts an embedded broker on demand.
- */
-public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport {
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class);
- }
-
- public void testConsumerPrefetchAtOne() throws Exception {
-
- // Make sure the broker is created due to the connection being started.
- assertNull(BrokerRegistry.getInstance().lookup("localhost"));
- StubConnection connection = createConnection();
- assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-
- // Start a producer and consumer
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- ActiveMQQueue destination = new ActiveMQQueue("TEST");
-
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- consumerInfo.setPrefetchSize(1);
- connection.send(consumerInfo);
-
- // Send 2 messages to the broker.
- connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
- connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-
- // Make sure only 1 message was delivered.
- Message m = receiveMessage(connection);
- assertNotNull(m);
- assertNoMessagesLeft(connection);
-
- // Make sure the broker is shutdown when the connection is stopped.
- assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
- connection.stop();
- assertNull(BrokerRegistry.getInstance().lookup("localhost"));
- }
-
- @Override
- protected void setUp() throws Exception {
- // Don't call super since it manually starts up a broker.
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Don't call super since it manually tears down a broker.
- }
-
- @Override
- protected StubConnection createConnection() throws Exception {
- try {
- Transport transport = TransportFactory.connect(new URI("vm://localhost?broker.persistent=false"));
- StubConnection connection = new StubConnection(transport);
- return connection;
- }
- catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
-}