You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2014/08/07 02:12:37 UTC

[2/2] git commit: Updated test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for enhancement.

Updated the test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish's commit for the enhancement.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0d9eedc6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0d9eedc6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0d9eedc6

Branch: refs/heads/trunk
Commit: 0d9eedc658a84b15cee9efcd0fe991a5345324da
Parents: c42b874
Author: Christian Posta <ch...@gmail.com>
Authored: Wed Aug 6 17:02:17 2014 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Wed Aug 6 17:02:17 2014 -0700

----------------------------------------------------------------------
 .../MQTTNetworkOfBrokersFailoverTest.java       | 32 ++++++++++++--------
 1 file changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0d9eedc6/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
index bf24971..e1ab183 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
@@ -24,7 +24,6 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.ArrayUtils;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.mqtt.client.*;
@@ -48,10 +47,6 @@ import java.util.concurrent.TimeUnit;
 public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
-
-    private final String subName = "Subscriber1";
-    private final String subName2 = "Subscriber2";
-    private final String topicName = "TEST.FOO";
     private int localBrokerMQTTPort = -1;
     private int remoteBrokerMQTTPort = -1;
 
@@ -99,11 +94,12 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         BlockingConnection remoteConn = remoteMqtt.blockingConnection();
         remoteConn.connect();
         remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
+        assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
         remoteConn.disconnect();
 
-        consumerNetworked.await(1, TimeUnit.SECONDS);
-        assertOneDurableSubOn(remoteBroker, "foo");
-        assertOneDurableSubOn(broker, "NC_localhost_inbound_local");
 
         // now we reconnect the same sub on the local broker, again with clean==0
         MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
@@ -139,17 +135,16 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         final CountDownLatch latch = new CountDownLatch(1);
 
         URI brokerUri = broker.getVmConnectorURI();
-        System.out.println(brokerUri.toASCIIString());
 
         final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
         final Connection connection = cf.createConnection();
+        connection.start();
         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.foo.bar.>");
+        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
         MessageConsumer consumer = session.createConsumer(dest);
         consumer.setMessageListener(new MessageListener() {
             @Override
             public void onMessage(Message message) {
-                System.out.println("got message! " + message);
                 latch.countDown();
                 // shutdown this connection
                 Dispatch.getGlobalQueue().execute(new Runnable() {
@@ -170,6 +165,13 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         return latch;
     }
 
+    private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] queueNames = brokerView.getQueues();
+        assertEquals(1, queueNames.length);
+
+        assertTrue(queueNames[0].toString().contains(queueName));
+    }
 
     private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
         BrokerViewMBean brokerView = broker.getAdminView();
@@ -193,7 +195,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         broker.setBrokerName("local");
         broker.setDataDirectory("target/activemq-data");
         broker.setDeleteAllMessagesOnStartup(true);
-        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
         localBrokerMQTTPort = tc.getConnectUri().getPort();
         return broker;
     }
@@ -204,11 +206,15 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         broker.setPersistent(true);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setDataDirectory("target/activemq-data");
-        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
         remoteBrokerMQTTPort = tc.getConnectUri().getPort();
         return broker;
     }
 
+    private String getDefaultMQTTTransportConnectorUri(){
+        return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+    }
+
     private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setConnectAttemptsMax(1);