You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2014/08/07 02:12:37 UTC
[2/2] git commit: Updated test for
https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for
enhancement.
Updated test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for enhancement.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0d9eedc6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0d9eedc6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0d9eedc6
Branch: refs/heads/trunk
Commit: 0d9eedc658a84b15cee9efcd0fe991a5345324da
Parents: c42b874
Author: Christian Posta <ch...@gmail.com>
Authored: Wed Aug 6 17:02:17 2014 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Wed Aug 6 17:02:17 2014 -0700
----------------------------------------------------------------------
.../MQTTNetworkOfBrokersFailoverTest.java | 32 ++++++++++++--------
1 file changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0d9eedc6/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
index bf24971..e1ab183 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
@@ -24,7 +24,6 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.util.Wait;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.*;
@@ -48,10 +47,6 @@ import java.util.concurrent.TimeUnit;
public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
-
- private final String subName = "Subscriber1";
- private final String subName2 = "Subscriber2";
- private final String topicName = "TEST.FOO";
private int localBrokerMQTTPort = -1;
private int remoteBrokerMQTTPort = -1;
@@ -99,11 +94,12 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
BlockingConnection remoteConn = remoteMqtt.blockingConnection();
remoteConn.connect();
remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+ assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
+ assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+ assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
remoteConn.disconnect();
- consumerNetworked.await(1, TimeUnit.SECONDS);
- assertOneDurableSubOn(remoteBroker, "foo");
- assertOneDurableSubOn(broker, "NC_localhost_inbound_local");
// now we reconnect the same sub on the local broker, again with clean==0
MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
@@ -139,17 +135,16 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
final CountDownLatch latch = new CountDownLatch(1);
URI brokerUri = broker.getVmConnectorURI();
- System.out.println(brokerUri.toASCIIString());
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
final Connection connection = cf.createConnection();
+ connection.start();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.foo.bar.>");
+ Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
- System.out.println("got message! " + message);
latch.countDown();
// shutdown this connection
Dispatch.getGlobalQueue().execute(new Runnable() {
@@ -170,6 +165,13 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
return latch;
}
+ private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
+ BrokerViewMBean brokerView = broker.getAdminView();
+ ObjectName[] queueNames = brokerView.getQueues();
+ assertEquals(1, queueNames.length);
+
+ assertTrue(queueNames[0].toString().contains(queueName));
+ }
private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
BrokerViewMBean brokerView = broker.getAdminView();
@@ -193,7 +195,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
broker.setBrokerName("local");
broker.setDataDirectory("target/activemq-data");
broker.setDeleteAllMessagesOnStartup(true);
- TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+ TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
localBrokerMQTTPort = tc.getConnectUri().getPort();
return broker;
}
@@ -204,11 +206,15 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(true);
broker.setDataDirectory("target/activemq-data");
- TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+ TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
remoteBrokerMQTTPort = tc.getConnectUri().getPort();
return broker;
}
+ private String getDefaultMQTTTransportConnectorUri(){
+ return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+ }
+
private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);