You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2014/08/07 02:12:36 UTC

[1/2] git commit: Test for https://issues.apache.org/jira/browse/AMQ-5290

Repository: activemq
Updated Branches:
  refs/heads/trunk 413e4840d -> 0d9eedc65


Test for https://issues.apache.org/jira/browse/AMQ-5290


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c42b8749
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c42b8749
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c42b8749

Branch: refs/heads/trunk
Commit: c42b874972672bbf1ba80d3de4926a0d262e9353
Parents: 413e484
Author: Christian Posta <ch...@gmail.com>
Authored: Wed Aug 6 15:53:16 2014 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Wed Aug 6 15:53:39 2014 -0700

----------------------------------------------------------------------
 .../transport/mqtt/FuseMQQTTClientProvider.java | 110 ---------
 .../transport/mqtt/FuseMQTTClientProvider.java  | 110 +++++++++
 .../transport/mqtt/MQTTTestSupport.java         |   2 +-
 .../MQTTNetworkOfBrokersFailoverTest.java       | 244 +++++++++++++++++++
 4 files changed, 355 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c42b8749/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
deleted file mode 100644
index d329066..0000000
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
-
-class FuseMQQTTClientProvider implements MQTTClientProvider {
-    private final MQTT mqtt = new MQTT();
-    private BlockingConnection connection;
-    @Override
-    public void connect(String host) throws Exception {
-        mqtt.setHost(host);
-        // shut off connect retry
-        mqtt.setConnectAttemptsMax(0);
-        mqtt.setReconnectAttemptsMax(0);
-        connection = mqtt.blockingConnection();
-        connection.connect();
-    }
-
-    @Override
-    public void disconnect() throws Exception {
-        if (this.connection != null){
-            this.connection.disconnect();
-        }
-    }
-
-    @Override
-    public void publish(String topic, byte[] payload, int qos) throws Exception {
-        publish(topic,payload,qos,false);
-    }
-
-    @Override
-    public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
-        connection.publish(topic,payload, QoS.values()[qos],retained);
-    }
-
-    @Override
-    public void subscribe(String topic, int qos) throws Exception {
-        Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
-        connection.subscribe(topics);
-    }
-
-    @Override
-    public void unsubscribe(String topic) throws Exception {
-        connection.unsubscribe(new String[]{topic});
-    }
-
-    @Override
-    public byte[] receive(int timeout) throws Exception {
-        byte[] result = null;
-        Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
-        if (message != null){
-            result = message.getPayload();
-            message.ack();
-        }
-        return result;
-    }
-
-    @Override
-    public void setSslContext(SSLContext sslContext) {
-        mqtt.setSslContext(sslContext);
-    }
-
-    @Override
-    public void setWillMessage(String string) {
-        mqtt.setWillMessage(string);
-    }
-
-    @Override
-    public void setWillTopic(String topic) {
-        mqtt.setWillTopic(topic);
-    }
-
-    @Override
-    public void setClientId(String clientId) {
-        mqtt.setClientId(clientId);
-    }
-
-    @Override
-    public void kill() throws Exception {
-        connection.kill();
-    }
-
-    @Override
-    public void setKeepAlive(int keepAlive) throws Exception {
-        mqtt.setKeepAlive((short) keepAlive);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/c42b8749/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java
new file mode 100644
index 0000000..f7e6c48
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQTTClientProvider.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+
+public class FuseMQTTClientProvider implements MQTTClientProvider {
+    private final MQTT mqtt = new MQTT();
+    private BlockingConnection connection;
+    @Override
+    public void connect(String host) throws Exception {
+        mqtt.setHost(host);
+        // shut off connect retry
+        mqtt.setConnectAttemptsMax(0);
+        mqtt.setReconnectAttemptsMax(0);
+        connection = mqtt.blockingConnection();
+        connection.connect();
+    }
+
+    @Override
+    public void disconnect() throws Exception {
+        if (this.connection != null){
+            this.connection.disconnect();
+        }
+    }
+
+    @Override
+    public void publish(String topic, byte[] payload, int qos) throws Exception {
+        publish(topic,payload,qos,false);
+    }
+
+    @Override
+    public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
+        connection.publish(topic,payload, QoS.values()[qos],retained);
+    }
+
+    @Override
+    public void subscribe(String topic, int qos) throws Exception {
+        Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
+        connection.subscribe(topics);
+    }
+
+    @Override
+    public void unsubscribe(String topic) throws Exception {
+        connection.unsubscribe(new String[]{topic});
+    }
+
+    @Override
+    public byte[] receive(int timeout) throws Exception {
+        byte[] result = null;
+        Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+        if (message != null){
+            result = message.getPayload();
+            message.ack();
+        }
+        return result;
+    }
+
+    @Override
+    public void setSslContext(SSLContext sslContext) {
+        mqtt.setSslContext(sslContext);
+    }
+
+    @Override
+    public void setWillMessage(String string) {
+        mqtt.setWillMessage(string);
+    }
+
+    @Override
+    public void setWillTopic(String topic) {
+        mqtt.setWillTopic(topic);
+    }
+
+    @Override
+    public void setClientId(String clientId) {
+        mqtt.setClientId(clientId);
+    }
+
+    @Override
+    public void kill() throws Exception {
+        connection.kill();
+    }
+
+    @Override
+    public void setKeepAlive(int keepAlive) throws Exception {
+        mqtt.setKeepAlive((short) keepAlive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/c42b8749/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
index 558873f..19aac52 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTestSupport.java
@@ -339,7 +339,7 @@ public class MQTTTestSupport {
     }
 
     protected MQTTClientProvider getMQTTClientProvider() {
-        return new FuseMQQTTClientProvider();
+        return new FuseMQTTClientProvider();
     }
 
     protected MQTT createMQTTConnection() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/c42b8749/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
new file mode 100644
index 0000000..bf24971
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.network.NetworkTestSupport;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.lang.ArrayUtils;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.mqtt.client.*;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.Message;
+import javax.management.ObjectName;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
+
+    private final String subName = "Subscriber1";
+    private final String subName2 = "Subscriber2";
+    private final String topicName = "TEST.FOO";
+    private int localBrokerMQTTPort = -1;
+    private int remoteBrokerMQTTPort = -1;
+
+    protected void setUp() throws Exception {
+        useJmx=true;
+        super.setUp();
+
+        URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")");
+        NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
+        nc.setDuplex(true);
+        remoteBroker.addNetworkConnector(nc);
+        nc.start();
+
+        // mqtt port should have been assigned by now
+        assertFalse(localBrokerMQTTPort == -1);
+        assertFalse(remoteBrokerMQTTPort == -1);
+    }
+
+    protected void tearDown() throws Exception {
+        if (remoteBroker.isStarted()) {
+            remoteBroker.stop();
+            remoteBroker.waitUntilStopped();
+        }
+        if (broker.isStarted()) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        super.tearDown();
+    }
+
+    @Test
+    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
+
+        // before we get started, we want an async way to be able to know when
+        // the durable consumer has been networked so we can assert that it indeed
+        // would have a durable subscriber. for example, when we subscribe on remote broker,
+        // a network-sub would be created on local broker and we want to listen for when that
+        // even happens. we do that with advisory messages and a latch:
+        CountDownLatch consumerNetworked = listenForConsumersOn(broker);
+
+
+        // create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
+        // on the remote broker. this sub should still be there after we disconnect
+        MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
+        BlockingConnection remoteConn = remoteMqtt.blockingConnection();
+        remoteConn.connect();
+        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+        remoteConn.disconnect();
+
+        consumerNetworked.await(1, TimeUnit.SECONDS);
+        assertOneDurableSubOn(remoteBroker, "foo");
+        assertOneDurableSubOn(broker, "NC_localhost_inbound_local");
+
+        // now we reconnect the same sub on the local broker, again with clean==0
+        MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
+        BlockingConnection localConn = localMqtt.blockingConnection();
+        localConn.connect();
+        localConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        // now let's connect back up to remote broker and send a message
+        remoteConn = remoteMqtt.blockingConnection();
+        remoteConn.connect();
+        remoteConn.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
+
+        // now we should see that message on the local broker because the subscription
+        // should have been properly networked... we'll give a sec of grace for the
+        // networking and forwarding to have happened properly
+        org.fusesource.mqtt.client.Message msg = localConn.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        msg.ack();
+        String response = new String(msg.getPayload());
+        assertEquals("Hello, World!", response);
+
+        // Now... we SHOULD NOT see a message on the remote broker because we already
+        // consumed it on the local broker... having the same message on the remote broker
+        // would effectively give us duplicates in a distributed topic scenario:
+        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+        msg = remoteConn.receive(500, TimeUnit.MILLISECONDS);
+        assertNull("We have duplicate messages across the cluster for a distributed topic",
+                msg);
+
+    }
+
+    private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        URI brokerUri = broker.getVmConnectorURI();
+        System.out.println(brokerUri.toASCIIString());
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
+        final Connection connection = cf.createConnection();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.foo.bar.>");
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                System.out.println("got message! " + message);
+                latch.countDown();
+                // shutdown this connection
+                Dispatch.getGlobalQueue().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            session.close();
+                            connection.close();
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        });
+
+
+        return latch;
+    }
+
+
+    private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
+        ObjectName[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers();
+        ObjectName[] allDurables = (ObjectName[]) ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs);
+        assertEquals(1, allDurables.length);
+
+        // at this point our assertions should prove that we have only on durable sub
+        DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true);
+
+        assertEquals(subName, durableSubView.getClientId());
+    }
+
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker =  super.createBroker();
+        broker.setPersistent(true);
+        broker.setBrokerName("local");
+        broker.setDataDirectory("target/activemq-data");
+        broker.setDeleteAllMessagesOnStartup(true);
+        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        localBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    @Override
+    protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
+        BrokerService broker = super.createRemoteBroker(persistenceAdapter);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDataDirectory("target/activemq-data");
+        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        remoteBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setTracer(createTracer());
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+        mqtt.setHost("localhost", port);
+        return mqtt;
+    }
+
+    protected Tracer createTracer() {
+        return new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client Received:\n" + frame);
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client Sent:\n" + frame);
+            }
+
+            @Override
+            public void debug(String message, Object... args) {
+                LOG.info(String.format(message, args));
+            }
+        };
+    }
+
+}


[2/2] git commit: Updated test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for enhancement.

Posted by ce...@apache.org.
Updated test for https://issues.apache.org/jira/browse/AMQ-5290 after tbish commit for enhancement.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0d9eedc6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0d9eedc6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0d9eedc6

Branch: refs/heads/trunk
Commit: 0d9eedc658a84b15cee9efcd0fe991a5345324da
Parents: c42b874
Author: Christian Posta <ch...@gmail.com>
Authored: Wed Aug 6 17:02:17 2014 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Wed Aug 6 17:02:17 2014 -0700

----------------------------------------------------------------------
 .../MQTTNetworkOfBrokersFailoverTest.java       | 32 ++++++++++++--------
 1 file changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0d9eedc6/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
index bf24971..e1ab183 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
@@ -24,7 +24,6 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.ArrayUtils;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.mqtt.client.*;
@@ -48,10 +47,6 @@ import java.util.concurrent.TimeUnit;
 public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
-
-    private final String subName = "Subscriber1";
-    private final String subName2 = "Subscriber2";
-    private final String topicName = "TEST.FOO";
     private int localBrokerMQTTPort = -1;
     private int remoteBrokerMQTTPort = -1;
 
@@ -99,11 +94,12 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         BlockingConnection remoteConn = remoteMqtt.blockingConnection();
         remoteConn.connect();
         remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
+        assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
         remoteConn.disconnect();
 
-        consumerNetworked.await(1, TimeUnit.SECONDS);
-        assertOneDurableSubOn(remoteBroker, "foo");
-        assertOneDurableSubOn(broker, "NC_localhost_inbound_local");
 
         // now we reconnect the same sub on the local broker, again with clean==0
         MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
@@ -139,17 +135,16 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         final CountDownLatch latch = new CountDownLatch(1);
 
         URI brokerUri = broker.getVmConnectorURI();
-        System.out.println(brokerUri.toASCIIString());
 
         final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
         final Connection connection = cf.createConnection();
+        connection.start();
         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.foo.bar.>");
+        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
         MessageConsumer consumer = session.createConsumer(dest);
         consumer.setMessageListener(new MessageListener() {
             @Override
             public void onMessage(Message message) {
-                System.out.println("got message! " + message);
                 latch.countDown();
                 // shutdown this connection
                 Dispatch.getGlobalQueue().execute(new Runnable() {
@@ -170,6 +165,13 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         return latch;
     }
 
+    private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] queueNames = brokerView.getQueues();
+        assertEquals(1, queueNames.length);
+
+        assertTrue(queueNames[0].toString().contains(queueName));
+    }
 
     private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
         BrokerViewMBean brokerView = broker.getAdminView();
@@ -193,7 +195,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         broker.setBrokerName("local");
         broker.setDataDirectory("target/activemq-data");
         broker.setDeleteAllMessagesOnStartup(true);
-        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
         localBrokerMQTTPort = tc.getConnectUri().getPort();
         return broker;
     }
@@ -204,11 +206,15 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
         broker.setPersistent(true);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setDataDirectory("target/activemq-data");
-        TransportConnector tc = broker.addConnector("mqtt://localhost:0");
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
         remoteBrokerMQTTPort = tc.getConnectUri().getPort();
         return broker;
     }
 
+    private String getDefaultMQTTTransportConnectorUri(){
+        return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+    }
+
     private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setConnectAttemptsMax(1);