You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/20 21:05:19 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5365

Repository: activemq
Updated Branches:
  refs/heads/trunk 4881a848d -> 6885ff0a6


https://issues.apache.org/jira/browse/AMQ-5365

Trap VirtualTopic names that leak from messages sent across a broker
network.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6885ff0a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6885ff0a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6885ff0a

Branch: refs/heads/trunk
Commit: 6885ff0a62602207e22ff8737a193c702f29ee18
Parents: 4881a84
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 20 15:05:08 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 20 15:05:08 2014 -0400

----------------------------------------------------------------------
 .../MQTTVirtualTopicSubscriptionStrategy.java   |  10 +-
 .../MQTTNetworkOfBrokersFailoverTest.java       | 255 -------------------
 .../MQTTNetworkOfBrokersFailoverTest.java       | 254 ++++++++++++++++++
 3 files changed, 260 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6885ff0a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 0778764..835a5f8 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -159,11 +159,12 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
 
     @Override
     public String onSend(ActiveMQDestination destination) {
-        String amqTopicName = destination.getPhysicalName();
-        if (amqTopicName.startsWith(VIRTUALTOPIC_PREFIX)) {
-            amqTopicName = amqTopicName.substring(VIRTUALTOPIC_PREFIX.length());
+        String destinationName = destination.getPhysicalName();
+        int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
+        if (position >= 0) {
+            destinationName = destinationName.substring(position+VIRTUALTOPIC_PREFIX.length()).substring(0);
         }
-        return amqTopicName;
+        return destinationName;
     }
 
     @Override
@@ -178,6 +179,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
     private void deleteDurableQueues(List<ActiveMQQueue> queues) {
         try {
             for (ActiveMQQueue queue : queues) {
+                LOG.debug("Removing subscription for {} ",queue.getPhysicalName());
                 DestinationInfo removeAction = new DestinationInfo();
                 removeAction.setConnectionId(protocol.getConnectionId());
                 removeAction.setDestination(queue);

http://git-wip-us.apache.org/repos/asf/activemq/blob/6885ff0a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
deleted file mode 100644
index 203cacd..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.BrokerViewMBean;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.network.NetworkTestSupport;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.commons.lang.ArrayUtils;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.client.Tracer;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by ceposta
- * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
- */
-public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
-    private int localBrokerMQTTPort = -1;
-    private int remoteBrokerMQTTPort = -1;
-
-    @Override
-    protected void setUp() throws Exception {
-        useJmx=true;
-        super.setUp();
-
-        URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")");
-        NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
-        nc.setDuplex(true);
-        remoteBroker.addNetworkConnector(nc);
-        nc.start();
-
-        // mqtt port should have been assigned by now
-        assertFalse(localBrokerMQTTPort == -1);
-        assertFalse(remoteBrokerMQTTPort == -1);
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        if (remoteBroker.isStarted()) {
-            remoteBroker.stop();
-            remoteBroker.waitUntilStopped();
-        }
-        if (broker.isStarted()) {
-            broker.stop();
-            broker.waitUntilStopped();
-        }
-        super.tearDown();
-    }
-
-    @Test
-    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
-
-        // before we get started, we want an async way to be able to know when
-        // the durable consumer has been networked so we can assert that it indeed
-        // would have a durable subscriber. for example, when we subscribe on remote broker,
-        // a network-sub would be created on local broker and we want to listen for when that
-        // even happens. we do that with advisory messages and a latch:
-        CountDownLatch consumerNetworked = listenForConsumersOn(broker);
-
-        // create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
-        // on the remote broker. this sub should still be there after we disconnect
-        MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
-        BlockingConnection remoteConn = remoteMqtt.blockingConnection();
-        remoteConn.connect();
-        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
-
-        assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
-        assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
-        assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
-        remoteConn.disconnect();
-
-        // now we reconnect the same sub on the local broker, again with clean==0
-        MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
-        BlockingConnection localConn = localMqtt.blockingConnection();
-        localConn.connect();
-        localConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
-
-        // now let's connect back up to remote broker and send a message
-        remoteConn = remoteMqtt.blockingConnection();
-        remoteConn.connect();
-        remoteConn.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
-
-        // now we should see that message on the local broker because the subscription
-        // should have been properly networked... we'll give a sec of grace for the
-        // networking and forwarding to have happened properly
-        org.fusesource.mqtt.client.Message msg = localConn.receive(1, TimeUnit.SECONDS);
-        assertNotNull(msg);
-        msg.ack();
-        String response = new String(msg.getPayload());
-        assertEquals("Hello, World!", response);
-
-        // Now... we SHOULD NOT see a message on the remote broker because we already
-        // consumed it on the local broker... having the same message on the remote broker
-        // would effectively give us duplicates in a distributed topic scenario:
-        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
-        msg = remoteConn.receive(500, TimeUnit.MILLISECONDS);
-        assertNull("We have duplicate messages across the cluster for a distributed topic", msg);
-    }
-
-    private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        URI brokerUri = broker.getVmConnectorURI();
-
-        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
-        final Connection connection = cf.createConnection();
-        connection.start();
-        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
-        MessageConsumer consumer = session.createConsumer(dest);
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                latch.countDown();
-                // shutdown this connection
-                Dispatch.getGlobalQueue().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            session.close();
-                            connection.close();
-                        } catch (JMSException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-            }
-        });
-
-        return latch;
-    }
-
-    private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
-        BrokerViewMBean brokerView = broker.getAdminView();
-        ObjectName[] queueNames = brokerView.getQueues();
-        assertEquals(1, queueNames.length);
-
-        assertTrue(queueNames[0].toString().contains(queueName));
-    }
-
-    @SuppressWarnings("unused")
-    private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
-        BrokerViewMBean brokerView = broker.getAdminView();
-        ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
-        ObjectName[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers();
-        ObjectName[] allDurables = (ObjectName[]) ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs);
-        assertEquals(1, allDurables.length);
-
-        // at this point our assertions should prove that we have only on durable sub
-        DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean)
-                broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true);
-
-        assertEquals(subName, durableSubView.getClientId());
-    }
-
-    @Override
-    protected BrokerService createBroker() throws Exception {
-        BrokerService broker =  super.createBroker();
-        broker.setPersistent(true);
-        broker.setBrokerName("local");
-        broker.setDataDirectory("target/activemq-data");
-        broker.setDeleteAllMessagesOnStartup(true);
-        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
-        localBrokerMQTTPort = tc.getConnectUri().getPort();
-        return broker;
-    }
-
-    @Override
-    protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
-        BrokerService broker = super.createRemoteBroker(persistenceAdapter);
-        broker.setPersistent(true);
-        broker.setDeleteAllMessagesOnStartup(true);
-        broker.setDataDirectory("target/activemq-data");
-        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
-        remoteBrokerMQTTPort = tc.getConnectUri().getPort();
-        return broker;
-    }
-
-    private String getDefaultMQTTTransportConnectorUri(){
-        return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
-    }
-
-    private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
-        MQTT mqtt = new MQTT();
-        mqtt.setConnectAttemptsMax(1);
-        mqtt.setReconnectAttemptsMax(0);
-        mqtt.setTracer(createTracer());
-        if (clientId != null) {
-            mqtt.setClientId(clientId);
-        }
-        mqtt.setCleanSession(clean);
-        mqtt.setHost("localhost", port);
-        return mqtt;
-    }
-
-    protected Tracer createTracer() {
-        return new Tracer() {
-            @Override
-            public void onReceive(MQTTFrame frame) {
-                LOG.info("Client Received:\n" + frame);
-            }
-
-            @Override
-            public void onSend(MQTTFrame frame) {
-                LOG.info("Client Sent:\n" + frame);
-            }
-
-            @Override
-            public void debug(String message, Object... args) {
-                LOG.info(String.format(message, args));
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6885ff0a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
new file mode 100644
index 0000000..928a7a6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.commons.lang.ArrayUtils;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
+    private int localBrokerMQTTPort = -1;
+    private int remoteBrokerMQTTPort = -1;
+
+    @Override
+    protected void setUp() throws Exception {
+        useJmx=true;
+        super.setUp();
+
+        URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")");
+        NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
+        nc.setDuplex(true);
+        remoteBroker.addNetworkConnector(nc);
+        nc.start();
+
+        // mqtt port should have been assigned by now
+        assertFalse(localBrokerMQTTPort == -1);
+        assertFalse(remoteBrokerMQTTPort == -1);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (remoteBroker.isStarted()) {
+            remoteBroker.stop();
+            remoteBroker.waitUntilStopped();
+        }
+        if (broker.isStarted()) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        super.tearDown();
+    }
+
+    @Test
+    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
+
+        // before we get started, we want an async way to be able to know when
+        // the durable consumer has been networked so we can assert that it indeed
+        // would have a durable subscriber. for example, when we subscribe on remote broker,
+        // a network-sub would be created on local broker and we want to listen for when that
+        // even happens. we do that with advisory messages and a latch:
+        CountDownLatch consumerNetworked = listenForConsumersOn(broker);
+
+        // create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
+        // on the remote broker. this sub should still be there after we disconnect
+        MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
+        BlockingConnection remoteConn = remoteMqtt.blockingConnection();
+        remoteConn.connect();
+        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS));
+        assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        remoteConn.disconnect();
+
+        // now we reconnect the same sub on the local broker, again with clean==0
+        MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
+        BlockingConnection localConn = localMqtt.blockingConnection();
+        localConn.connect();
+        localConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        // now let's connect back up to remote broker and send a message
+        remoteConn = remoteMqtt.blockingConnection();
+        remoteConn.connect();
+        remoteConn.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
+
+        // now we should see that message on the local broker because the subscription
+        // should have been properly networked... we'll give a sec of grace for the
+        // networking and forwarding to have happened properly
+        org.fusesource.mqtt.client.Message msg = localConn.receive(100, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        msg.ack();
+        String response = new String(msg.getPayload());
+        assertEquals("Hello, World!", response);
+        assertEquals("foo/bar", msg.getTopic());
+
+        // Now... we SHOULD NOT see a message on the remote broker because we already
+        // consumed it on the local broker... having the same message on the remote broker
+        // would effectively give us duplicates in a distributed topic scenario:
+        remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+        msg = remoteConn.receive(500, TimeUnit.MILLISECONDS);
+        assertNull("We have duplicate messages across the cluster for a distributed topic", msg);
+    }
+
+    private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        URI brokerUri = broker.getVmConnectorURI();
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString());
+        final Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        MessageConsumer consumer = session.createConsumer(dest);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                latch.countDown();
+                // shutdown this connection
+                Dispatch.getGlobalQueue().execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            session.close();
+                            connection.close();
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+            }
+        });
+
+        return latch;
+    }
+
+    private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] queueNames = brokerView.getQueues();
+        assertEquals(1, queueNames.length);
+
+        assertTrue(queueNames[0].toString().contains(queueName));
+    }
+
+    @SuppressWarnings("unused")
+    private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
+        ObjectName[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers();
+        ObjectName[] allDurables = (ObjectName[]) ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs);
+        assertEquals(1, allDurables.length);
+
+        // at this point our assertions should prove that we have only on durable sub
+        DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true);
+
+        assertEquals(subName, durableSubView.getClientId());
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker =  super.createBroker();
+        broker.setPersistent(true);
+        broker.setBrokerName("local");
+        broker.setDataDirectory("target/activemq-data");
+        broker.setDeleteAllMessagesOnStartup(true);
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
+        localBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    @Override
+    protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
+        BrokerService broker = super.createRemoteBroker(persistenceAdapter);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDataDirectory("target/activemq-data");
+        TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri());
+        remoteBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    private String getDefaultMQTTTransportConnectorUri(){
+        return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
+    }
+
+    private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setTracer(createTracer());
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+        mqtt.setHost("localhost", port);
+        return mqtt;
+    }
+
+    protected Tracer createTracer() {
+        return new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client Received:\n" + frame);
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client Sent:\n" + frame);
+            }
+
+            @Override
+            public void debug(String message, Object... args) {
+                LOG.info(String.format(message, args));
+            }
+        };
+    }
+}