You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/18 02:41:51 UTC
[02/65] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index 5016e30..dc91873 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -16,22 +16,41 @@
*/
package org.apache.activemq.transport.failover;
-public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
- private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
- private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
- private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
- private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
- private static final String BROKER_A_NAME = "BROKERA";
- private static final String BROKER_B_NAME = "BROKERB";
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
+
+ private static final int NUMBER_OF_CLIENTS = 30;
+ private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+ private EmbeddedJMS server0;
+ private EmbeddedJMS server1;
+ private String clientUrl;
+
+ @Test
public void testTwoBrokersRestart() throws Exception {
- createBrokerA(false, "", null, null);
- createBrokerB(false, "", null, null);
- getBroker(BROKER_B_NAME).waitUntilStarted();
Thread.sleep(2000);
- setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
createClients();
Thread.sleep(5000);
@@ -39,59 +58,106 @@ public class TwoBrokerFailoverClusterTest extends FailoverClusterTestSupport {
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
- getBroker(BROKER_A_NAME).stop();
- getBroker(BROKER_A_NAME).waitUntilStopped();
- removeBroker(BROKER_A_NAME);
+ server0.stop();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
Thread.sleep(1000);
- assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+ assertAllConnectedTo(newURI("127.0.0.1", 1));
Thread.sleep(5000);
- createBrokerA(false, "", null, null);
- getBroker(BROKER_A_NAME).waitUntilStarted();
+ server0.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
Thread.sleep(5000);
+ // Requires update-cluster-clients, update-cluster-clients-on-remove and rebalance to be set to true.
assertClientsConnectedToTwoBrokers();
assertClientsConnectionsEvenlyDistributed(.35);
}
- private void createBrokerA(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_A_NAME) == null) {
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- }
- else {
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
- }
- getBroker(BROKER_A_NAME).start();
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+ clientUrl = "failover://(" + newURI("127.0.0.1", 0) + "," + newURI("127.0.0.1", 1) + ")";
+
+ server0.start();
+ server1.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server0.stop();
+ server1.stop();
+ }
+
+ protected void createClients() throws Exception {
+ createClients(NUMBER_OF_CLIENTS);
+ }
+
+ protected void createClients(int numOfClients) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+ for (int i = 0; i < numOfClients; i++) {
+ ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = s.createQueue(getClass().getName());
+ MessageConsumer consumer = s.createConsumer(queue);
+ connections.add(c);
}
}
- private void createBrokerB(boolean multi,
- String params,
- String clusterFilter,
- String destinationFilter) throws Exception {
- final String tcParams = (params == null) ? "" : params;
- if (getBroker(BROKER_B_NAME) == null) {
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
- if (multi) {
- addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ protected void assertClientsConnectedToTwoBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ if (c.getTransportChannel().getRemoteAddress() != null) {
+ set.add(c.getTransportChannel().getRemoteAddress());
}
- else {
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ }
+ Assert.assertTrue("Only 2 connections should be found: " + set, set.size() == 2);
+ }
+
+ protected void assertClientsConnectionsEvenlyDistributed(double minimumPercentage) {
+ Map<String, Double> clientConnectionCounts = new HashMap<String, Double>();
+ int total = 0;
+ for (ActiveMQConnection c : connections) {
+ String key = c.getTransportChannel().getRemoteAddress();
+ if (key != null) {
+ total++;
+ if (clientConnectionCounts.containsKey(key)) {
+ double count = clientConnectionCounts.get(key);
+ count += 1.0;
+ clientConnectionCounts.put(key, count);
+ }
+ else {
+ clientConnectionCounts.put(key, 1.0);
+ }
}
- getBroker(BROKER_B_NAME).start();
+ }
+ Set<String> keys = clientConnectionCounts.keySet();
+ for (String key : keys) {
+ double count = clientConnectionCounts.get(key);
+ double percentage = count / total;
+ System.out.println(count + " of " + total + " connections for " + key + " = " + percentage);
+ Assert.assertTrue("Connections distribution expected to be >= than " + minimumPercentage + ". Actuall distribution was " + percentage + " for connection " + key, percentage >= minimumPercentage);
+ }
+ }
+
+ protected void assertAllConnectedTo(String url) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
index dc369be..a372b79 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTest.java
@@ -26,45 +26,42 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.util.MessageIdList;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
-public class FanoutTest extends TestCase {
+public class FanoutTest extends OpenwireArtemisBaseTest {
- BrokerService broker1;
- BrokerService broker2;
+ EmbeddedJMS[] servers = new EmbeddedJMS[2];
ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("fanout:(static:(tcp://localhost:61616,tcp://localhost:61617))?fanOutQueues=true");
Connection producerConnection;
Session producerSession;
int messageCount = 100;
- @Override
+ @Before
public void setUp() throws Exception {
- broker1 = BrokerFactory.createBroker("broker:(tcp://localhost:61616)/brokerA?persistent=false&useJmx=false");
- broker2 = BrokerFactory.createBroker("broker:(tcp://localhost:61617)/brokerB?persistent=false&useJmx=false");
-
- broker1.start();
- broker2.start();
-
- broker1.waitUntilStarted();
- broker2.waitUntilStarted();
+ setUpNonClusterServers(servers);
producerConnection = producerFactory.createConnection();
producerConnection.start();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- @Override
+ @After
public void tearDown() throws Exception {
producerSession.close();
producerConnection.close();
- broker1.stop();
- broker2.stop();
+ shutDownNonClusterServers(servers);
}
+ @Test
public void testSendReceive() throws Exception {
MessageProducer prod = createProducer();
@@ -76,7 +73,6 @@ public class FanoutTest extends TestCase {
assertMessagesReceived("tcp://localhost:61616");
assertMessagesReceived("tcp://localhost:61617");
-
}
protected MessageProducer createProducer() throws Exception {
@@ -95,7 +91,7 @@ public class FanoutTest extends TestCase {
listener.assertMessagesReceived(messageCount);
consumer.close();
- consumerConnection.close();
consumerSession.close();
+ consumerConnection.close();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
index 7e52f13..2cfc136 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
@@ -18,51 +18,111 @@ package org.apache.activemq.transport.fanout;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.mock.MockTransport;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FanoutTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FanoutTransportBrokerTest extends OpenwireArtemisBaseTest {
+ public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+ protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+ protected long idGenerator;
+ protected int msgIdGenerator;
+ protected int maxWait = 10000;
private static final Logger LOG = LoggerFactory.getLogger(FanoutTransportBrokerTest.class);
- public ActiveMQDestination destination;
- public int deliveryMode;
+ private EmbeddedJMS server;
+ private EmbeddedJMS remoteServer;
+
+ private ActiveMQDestination destination;
+ private int deliveryMode;
- public static Test suite() {
- return suite(FanoutTransportBrokerTest.class);
+ @Parameterized.Parameters(name="test-{index}")
+ public static Collection<Object[]> getParams()
+ {
+ return Arrays.asList(new Object[][]{
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+ });
}
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
+ public FanoutTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+ this.deliveryMode = deliveryMode;
+ this.destination = destination;
}
- public void initCombosForTestPublisherFansout() {
- addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST"), new ActiveMQQueue("TEST")});
+ @Before
+ public void setUp() throws Exception {
+ Configuration config0 = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ Configuration config1 = createConfig(1);
+ remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
+ remoteServer.start();
+
+ }
+ @After
+ public void tearDown() throws Exception {
+ for (StubConnection conn : connections) {
+ try {
+ conn.stop();
+ }
+ catch (Exception e) {
+ }
+ }
+ try {
+ remoteServer.stop();
+ }
+ catch (Exception e) {
+ }
+ try {
+ server.stop();
+ }
+ catch (Exception e) {
+ }
}
+ @Test
public void testPublisherFansout() throws Exception {
-
// Start a normal consumer on the local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -94,21 +154,28 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send the message using the fail over publisher.
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connection1));
+ Assert.assertNotNull(receiveMessage(connection1));
assertNoMessagesLeft(connection1);
- assertNotNull(receiveMessage(connection2));
+ Assert.assertNotNull(receiveMessage(connection2));
assertNoMessagesLeft(connection2);
}
+ /*
public void initCombosForTestPublisherWaitsForServerToBeUp() {
addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destination", new Object[]{new ActiveMQTopic("TEST")});
}
+*/
+ @Test
public void testPublisherWaitsForServerToBeUp() throws Exception {
+ if (name.getMethodName().contains("test-0") || name.getMethodName().contains("test-2")) {
+ System.out.println("Discarding invalid test: " + name.getMethodName());
+ return;
+ }
// Start a normal consumer on the local broker
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
@@ -140,19 +207,18 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send the message using the fail over publisher.
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connection1));
+ Assert.assertNotNull(receiveMessage(connection1));
assertNoMessagesLeft(connection1);
- assertNotNull(receiveMessage(connection2));
+ Assert.assertNotNull(receiveMessage(connection2));
assertNoMessagesLeft(connection2);
final CountDownLatch publishDone = new CountDownLatch(1);
// The MockTransport is on the remote connection.
// Slip in a new transport filter after the MockTransport
- MockTransport mt = connection3.getTransport().narrow(MockTransport.class);
+ MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
mt.install(new TransportFilter(mt.getNext()) {
- @Override
public void oneway(Object command) throws IOException {
LOG.info("Dropping: " + command);
// just eat it! to simulate a recent failure.
@@ -161,7 +227,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Send a message (async) as this will block
new Thread() {
- @Override
public void run() {
// Send the message using the fail over publisher.
try {
@@ -175,7 +240,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
}.start();
// Assert that we block:
- assertFalse(publishDone.await(3, TimeUnit.SECONDS));
+ Assert.assertFalse(publishDone.await(3, TimeUnit.SECONDS));
// Restart the remote server. State should be re-played and the publish
// should continue.
@@ -184,26 +249,127 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
LOG.info("Broker Restarted");
// This should reconnect, and resend
- assertTrue(publishDone.await(20, TimeUnit.SECONDS));
+ Assert.assertTrue(publishDone.await(20, TimeUnit.SECONDS));
}
- @Override
protected String getLocalURI() {
return "tcp://localhost:61616";
}
- @Override
protected String getRemoteURI() {
return "tcp://localhost:61617";
}
protected StubConnection createFanoutConnection() throws Exception {
- URI fanoutURI = new URI("fanout://(static://(" + connector.getServer().getConnectURI() + "," + "mock://" + remoteConnector.getServer().getConnectURI() + "))?fanOutQueues=true");
+ URI fanoutURI = new URI("fanout://(static://(" + newURI(0) + "," + "mock://" + newURI(1) + "))?fanOutQueues=true");
Transport transport = TransportFactory.connect(fanoutURI);
StubConnection connection = new StubConnection(transport);
connections.add(connection);
return connection;
}
+
+ protected StubConnection createConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(0)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected StubConnection createRemoteConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(1)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+ ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+ return message;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ try {
+ message.setText("Test Message Payload.");
+ }
+ catch (MessageNotWriteableException e) {
+ }
+ return message;
+ }
+
+ public Message receiveMessage(StubConnection connection) throws InterruptedException {
+ return receiveMessage(connection, maxWait);
+ }
+
+ public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+ if (o == null) {
+ return null;
+ }
+ if (o instanceof MessageDispatch) {
+
+ MessageDispatch dispatch = (MessageDispatch) o;
+ if (dispatch.getMessage() == null) {
+ return null;
+ }
+ dispatch.setMessage(dispatch.getMessage().copy());
+ dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+ return dispatch.getMessage();
+ }
+ }
+ }
+
+ protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+ long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+ if (o == null) {
+ return;
+ }
+ if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+ Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+ }
+ }
+ }
+ protected void restartRemoteBroker() throws Exception {
+ remoteServer.stop();
+ Thread.sleep(2000);
+ remoteServer.start();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 619190f..01f6963 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -173,11 +173,8 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
}
public void testClientHang() throws Exception {
-
- //
// Manually create a client transport so that it does not send KeepAlive
- // packets.
- // this should simulate a client hang.
+ // packets. This should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
clientTransport.setTransportListener(new TransportListener() {
@Override
@@ -205,9 +202,10 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
public void transportResumed() {
}
});
+
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
- info.setVersion(OpenWireFormat.DEFAULT_VERSION);
+ info.setVersion(OpenWireFormat.DEFAULT_LEGACY_VERSION);
info.setMaxInactivityDuration(1000);
clientTransport.oneway(info);
@@ -242,19 +240,17 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
* @throws URISyntaxException
*/
public void initCombosForTestNoClientHangWithServerBlock() throws Exception {
-
startClient();
- addCombinationValues("clientInactivityLimit", new Object[]{Long.valueOf(1000)});
- addCombinationValues("serverInactivityLimit", new Object[]{Long.valueOf(1000)});
- addCombinationValues("serverRunOnCommand", new Object[]{new Runnable() {
+ addCombinationValues("clientInactivityLimit", new Object[] {Long.valueOf(1000)});
+ addCombinationValues("serverInactivityLimit", new Object[] {Long.valueOf(1000)});
+ addCombinationValues("serverRunOnCommand", new Object[] {new Runnable() {
@Override
public void run() {
try {
LOG.info("Sleeping");
Thread.sleep(4000);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
}
}
}});
@@ -272,5 +268,4 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
index 9ae82ac..9d3c347 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/TransportUriTest.java
@@ -23,7 +23,6 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,15 +182,6 @@ public class TransportUriTest extends EmbeddedBrokerTestSupport {
super.tearDown();
}
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseJmx(false);
- answer.setPersistent(isPersistent());
- answer.addConnector(bindAddress);
- return answer;
- }
-
public static Test suite() {
return suite(TransportUriTest.class);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
deleted file mode 100644
index 3791848..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-
-public class VMTransportBrokerNameTest extends TestCase {
-
- private static final String MY_BROKER = "myBroker";
- final String vmUrl = "vm:(broker:(tcp://localhost:61616)/" + MY_BROKER + "?persistent=false)";
-
- public void testBrokerName() throws Exception {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
- ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
- assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
-
- // verify Broker is there with name
- ActiveMQConnectionFactory cfbyName = new ActiveMQConnectionFactory(new URI("vm://" + MY_BROKER + "?create=false"));
- Connection c2 = cfbyName.createConnection();
-
- assertNotNull(BrokerRegistry.getInstance().lookup(MY_BROKER));
- assertEquals(BrokerRegistry.getInstance().findFirst().getBrokerName(), MY_BROKER);
- assertEquals(BrokerRegistry.getInstance().getBrokers().size(), 1);
-
- c1.close();
- c2.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
deleted file mode 100644
index 52e4b88..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import junit.framework.Test;
-
-import org.apache.activemq.transport.TransportBrokerTestSupport;
-
-public class VMTransportBrokerTest extends TransportBrokerTestSupport {
-
- @Override
- protected String getBindLocation() {
- return "vm://localhost";
- }
-
- public static Test suite() {
- return suite(VMTransportBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
deleted file mode 100644
index dbc7f29..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportEmbeddedBrokerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import javax.jms.DeliveryMode;
-
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerTestSupport;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.util.IOExceptionSupport;
-
-/**
- * Used to see if the VM transport starts an embedded broker on demand.
- */
-public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport {
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class);
- }
-
- public void testConsumerPrefetchAtOne() throws Exception {
-
- // Make sure the broker is created due to the connection being started.
- assertNull(BrokerRegistry.getInstance().lookup("localhost"));
- StubConnection connection = createConnection();
- assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
-
- // Start a producer and consumer
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- ActiveMQQueue destination = new ActiveMQQueue("TEST");
-
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- consumerInfo.setPrefetchSize(1);
- connection.send(consumerInfo);
-
- // Send 2 messages to the broker.
- connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
- connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT));
-
- // Make sure only 1 message was delivered.
- Message m = receiveMessage(connection);
- assertNotNull(m);
- assertNoMessagesLeft(connection);
-
- // Make sure the broker is shutdown when the connection is stopped.
- assertNotNull(BrokerRegistry.getInstance().lookup("localhost"));
- connection.stop();
- assertNull(BrokerRegistry.getInstance().lookup("localhost"));
- }
-
- @Override
- protected void setUp() throws Exception {
- // Don't call super since it manually starts up a broker.
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Don't call super since it manually tears down a broker.
- }
-
- @Override
- protected StubConnection createConnection() throws Exception {
- try {
- Transport transport = TransportFactory.connect(new URI("vm://localhost?broker.persistent=false"));
- StubConnection connection = new StubConnection(transport);
- return connection;
- }
- catch (URISyntaxException e) {
- throw IOExceptionSupport.create(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
deleted file mode 100644
index 2268048..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.command.BaseCommand;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.FutureResponse;
-import org.apache.activemq.transport.MutexTransport;
-import org.apache.activemq.transport.ResponseCallback;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportDisposedIOException;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportThreadSafeTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(VMTransportThreadSafeTest.class);
-
- private final static String location1 = "vm://transport1";
- private final static String location2 = "vm://transport2";
-
- private final ConcurrentLinkedQueue<DummyCommand> localReceived = new ConcurrentLinkedQueue<>();
- private final ConcurrentLinkedQueue<DummyCommand> remoteReceived = new ConcurrentLinkedQueue<>();
-
- private class DummyCommand extends BaseCommand {
-
- public final int sequenceId;
-
- public DummyCommand() {
- this.sequenceId = 0;
- }
-
- public DummyCommand(int id) {
- this.sequenceId = id;
- }
-
- @Override
- public Response visit(CommandVisitor visitor) throws Exception {
- return null;
- }
-
- @Override
- public byte getDataStructureType() {
- return 42;
- }
- }
-
- private class VMTestTransportListener implements TransportListener {
-
- protected final Queue<DummyCommand> received;
-
- public boolean shutdownReceived = false;
-
- public VMTestTransportListener(Queue<DummyCommand> receiveQueue) {
- this.received = receiveQueue;
- }
-
- @Override
- public void onCommand(Object command) {
-
- if (command instanceof ShutdownInfo) {
- shutdownReceived = true;
- }
- else {
- received.add((DummyCommand) command);
- }
- }
-
- @Override
- public void onException(IOException error) {
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- }
-
- private class VMResponderTransportListener implements TransportListener {
-
- protected final Queue<DummyCommand> received;
-
- private final Transport peer;
-
- public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
- this.received = receiveQueue;
- this.peer = peer;
- }
-
- @Override
- public void onCommand(Object command) {
-
- if (command instanceof ShutdownInfo) {
- return;
- }
- else {
- received.add((DummyCommand) command);
-
- if (peer != null) {
- try {
- peer.oneway(command);
- }
- catch (IOException e) {
- }
- }
- }
- }
-
- @Override
- public void onException(IOException error) {
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- }
-
- private class SlowVMTestTransportListener extends VMTestTransportListener {
-
- private final TimeUnit delayUnit;
- private final long delay;
-
- public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
- this(receiveQueue, 10, TimeUnit.MILLISECONDS);
- }
-
- public SlowVMTestTransportListener(Queue<DummyCommand> receiveQueue, long delay, TimeUnit delayUnit) {
- super(receiveQueue);
-
- this.delay = delay;
- this.delayUnit = delayUnit;
- }
-
- @Override
- public void onCommand(Object command) {
- super.onCommand(command);
- try {
- delayUnit.sleep(delay);
- }
- catch (InterruptedException e) {
- }
- }
- }
-
- private class GatedVMTestTransportListener extends VMTestTransportListener {
-
- private final CountDownLatch gate;
-
- public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue) {
- this(receiveQueue, new CountDownLatch(1));
- }
-
- public GatedVMTestTransportListener(Queue<DummyCommand> receiveQueue, CountDownLatch gate) {
- super(receiveQueue);
-
- this.gate = gate;
- }
-
- @Override
- public void onCommand(Object command) {
- super.onCommand(command);
- try {
- gate.await();
- }
- catch (InterruptedException e) {
- }
- }
- }
-
- private void assertMessageAreOrdered(ConcurrentLinkedQueue<DummyCommand> queue) {
- int lastSequenceId = 0;
- for (DummyCommand command : queue) {
- int id = command.sequenceId;
- assertTrue("Last id: " + lastSequenceId + " should be less than current id: " + id, id > lastSequenceId);
- }
- }
-
- @Before
- public void setUp() throws Exception {
- localReceived.clear();
- remoteReceived.clear();
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test(timeout = 60000)
- public void testStartWthoutListenerIOE() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- remote.setTransportListener(new VMTestTransportListener(localReceived));
-
- try {
- local.start();
- fail("Should have thrown an IOExcoption");
- }
- catch (IOException e) {
- }
- }
-
- @Test(timeout = 60000)
- public void testOnewayOnStoppedTransportTDE() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- local.start();
- local.stop();
-
- try {
- local.oneway(new DummyCommand());
- fail("Should have thrown a TransportDisposedException");
- }
- catch (TransportDisposedIOException e) {
- }
- }
-
- @Test(timeout = 60000)
- public void testStopSendsShutdownToPeer() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(remoteListener);
-
- local.start();
- local.stop();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteListener.shutdownReceived;
- }
- }));
- }
-
- @Test(timeout = 60000)
- public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
- remote.setTransportListener(remoteListener);
- remote.start();
-
- final Response[] answer = new Response[1];
- ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
- responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
- responseCorrelator.start();
- responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
- @Override
- public void onCompletion(FutureResponse resp) {
- try {
- answer[0] = resp.getResult();
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
-
- // simulate broker stop
- remote.stop();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("answer: " + answer[0]);
- return answer[0] instanceof ExceptionResponse && ((ExceptionResponse) answer[0]).getException() instanceof TransportDisposedIOException;
- }
- }));
-
- local.stop();
- }
-
- @Test(timeout = 60000)
- public void testMultipleStartsAndStops() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- local.start();
- remote.start();
-
- local.start();
- remote.start();
-
- for (int i = 0; i < 100; ++i) {
- local.oneway(new DummyCommand());
- }
-
- for (int i = 0; i < 100; ++i) {
- remote.oneway(new DummyCommand());
- }
-
- local.start();
- remote.start();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == 100;
- }
- }));
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return localReceived.size() == 100;
- }
- }));
-
- local.stop();
- local.stop();
- remote.stop();
- remote.stop();
- }
-
- @Test(timeout = 60000)
- public void testStartWithPeerNotStartedEnqueusCommandsNonAsync() throws Exception {
- doTestStartWithPeerNotStartedEnqueusCommands(false);
- }
-
- private void doTestStartWithPeerNotStartedEnqueusCommands(boolean async) throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- remote.setAsync(async);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- local.start();
-
- for (int i = 0; i < 100; ++i) {
- local.oneway(new DummyCommand());
- }
-
- assertEquals(100, remote.getMessageQueue().size());
-
- remote.start();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == 100;
- }
- }));
-
- local.stop();
- remote.stop();
- }
-
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueAandStopTransportAsync() throws Exception {
- doTestBlockedOnewayEnqeueAandStopTransport(true);
- }
-
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueAandStopTransportNonAsync() throws Exception {
- doTestBlockedOnewayEnqeueAandStopTransport(false);
- }
-
- private void doTestBlockedOnewayEnqeueAandStopTransport(boolean async) throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- final AtomicInteger sequenceId = new AtomicInteger();
-
- remote.setAsync(async);
- remote.setAsyncQueueDepth(99);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- local.start();
-
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < 100; ++i) {
- try {
- local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
-
- }
- });
- t.start();
-
- LOG.debug("Started async delivery, wait for remote's queue to fill up");
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remote.getMessageQueue().remainingCapacity() == 0;
- }
- }));
-
- LOG.debug("Remote messageQ is full, start it and stop all");
-
- remote.start();
- local.stop();
- remote.stop();
- }
-
- @Test(timeout = 60000)
- public void testBlockedOnewayEnqeueWhileStartedDetectsStop() throws Exception {
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- final AtomicInteger sequenceId = new AtomicInteger();
-
- remote.setAsync(true);
- remote.setAsyncQueueDepth(2);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new GatedVMTestTransportListener(remoteReceived));
-
- local.start();
- remote.start();
-
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < 3; ++i) {
- try {
- local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
-
- }
- });
- t.start();
-
- LOG.debug("Started async delivery, wait for remote's queue to fill up");
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remote.getMessageQueue().remainingCapacity() == 0;
- }
- }));
-
- LOG.debug("Starting async gate open.");
- Thread gateman = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- }
- ((GatedVMTestTransportListener) remote.getTransportListener()).gate.countDown();
- }
- });
- gateman.start();
-
- remote.stop();
- local.stop();
-
- assertEquals(1, remoteReceived.size());
- assertMessageAreOrdered(remoteReceived);
- }
-
- @Test(timeout = 60000)
- public void testStopWhileStartingAsyncWithNoAsyncLimit() throws Exception {
- // In the async case the iterate method should see that we are stopping and
- // drop out before we dispatch all the messages but it should get at least 49 since
- // the stop thread waits 500 mills and the listener is waiting 10 mills on each receive.
- doTestStopWhileStartingWithNoAsyncLimit(true, 49);
- }
-
- @Test(timeout = 60000)
- public void testStopWhileStartingNonAsyncWithNoAsyncLimit() throws Exception {
- // In the non-async case the start dispatches all messages up front and then continues on
- doTestStopWhileStartingWithNoAsyncLimit(false, 100);
- }
-
- private void doTestStopWhileStartingWithNoAsyncLimit(boolean async, final int expect) throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- remote.setAsync(async);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new SlowVMTestTransportListener(remoteReceived));
-
- local.start();
-
- for (int i = 0; i < 100; ++i) {
- local.oneway(new DummyCommand(i));
- }
-
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- remote.stop();
- }
- catch (Exception e) {
- }
- }
- });
-
- remote.start();
-
- t.start();
-
- assertTrue("Remote should receive: " + expect + ", commands but got: " + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() >= expect;
- }
- }));
-
- LOG.debug("Remote listener received " + remoteReceived.size() + " messages");
-
- local.stop();
-
- assertTrue("Remote transport never was disposed.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remote.isDisposed();
- }
- }));
- }
-
- @Test(timeout = 120000)
- public void TestTwoWayMessageThroughPutSync() throws Exception {
-
- long totalTimes = 0;
- final long executions = 20;
-
- for (int i = 0; i < 20; ++i) {
- totalTimes += doTestTwoWayMessageThroughPut(false);
- }
-
- LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- @Test(timeout = 120000)
- public void TestTwoWayMessageThroughPutAsnyc() throws Exception {
-
- long totalTimes = 0;
- final long executions = 50;
-
- for (int i = 0; i < executions; ++i) {
- totalTimes += doTestTwoWayMessageThroughPut(false);
- }
-
- LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- private long doTestTwoWayMessageThroughPut(boolean async) throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- final AtomicInteger sequenceId = new AtomicInteger();
-
- remote.setAsync(async);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- final int messageCount = 200000;
-
- local.start();
- remote.start();
-
- long startTime = System.currentTimeMillis();
-
- Thread localSend = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < messageCount; ++i) {
- try {
- local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
-
- }
- });
-
- Thread remoteSend = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < messageCount; ++i) {
- try {
- remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
-
- }
- });
-
- localSend.start();
- remoteSend.start();
-
- // Wait for both to finish and then check that each side go the correct amount
- localSend.join();
- remoteSend.join();
-
- long endTime = System.currentTimeMillis();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == messageCount;
- }
- }));
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return localReceived.size() == messageCount;
- }
- }));
-
- LOG.debug("All messages sent,stop all");
-
- local.stop();
- remote.stop();
-
- localReceived.clear();
- remoteReceived.clear();
-
- return endTime - startTime;
- }
-
- @Test(timeout = 120000)
- public void TestOneWayMessageThroughPutSync() throws Exception {
-
- long totalTimes = 0;
- final long executions = 30;
-
- for (int i = 0; i < executions; ++i) {
- totalTimes += doTestOneWayMessageThroughPut(false);
- }
-
- LOG.info("Total time of one way sync send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- @Test(timeout = 120000)
- public void TestOneWayMessageThroughPutAsnyc() throws Exception {
-
- long totalTimes = 0;
- final long executions = 20;
-
- for (int i = 0; i < 20; ++i) {
- totalTimes += doTestOneWayMessageThroughPut(true);
- }
-
- LOG.info("Total time of one way async send throughput test: " + (totalTimes / executions) + "ms");
- }
-
- private long doTestOneWayMessageThroughPut(boolean async) throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- final AtomicInteger sequenceId = new AtomicInteger();
-
- remote.setAsync(async);
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMTestTransportListener(remoteReceived));
-
- final int messageCount = 100000;
-
- local.start();
- remote.start();
-
- long startTime = System.currentTimeMillis();
-
- Thread localSend = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < messageCount; ++i) {
- try {
- local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
-
- }
- });
-
- localSend.start();
-
- // Wait for both to finish and then check that each side go the correct amount
- localSend.join();
-
- long endTime = System.currentTimeMillis();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == messageCount;
- }
- }));
-
- LOG.debug("All messages sent,stop all");
-
- local.stop();
- remote.stop();
-
- localReceived.clear();
- remoteReceived.clear();
-
- return endTime - startTime;
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, false);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(true, false);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, true);
- }
- }
-
- @Test(timeout = 120000)
- public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
-
- for (int i = 0; i < 20; ++i) {
- doTestTwoWayTrafficWithMutexTransport(false, false);
- }
- }
-
- public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
-
- final VMTransport vmlocal = new VMTransport(new URI(location1));
- final VMTransport vmremote = new VMTransport(new URI(location2));
-
- final MutexTransport local = new MutexTransport(vmlocal);
- final MutexTransport remote = new MutexTransport(vmremote);
-
- final AtomicInteger sequenceId = new AtomicInteger();
-
- vmlocal.setAsync(localAsync);
- vmremote.setAsync(remoteAsync);
-
- vmlocal.setPeer(vmremote);
- vmremote.setPeer(vmlocal);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
-
- final int messageCount = 200000;
-
- Thread localSend = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < messageCount; ++i) {
- try {
- local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
- }
- });
-
- Thread remoteSend = new Thread(new Runnable() {
-
- @Override
- public void run() {
- for (int i = 0; i < messageCount; ++i) {
- try {
- remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
- }
- catch (Exception e) {
- }
- }
- }
- });
-
- localSend.start();
- remoteSend.start();
-
- Thread.sleep(10);
-
- local.start();
- remote.start();
-
- // Wait for both to finish and then check that each side go the correct amount
- localSend.join();
- remoteSend.join();
-
- assertTrue("Remote should have received (" + messageCount + ") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteReceived.size() == messageCount;
- }
- }));
-
- assertTrue("Local should have received (" + messageCount * 2 + ") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return localReceived.size() == messageCount * 2;
- }
- }));
-
- LOG.debug("All messages sent,stop all");
-
- local.stop();
- remote.stop();
-
- localReceived.clear();
- remoteReceived.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
deleted file mode 100644
index dd14d67..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VMTransportWaitForTest {
-
- static final Logger LOG = LoggerFactory.getLogger(VMTransportWaitForTest.class);
-
- private static final int WAIT_TIME = 20000;
- private static final int SHORT_WAIT_TIME = 5000;
-
- private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false";
-
- private static final String VM_BROKER_URI_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME;
-
- private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = VM_BROKER_URI_NO_WAIT + "&waitForStart=" + SHORT_WAIT_TIME;
-
- CountDownLatch started = new CountDownLatch(1);
- CountDownLatch gotConnection = new CountDownLatch(1);
-
- @After
- public void after() throws IOException {
- BrokerRegistry.getInstance().unbind("localhost");
- }
-
- @Test(timeout = 90000)
- public void testWaitFor() throws Exception {
- try {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_NO_WAIT));
- cf.createConnection();
- fail("expect broker not exist exception");
- }
- catch (JMSException expectedOnNoBrokerAndNoCreate) {
- }
-
- // spawn a thread that will wait for an embedded broker to start via
- // vm://..
- Thread t = new Thread("ClientConnectionThread") {
- @Override
- public void run() {
- try {
- started.countDown();
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_WAIT_FOR_START));
- cf.createConnection();
- gotConnection.countDown();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("unexpected exception: " + e);
- }
- }
- };
- t.start();
- started.await(20, TimeUnit.SECONDS);
- Thread.yield();
- assertFalse("has not got connection", gotConnection.await(2, TimeUnit.SECONDS));
-
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- broker.start();
- assertTrue("has got connection", gotConnection.await(5, TimeUnit.SECONDS));
- broker.stop();
- }
-
- @Test(timeout = 90000)
- public void testWaitForNoBrokerInRegistry() throws Exception {
-
- long startTime = System.currentTimeMillis();
-
- try {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
- cf.createConnection();
- fail("expect broker not exist exception");
- }
- catch (JMSException expectedOnNoBrokerAndNoCreate) {
- }
-
- long endTime = System.currentTimeMillis();
-
- LOG.info("Total wait time was: {}", endTime - startTime);
- assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
- }
-
- @Test(timeout = 90000)
- public void testWaitForNotStartedButInRegistry() throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.setPersistent(false);
- BrokerRegistry.getInstance().bind("localhost", broker);
-
- long startTime = System.currentTimeMillis();
-
- try {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START));
- cf.createConnection();
- fail("expect broker not exist exception");
- }
- catch (JMSException expectedOnNoBrokerAndNoCreate) {
- }
-
- long endTime = System.currentTimeMillis();
-
- LOG.info("Total wait time was: {}", endTime - startTime);
- assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
deleted file mode 100644
index 2b97cff..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.vm;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.bugs.embedded.ThreadExplorer;
-import org.apache.activemq.network.NetworkConnector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class VmTransportNetworkBrokerTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(VmTransportNetworkBrokerTest.class);
-
- private static final String VM_BROKER_URI = "vm://localhost?create=false";
-
- CountDownLatch started = new CountDownLatch(1);
- CountDownLatch gotConnection = new CountDownLatch(1);
-
- public void testNoThreadLeak() throws Exception {
-
- // with VMConnection and simple discovery network connector
- int originalThreadCount = Thread.activeCount();
- LOG.debug(ThreadExplorer.show("threads at beginning"));
-
- BrokerService broker = new BrokerService();
- broker.setDedicatedTaskRunner(true);
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:61616");
- NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://wrongHostname1:61617,tcp://wrongHostname2:61618)?useExponentialBackOff=false");
- networkConnector.setDuplex(true);
- broker.start();
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI));
- Connection connection = cf.createConnection("system", "manager");
- connection.start();
-
- // let it settle
- TimeUnit.SECONDS.sleep(5);
-
- int threadCountAfterStart = Thread.activeCount();
- TimeUnit.SECONDS.sleep(30);
- int threadCountAfterSleep = Thread.activeCount();
-
- assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep, threadCountAfterSleep < threadCountAfterStart + 8);
-
- connection.close();
- broker.stop();
- broker.waitUntilStopped();
-
- // testNoDanglingThreadsAfterStop with tcp transport
- broker = new BrokerService();
- broker.setSchedulerSupport(true);
- broker.setDedicatedTaskRunner(true);
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
- broker.start();
-
- cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
- connection = cf.createConnection("system", "manager");
- connection.start();
- connection.close();
- broker.stop();
- broker.waitUntilStopped();
-
- // let it settle
- TimeUnit.SECONDS.sleep(5);
-
- // get final threads but filter out any daemon threads that the JVM may have created.
- Thread[] threads = filterDaemonThreads(ThreadExplorer.listThreads());
- int threadCountAfterStop = threads.length;
-
- // lets see the thread counts at INFO level so they are always in the test log
- LOG.info(ThreadExplorer.show("active after stop"));
- LOG.info("originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop);
-
- assertTrue("Threads are leaking: " +
- ThreadExplorer.show("active after stop") +
- ". originalThreadCount=" +
- originalThreadCount +
- " threadCountAfterStop=" +
- threadCountAfterStop, threadCountAfterStop <= originalThreadCount);
- }
-
- /**
- * Filters any daemon threads from the thread list.
- *
- * Thread counts before and after the test should ideally be equal.
- * However there is no guarantee that the JVM does not create any
- * additional threads itself.
- * E.g. on Mac OSX there is a JVM internal thread called
- * "Poller SunPKCS11-Darwin" created after the test go started and
- * under the main thread group.
- * When debugging tests in Eclipse another so called "Reader" thread
- * is created by Eclipse.
- * So we cannot assume that the JVM does not create additional threads
- * during the test. However for the time being we assume that any such
- * additionally created threads are daemon threads.
- *
- * @param threads - the array of threads to parse
- * @return a new array with any daemon threads removed
- */
- public Thread[] filterDaemonThreads(Thread[] threads) throws Exception {
-
- List<Thread> threadList = new ArrayList<>(Arrays.asList(threads));
-
- // Can't use an Iterator as it would raise a
- // ConcurrentModificationException when trying to remove an element
- // from the list, so using standard walk through
- for (int i = 0; i < threadList.size(); i++) {
-
- Thread thread = threadList.get(i);
- LOG.debug("Inspecting thread " + thread.getName());
- if (thread.isDaemon()) {
- LOG.debug("Removing deamon thread.");
- threadList.remove(thread);
- Thread.sleep(100);
-
- }
- }
- LOG.debug("Converting list back to Array");
- return threadList.toArray(new Thread[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a54f4ed3/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
new file mode 100644
index 0000000..03e0d2e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/LockFileTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LockFileTest {
+ // Expects unlock() to leave the lock file in place once keepAlive() has reported the lock as no longer valid.
+ @Test
+ public void testNoDeleteOnUnlockIfNotLocked() throws Exception {
+ // Start from an existing lock file under the default data directory.
+ File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest1");
+ IOHelper.mkdirs(lockFile.getParentFile());
+ lockFile.createNewFile();
+ // NOTE(review): the 'true' flag presumably enables delete-on-unlock; confirm against LockFile.
+ LockFile underTest = new LockFile(lockFile, true);
+
+ underTest.lock();
+ // Remove the file out from under the held lock.
+ lockFile.delete();
+ // With the file gone, keepAlive() is expected to report the lock as invalid.
+ assertFalse("no longer valid", underTest.keepAlive());
+
+ // a slave gets in
+ lockFile.createNewFile();
+ // Releasing the stale lock is expected not to delete the file that was just re-created.
+ underTest.unlock();
+
+ assertTrue("file still exists after unlock when not locked", lockFile.exists());
+
+ }
+ // Expects unlock() to delete the lock file when keepAlive() still reports the lock as valid.
+ @Test
+ public void testDeleteOnUnlockIfLocked() throws Exception {
+ // Same setup as the first test, but with a distinct file name so the two tests do not share a file.
+ File lockFile = new File(IOHelper.getDefaultDataDirectory(), "lockToTest2");
+ IOHelper.mkdirs(lockFile.getParentFile());
+ lockFile.createNewFile();
+
+ LockFile underTest = new LockFile(lockFile, true);
+
+ underTest.lock();
+ // The file has not been touched since lock(), so keepAlive() is expected to succeed.
+ assertTrue("valid", underTest.keepAlive());
+
+ underTest.unlock();
+ // NOTE(review): the deletion checked below presumably comes from the 'true' constructor flag; confirm.
+ assertFalse("file deleted on unlock", lockFile.exists());
+
+ }
+}