You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/06 23:41:47 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5290

Repository: activemq
Updated Branches:
  refs/heads/trunk fff3c8397 -> 413e4840d


https://issues.apache.org/jira/browse/AMQ-5290

Adds a subscription strategy model where the default is the normal
durable topic subscription based approach or a strategy that maps all
subscriptions and publish operations to a Virtual Topic model.  A
network of brokers can network the Queues instead of having the durable
topics subscriptions repaeted on each Broker.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/413e4840
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/413e4840
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/413e4840

Branch: refs/heads/trunk
Commit: 413e4840d64fa8377103ce431a2a2c6cef6e0c20
Parents: fff3c83
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Aug 6 17:41:19 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Aug 6 17:41:19 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTInactivityMonitor.java   |  18 +-
 .../transport/mqtt/MQTTNIOTransportFactory.java |  10 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 377 ++++++++-----------
 .../transport/mqtt/MQTTSubscription.java        |   7 +-
 .../transport/mqtt/MQTTTransportFactory.java    |  14 +-
 .../transport/mqtt/MQTTTransportFilter.java     |  24 +-
 .../AbstractMQTTSubscriptionStrategy.java       | 129 +++++++
 .../MQTTDefaultSubscriptionStrategy.java        | 151 ++++++++
 .../mqtt/strategy/MQTTSubscriptionStrategy.java | 139 +++++++
 .../MQTTVirtualTopicSubscriptionStrategy.java   | 221 +++++++++++
 .../strategies/mqtt-default-subscriptions       |  17 +
 .../strategies/mqtt-virtual-topic-subscriptions |  17 +
 .../activemq/transport/mqtt/MQTTTest.java       |  95 ++++-
 .../mqtt/MQTTVirtualTopicSubscriptionsTest.java |  59 +++
 14 files changed, 1038 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index 685bb60..22bbac5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -70,9 +70,12 @@ public class MQTTInactivityMonitor extends TransportFilter {
             int currentCounter = next.getReceiveCounter();
             int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
 
-            // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
-            // should be sufficient to indicate the connection is still alive. If there were random data, or something
-            // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
+            // for the PINGREQ/RESP frames, the currentCounter will be different
+            // from previousCounter, and that
+            // should be sufficient to indicate the connection is still alive.
+            // If there were random data, or something
+            // outside the scope of the spec, the wire format unrmarshalling
+            // would fail, so we don't need to handle
             // PINGREQ/RESP explicitly here
             if (inReceive.get() || currentCounter != previousCounter) {
                 if (LOG.isTraceEnabled()) {
@@ -82,24 +85,21 @@ public class MQTTInactivityMonitor extends TransportFilter {
                 return;
             }
 
-            if( (now-lastReceiveTime) >= readKeepAliveTime+readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
+            if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
                 }
                 ASYNC_TASKS.execute(new Runnable() {
                     @Override
                     public void run() {
-                        onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime+readGraceTime) + ") long: " + next.getRemoteAddress()));
+                        onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
+                            + next.getRemoteAddress()));
                     }
                 });
             }
         }
     };
 
-    private boolean allowReadCheck(long elapsed) {
-        return elapsed > (readGraceTime * 9 / 10);
-    }
-
     public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
         super(next);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index 96f7747..efc85be 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
-import org.apache.activemq.broker.BrokerContext;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -44,12 +44,15 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
 
     private BrokerService brokerService = null;
 
+    @Override
     protected String getDefaultWireFormatType() {
         return "mqtt";
     }
 
+    @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
+            @Override
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 return new MQTTNIOTransport(format, socket);
             }
@@ -58,6 +61,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
         return result;
     }
 
+    @Override
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
         return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
     }
@@ -75,6 +79,7 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
         return transport;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new MQTTTransportFilter(transport, format, brokerService);
@@ -82,16 +87,17 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
         return super.compositeConfigure(transport, format, options);
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
 
+    @Override
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
         MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
         MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
         filter.setInactivityMonitor(monitor);
         return monitor;
     }
-
 }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 91087d0..3969d87 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,11 +17,7 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
@@ -34,19 +30,13 @@ import javax.jms.Message;
 import javax.security.auth.login.CredentialException;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.PrefetchSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
@@ -60,17 +50,17 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LRUCache;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.fusesource.hawtbuf.Buffer;
@@ -114,9 +104,8 @@ public class MQTTProtocolConverter {
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
     private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic = new ConcurrentHashMap<String, MQTTSubscription>();
-    private final Map<String, ActiveMQTopic> activeMQTopicMap = new LRUCache<String, ActiveMQTopic>(DEFAULT_CACHE_SIZE);
+    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
     private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
-    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
 
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
@@ -136,6 +125,15 @@ public class MQTTProtocolConverter {
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean publishDollarTopics;
 
+    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
+    /*
+     * Subscription strategy configuration element.
+     *   > mqtt-default-subscriptions
+     *   > mqtt-virtual-topic-subscriptions
+     */
+    private String subscriptionStrategyName = "mqtt-default-subscriptions";
+    private MQTTSubscriptionStrategy subsciptionStrategy;
+
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
         this.mqttTransport = mqttTransport;
         this.brokerService = brokerService;
@@ -149,22 +147,26 @@ public class MQTTProtocolConverter {
         }
     }
 
-    void sendToActiveMQ(Command command, ResponseHandler handler) {
+    public void sendToActiveMQ(Command command, ResponseHandler handler) {
 
         // Lets intercept message send requests..
         if (command instanceof ActiveMQMessage) {
             ActiveMQMessage msg = (ActiveMQMessage) command;
-            if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) {
-                // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
-                // specification requirements
-                if (handler != null) {
-                    try {
-                        handler.onResponse(this, new Response());
-                    } catch (IOException e) {
-                        e.printStackTrace();
+            try {
+                if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination())) {
+                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
+                    // specification requirements for system assigned destinations.
+                    if (handler != null) {
+                        try {
+                            handler.onResponse(this, new Response());
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
                     }
+                    return;
                 }
-                return;
+            } catch (IOException e) {
+                e.printStackTrace();
             }
         }
 
@@ -189,54 +191,43 @@ public class MQTTProtocolConverter {
      */
     public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
         switch (frame.messageType()) {
-            case PINGREQ.TYPE: {
+            case PINGREQ.TYPE:
                 LOG.debug("Received a ping from client: " + getClientId());
                 sendToMQTT(PING_RESP_FRAME);
                 LOG.debug("Sent Ping Response to " + getClientId());
                 break;
-            }
-            case CONNECT.TYPE: {
+            case CONNECT.TYPE:
                 CONNECT connect = new CONNECT().decode(frame);
                 onMQTTConnect(connect);
                 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
                 break;
-            }
-            case DISCONNECT.TYPE: {
+            case DISCONNECT.TYPE:
                 LOG.debug("MQTT Client {} disconnecting", getClientId());
                 onMQTTDisconnect();
                 break;
-            }
-            case SUBSCRIBE.TYPE: {
+            case SUBSCRIBE.TYPE:
                 onSubscribe(new SUBSCRIBE().decode(frame));
                 break;
-            }
-            case UNSUBSCRIBE.TYPE: {
+            case UNSUBSCRIBE.TYPE:
                 onUnSubscribe(new UNSUBSCRIBE().decode(frame));
                 break;
-            }
-            case PUBLISH.TYPE: {
+            case PUBLISH.TYPE:
                 onMQTTPublish(new PUBLISH().decode(frame));
                 break;
-            }
-            case PUBACK.TYPE: {
+            case PUBACK.TYPE:
                 onMQTTPubAck(new PUBACK().decode(frame));
                 break;
-            }
-            case PUBREC.TYPE: {
+            case PUBREC.TYPE:
                 onMQTTPubRec(new PUBREC().decode(frame));
                 break;
-            }
-            case PUBREL.TYPE: {
+            case PUBREL.TYPE:
                 onMQTTPubRel(new PUBREL().decode(frame));
                 break;
-            }
-            case PUBCOMP.TYPE: {
+            case PUBCOMP.TYPE:
                 onMQTTPubComp(new PUBCOMP().decode(frame));
                 break;
-            }
-            default: {
+            default:
                 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
-            }
         }
     }
 
@@ -332,52 +323,17 @@ public class MQTTProtocolConverter {
                         connected.set(true);
                         getMQTTTransport().sendToMQTT(ack.encode());
 
-                        List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
                         if (connect.cleanSession()) {
                             packetIdGenerator.stopClientSession(getClientId());
-                            deleteDurableSubs(subs);
                         } else {
                             packetIdGenerator.startClientSession(getClientId());
-                            restoreDurableSubs(subs);
                         }
-                    }
-                });
-            }
-        });
-    }
 
-    public void deleteDurableSubs(List<SubscriptionInfo> subs) {
-        try {
-            for (SubscriptionInfo sub : subs) {
-                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                rsi.setConnectionId(connectionId);
-                rsi.setSubscriptionName(sub.getSubcriptionName());
-                rsi.setClientId(sub.getClientId());
-                sendToActiveMQ(rsi, new ResponseHandler() {
-                    @Override
-                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
-                        // ignore failures..
+                        getSubscriptionStrategy().onConnect(connect);
                     }
                 });
             }
-        } catch (Throwable e) {
-            LOG.warn("Could not delete the MQTT durable subs.", e);
-        }
-    }
-
-    public void restoreDurableSubs(List<SubscriptionInfo> subs) {
-        try {
-            for (SubscriptionInfo sub : subs) {
-                String name = sub.getSubcriptionName();
-                String[] split = name.split(":", 2);
-                QoS qoS = QoS.valueOf(split[0]);
-                onSubscribe(new Topic(split[1], qoS));
-                // mark this durable subscription as restored by Broker
-                restoredSubs.add(split[1]);
-            }
-        } catch (IOException e) {
-            LOG.warn("Could not restore the MQTT durable subs.", e);
-        }
+        });
     }
 
     void onMQTTDisconnect() throws MQTTProtocolException {
@@ -408,42 +364,41 @@ public class MQTTProtocolConverter {
         } else {
             LOG.warn("No topics defined for Subscription " + command);
         }
-
     }
 
-    byte onSubscribe(final Topic topic) throws MQTTProtocolException {
+    public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
 
-        final String topicName = topic.name().toString();
-        final QoS topicQoS = topic.qos();
+        final String destinationName = topic.name().toString();
+        final QoS requestedQoS = topic.qos();
 
-        if (mqttSubscriptionByTopic.containsKey(topicName)) {
-            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
-            if (topicQoS != mqttSubscription.qos()) {
+        if (mqttSubscriptionByTopic.containsKey(destinationName)) {
+            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
+            if (requestedQoS != mqttSubscription.getQoS()) {
                 // remove old subscription as the QoS has changed
-                onUnSubscribe(topicName);
+                onUnSubscribe(destinationName);
             } else {
-                // duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages
-                resendRetainedMessages(mqttSubscription);
-                return (byte) topicQoS.ordinal();
+                try {
+                    getSubscriptionStrategy().onReSubscribe(mqttSubscription);
+                } catch (IOException e) {
+                    throw new MQTTProtocolException("Failed to find subscription strategy", true, e);
+                }
+                return (byte) requestedQoS.ordinal();
             }
         }
 
-        ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
-
-        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-        ConsumerInfo consumerInfo = new ConsumerInfo(id);
-        consumerInfo.setDestination(destination);
-        consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
-        consumerInfo.setRetroactive(true);
-        consumerInfo.setDispatchAsync(true);
-        // create durable subscriptions only when cleansession is false
-        if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
-            consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
+        try {
+            return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS);
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Failed while intercepting subscribe", true, e);
         }
-        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, topicQoS, consumerInfo);
+    }
+
+    public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException {
+
+        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo);
 
         // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
-        subscriptionsByConsumerId.put(id, mqttSubscription);
+        subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
         mqttSubscriptionByTopic.put(topicName, mqttSubscription);
 
         final byte[] qos = {-1};
@@ -453,79 +408,24 @@ public class MQTTProtocolConverter {
                 // validate subscription request
                 if (response.isException()) {
                     final Throwable throwable = ((ExceptionResponse) response).getException();
-                    LOG.warn("Error subscribing to " + topicName, throwable);
+                    LOG.warn("Error subscribing to {}", topicName, throwable);
                     qos[0] = SUBSCRIBE_ERROR;
                 } else {
-                    qos[0] = (byte) topicQoS.ordinal();
+                    qos[0] = (byte) qoS.ordinal();
                 }
             }
         });
 
         if (qos[0] == SUBSCRIBE_ERROR) {
             // remove from local maps if subscribe failed
-            subscriptionsByConsumerId.remove(id);
+            subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
             mqttSubscriptionByTopic.remove(topicName);
         }
 
         return qos[0];
     }
 
-    private void resendRetainedMessages(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
-
-        ActiveMQDestination destination = mqttSubscription.getDestination();
-
-        // check whether the Topic has been recovered in restoreDurableSubs
-        // mark subscription available for recovery for duplicate subscription
-        if (restoredSubs.remove(destination.getPhysicalName())) {
-            return;
-        }
-
-        String topicName = mqttSubscription.getTopicName();
-        // get TopicRegion
-        RegionBroker regionBroker;
-        try {
-            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
-        } catch (Exception e) {
-            throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
-        }
-        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
-
-        final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
-        final ConsumerId consumerId = consumerInfo.getConsumerId();
-
-        // use actual client id used to create connection to lookup connection context
-        final String connectionInfoClientId = connectionInfo.getClientId();
-        final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
-
-        // get all matching Topics
-        final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
-        for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
-
-            // recover retroactive messages for matching subscription
-            for (Subscription subscription : dest.getConsumers()) {
-                if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
-                    try {
-                        if (dest instanceof org.apache.activemq.broker.region.Topic) {
-                            ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
-                        } else if (dest instanceof VirtualTopicInterceptor) {
-                            ((VirtualTopicInterceptor)dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
-                        }
-                        if (subscription instanceof PrefetchSubscription) {
-                            // request dispatch for prefetch subs
-                            PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
-                            prefetchSubscription.dispatchPending();
-                        }
-                    } catch (Exception e) {
-                        throw new MQTTProtocolException("Error recovering retained messages for " +
-                            dest.getName() + ": " + e.getMessage(), false, e);
-                    }
-                    break;
-                }
-            }
-        }
-    }
-
-    void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
+    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
         checkConnected();
         UTF8Buffer[] topics = command.topics();
         if (topics != null) {
@@ -538,33 +438,38 @@ public class MQTTProtocolConverter {
         sendToMQTT(ack.encode());
     }
 
-    void onUnSubscribe(String topicName) {
-        MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
-        if (subs != null) {
-            ConsumerInfo info = subs.getConsumerInfo();
-            if (info != null) {
-                subscriptionsByConsumerId.remove(info.getConsumerId());
-            }
-            RemoveInfo removeInfo = null;
-            if (info != null) {
-                removeInfo = info.createRemoveCommand();
-            }
-            sendToActiveMQ(removeInfo, null);
-
-            // check if the durable sub also needs to be removed
-            if (subs.getConsumerInfo().getSubscriptionName() != null) {
-                // also remove it from restored durable subscriptions set
-                restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
-
-                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                rsi.setConnectionId(connectionId);
-                rsi.setSubscriptionName(subs.getConsumerInfo().getSubscriptionName());
-                rsi.setClientId(connectionInfo.getClientId());
-                sendToActiveMQ(rsi, null);
+    public void onUnSubscribe(String topicName) {
+        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
+        if (subscription != null) {
+            doUnSubscribe(subscription);
+
+            // check if the broker side of the subscription needs to be removed
+            try {
+                getSubscriptionStrategy().onUnSubscribe(subscription);
+            } catch (IOException e) {
+                // Ignore
             }
         }
     }
 
+    public void doUnSubscribe(MQTTSubscription subscription) {
+        mqttSubscriptionByTopic.remove(subscription.getTopicName());
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null) {
+            subscriptionsByConsumerId.remove(info.getConsumerId());
+        }
+        RemoveInfo removeInfo = null;
+        if (info != null) {
+            removeInfo = info.createRemoveCommand();
+        }
+        sendToActiveMQ(removeInfo, new ResponseHandler() {
+            @Override
+            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                // ignore failures..
+            }
+        });
+    }
+
     /**
      * Dispatch an ActiveMQ command
      */
@@ -610,7 +515,7 @@ public class MQTTProtocolConverter {
         } else if (command.isBrokerInfo()) {
             //ignore
         } else {
-            LOG.debug("Do not know how to process ActiveMQ Command " + command);
+            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
         }
     }
 
@@ -647,7 +552,7 @@ public class MQTTProtocolConverter {
             ack = publisherRecs.remove(command.messageId());
         }
         if (ack == null) {
-            LOG.warn("Unknown PUBREL: " + command.messageId() + " received");
+            LOG.warn("Unknown PUBREL: {} received", command.messageId());
         }
         PUBCOMP pubcomp = new PUBCOMP();
         pubcomp.messageId(command.messageId());
@@ -680,16 +585,23 @@ public class MQTTProtocolConverter {
             msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
         }
 
-        ActiveMQTopic topic;
-        synchronized (activeMQTopicMap) {
-            topic = activeMQTopicMap.get(command.topicName());
-            if (topic == null) {
+        ActiveMQDestination destination;
+        synchronized (activeMQDestinationMap) {
+            destination = activeMQDestinationMap.get(command.topicName());
+            if (destination == null) {
                 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
-                topic = new ActiveMQTopic(topicName);
-                activeMQTopicMap.put(command.topicName().toString(), topic);
+
+                try {
+                    destination = getSubscriptionStrategy().onSend(topicName);
+                } catch (IOException e) {
+                    throw JMSExceptionSupport.create(e);
+                }
+
+                activeMQDestinationMap.put(command.topicName().toString(), destination);
             }
         }
-        msg.setJMSDestination(topic);
+
+        msg.setJMSDestination(destination);
         msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
         return msg;
     }
@@ -714,7 +626,8 @@ public class MQTTProtocolConverter {
         synchronized (mqttTopicMap) {
             topicName = mqttTopicMap.get(message.getJMSDestination());
             if (topicName == null) {
-                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(message.getDestination().getPhysicalName());
+                String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination());
+                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
                 mqttTopicMap.put(message.getJMSDestination(), topicName);
             }
         }
@@ -803,9 +716,7 @@ public class MQTTProtocolConverter {
 
         long keepAliveMS = keepAliveSeconds * 1000;
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("MQTT Client " + getClientId() + " requests heart beat of  " + keepAliveMS + " ms");
-        }
+        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
 
         try {
             // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
@@ -822,12 +733,8 @@ public class MQTTProtocolConverter {
             monitor.setReadGraceTime(readGracePeriod);
             monitor.startMonitorThread();
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("MQTT Client " + getClientId() +
-                        " established heart beat of  " + keepAliveMS +
-                        " ms (" + keepAliveMS + "ms + " + readGracePeriod +
-                        "ms grace period)");
-            }
+            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
+                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
         } catch (Exception ex) {
             LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
         }
@@ -835,9 +742,7 @@ public class MQTTProtocolConverter {
 
     void handleException(Throwable exception, MQTTFrame command) {
         LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Exception detail", exception);
-        }
+        LOG.debug("Exception detail", exception);
 
         if (connected.get() && connectionInfo != null) {
             connected.set(false);
@@ -861,7 +766,6 @@ public class MQTTProtocolConverter {
     }
 
     ResponseHandler createResponseHandler(final PUBLISH command) {
-
         if (command != null) {
             switch (command.qos()) {
                 case AT_LEAST_ONCE:
@@ -944,6 +848,22 @@ public class MQTTProtocolConverter {
         return connectionId;
     }
 
+    public ConsumerId getNextConsumerId() {
+        return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+    }
+
+    public boolean isCleanSession() {
+        return this.connect.cleanSession();
+    }
+
+    public String getSubscriptionStrategyName() {
+        return subscriptionStrategyName;
+    }
+
+    public void setSubscriptionStrategyName(String name) {
+        this.subscriptionStrategyName = name;
+    }
+
     public String getClientId() {
         if (clientId == null) {
             if (connect != null && connect.clientId() != null) {
@@ -954,4 +874,33 @@ public class MQTTProtocolConverter {
         }
         return clientId;
     }
+
+    protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException {
+        if (subsciptionStrategy == null) {
+            synchronized (STRATAGY_FINDER) {
+                if (subsciptionStrategy != null) {
+                    return subsciptionStrategy;
+                }
+
+                MQTTSubscriptionStrategy strategy = null;
+                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
+                    try {
+                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
+                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
+                        if (strategy instanceof BrokerServiceAware) {
+                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
+                        }
+                        strategy.initialize(this);
+                    } catch (Exception e) {
+                        throw IOExceptionSupport.create(e);
+                    }
+                } else {
+                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
+                }
+
+                this.subsciptionStrategy = strategy;
+            }
+        }
+        return subsciptionStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index bbed655..d265335 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -132,7 +132,12 @@ public class MQTTSubscription {
     /**
      * @return the assigned QoS value for this subscription.
      */
-    public QoS qos() {
+    public QoS getQoS() {
         return qos;
     }
+
+    @Override
+    public String toString() {
+        return "MQTT Sub: topic[" + topicName + "] -> [" + consumerInfo.getDestination() + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index da23ba5..47afedb 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -17,13 +17,13 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
-import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.activemq.broker.BrokerContext;
+import javax.net.ServerSocketFactory;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.MutexTransport;
@@ -33,8 +33,6 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-import javax.net.ServerSocketFactory;
-
 /**
  * A <a href="http://mqtt.org/">MQTT</a> transport factory
  */
@@ -42,16 +40,19 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
 
     private BrokerService brokerService = null;
 
+    @Override
     protected String getDefaultWireFormatType() {
         return "mqtt";
     }
 
+    @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-        TcpTransportServer result =  new TcpTransportServer(this, location, serverSocketFactory);
+        TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory);
         result.setAllowLinkStealing(true);
         return result;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         transport = new MQTTTransportFilter(transport, format, brokerService);
@@ -59,6 +60,7 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
         return super.compositeConfigure(transport, format, options);
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
@@ -79,10 +81,8 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
     @Override
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
         MQTTInactivityMonitor monitor = new MQTTInactivityMonitor(transport, format);
-
         MQTTTransportFilter filter = transport.narrow(MQTTTransportFilter.class);
         filter.setInactivityMonitor(monitor);
-
         return monitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 8612c25..ae557ab 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
 import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.BrokerService;
@@ -29,7 +30,20 @@ import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.mqtt.codec.*;
+import org.fusesource.mqtt.codec.CONNACK;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.fusesource.mqtt.codec.DISCONNECT;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PINGREQ;
+import org.fusesource.mqtt.codec.PINGRESP;
+import org.fusesource.mqtt.codec.PUBACK;
+import org.fusesource.mqtt.codec.PUBCOMP;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.fusesource.mqtt.codec.PUBREC;
+import org.fusesource.mqtt.codec.PUBREL;
+import org.fusesource.mqtt.codec.SUBACK;
+import org.fusesource.mqtt.codec.SUBSCRIBE;
+import org.fusesource.mqtt.codec.UNSUBSCRIBE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,6 +211,14 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
         protocolConverter.setPublishDollarTopics(publishDollarTopics);
     }
 
+    public String getSubscriptionStrategyName() {
+        return protocolConverter != null ? protocolConverter.getSubscriptionStrategyName() : "default";
+    }
+
+    public void setSubscriptionStrategyName(String name) {
+        protocolConverter.setSubscriptionStrategyName(name);
+    }
+
     public int getActiveMQSubscriptionPrefetch() {
         return protocolConverter.getActiveMQSubscriptionPrefetch();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
new file mode 100644
index 0000000..60259e2
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.broker.region.virtual.VirtualTopicInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+
+/**
+ * Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing
+ * the base functionality that is common to most implementations.
+ */
+public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware {
+
+    protected MQTTProtocolConverter protocol;
+    protected BrokerService brokerService;
+
+    @Override
+    public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException {
+        setProtocolConverter(protocol);
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    public void setProtocolConverter(MQTTProtocolConverter parent) {
+        this.protocol = parent;
+    }
+
+    @Override
+    public MQTTProtocolConverter getProtocolConverter() {
+        return protocol;
+    }
+
+    @Override
+    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+        String topicName = mqttSubscription.getTopicName();
+
+        // get TopicRegion
+        RegionBroker regionBroker;
+        try {
+            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+        } catch (Exception e) {
+            throw new MQTTProtocolException("Error subscribing to " + topicName + ": " + e.getMessage(), false, e);
+        }
+        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+
+        final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+        final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+        // use actual client id used to create connection to lookup connection
+        // context
+        final String connectionInfoClientId = protocol.getClientId();
+        final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
+
+        // get all matching Topics
+        final Set<org.apache.activemq.broker.region.Destination> matchingDestinations =
+            topicRegion.getDestinations(mqttSubscription.getDestination());
+        for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
+
+            // recover retroactive messages for matching subscription
+            for (Subscription subscription : dest.getConsumers()) {
+                if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
+                    try {
+                        if (dest instanceof org.apache.activemq.broker.region.Topic) {
+                            ((org.apache.activemq.broker.region.Topic) dest).recoverRetroactiveMessages(connectionContext, subscription);
+                        } else if (dest instanceof VirtualTopicInterceptor) {
+                            ((VirtualTopicInterceptor) dest).getTopic().recoverRetroactiveMessages(connectionContext, subscription);
+                        }
+                        if (subscription instanceof PrefetchSubscription) {
+                            // request dispatch for prefetch subs
+                            PrefetchSubscription prefetchSubscription = (PrefetchSubscription) subscription;
+                            prefetchSubscription.dispatchPending();
+                        }
+                    } catch (Exception e) {
+                        throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    @Override
+    public ActiveMQDestination onSend(String topicName) {
+        return new ActiveMQTopic(topicName);
+    }
+
+    @Override
+    public String onSend(ActiveMQDestination destination) {
+        return destination.getPhysicalName();
+    }
+
+    @Override
+    public boolean isControlTopic(ActiveMQDestination destination) {
+        return destination.getPhysicalName().startsWith("$");
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
new file mode 100644
index 0000000..4e8f362
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.apache.activemq.transport.mqtt.ResponseHandler;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation that uses unmapped topic subscriptions.
+ */
+public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
+
+    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
+
+    @Override
+    public void onConnect(CONNECT connect) throws MQTTProtocolException {
+        List<SubscriptionInfo> subs;
+        try {
+            subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), protocol.getClientId());
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Error loading store subscriptions", true, e);
+        }
+        if (connect.cleanSession()) {
+            deleteDurableSubs(subs);
+        } else {
+            restoreDurableSubs(subs);
+        }
+    }
+
+    @Override
+    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
+        ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        consumerInfo.setRetroactive(true);
+        consumerInfo.setDispatchAsync(true);
+        // create durable subscriptions only when clean session is false
+        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
+            consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
+        }
+
+        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+    }
+
+    @Override
+    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+
+        ActiveMQDestination destination = mqttSubscription.getDestination();
+
+        // check whether the Topic has been recovered in restoreDurableSubs
+        // mark subscription available for recovery for duplicate subscription
+        if (restoredSubs.remove(destination.getPhysicalName())) {
+            return;
+        }
+
+        super.onReSubscribe(mqttSubscription);
+    }
+
+    @Override
+    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
+        // check if the durable sub also needs to be removed
+        if (subscription.getConsumerInfo().getSubscriptionName() != null) {
+            // also remove it from restored durable subscriptions set
+            restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
+
+            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+            rsi.setConnectionId(protocol.getConnectionId());
+            rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
+            rsi.setClientId(protocol.getClientId());
+            protocol.sendToActiveMQ(rsi, new ResponseHandler() {
+                @Override
+                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                    // ignore failures..
+                }
+            });
+        }
+    }
+
+    private void deleteDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(protocol.getConnectionId());
+                rsi.setSubscriptionName(sub.getSubcriptionName());
+                rsi.setClientId(sub.getClientId());
+                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                        // ignore failures..
+                    }
+                });
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    private void restoreDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                String name = sub.getSubcriptionName();
+                String[] split = name.split(":", 2);
+                QoS qoS = QoS.valueOf(split[0]);
+                protocol.onSubscribe(new Topic(split[1], qoS));
+                // mark this durable subscription as restored by Broker
+                restoredSubs.add(split[1]);
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
new file mode 100644
index 0000000..0eaa72a
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.CONNECT;
+
+/**
+ * Subscription management strategy used to control how MQTT clients
+ * subscribe to destination and how messages are addressed in order to
+ * arrive on the appropriate destinations.
+ */
+public interface MQTTSubscriptionStrategy {
+
+    /**
+     * Initialize the strategy before first use.
+     *
+     * @param protocol
+     *        the MQTTProtocolConverter that is initializing the strategy
+     *
+     * @throws MQTTProtocolException if an error occurs during initialization.
+     */
+    public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException;
+
+    /**
+     * Allows the strategy to perform any needed actions on client connect
+     * prior to the CONNACK frame being sent back such as recovering old
+     * subscriptions and performing any clean session actions.
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the connect actions.
+     */
+    public void onConnect(CONNECT connect) throws MQTTProtocolException;
+
+    /**
+     * Called when a new Subscription is being requested.  This method allows the
+     * strategy to create a specific type of subscription for the client such as
+     * mapping topic subscriptions to Queues etc.
+     *
+     * @param topicName
+     *        the requested Topic name to subscribe to.
+     * @param requestedQoS
+     *        the QoS level that the client has requested for this subscription.
+     *
+     * @return the assigned QoS value given to the new subscription
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the subscribe actions.
+     */
+    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException;
+
+    /**
+     * Called when a client sends a duplicate subscribe request which should
+     * force any retained messages on that topic to be replayed again as though
+     * the client had just subscribed for the first time.  The method should
+     * not unsubscribe the client as it might miss messages sent while the
+     * subscription is being recreated.
+     *
+     * @param subscription
+     *        the MQTTSubscription that contains the subscription state.
+     */
+    public void onReSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
+
+    /**
+     * Called when a client requests an un-subscribe a previous subscription.
+     *
+     * @param subscription
+     *        the {@link MQTTSubscription} that is being removed.
+     *
+     * @throws MQTTProtocolException if an error occurs during the un-subscribe processing.
+     */
+    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
+
+    /**
+     * Intercepts PUBLISH operations from the client and allows the strategy to map the
+     * target destination so that the send operation will land in the destinations that
+     * this strategy has mapped the incoming subscribe requests to.
+     *
+     * @param topicName
+     *        the targeted Topic that the client sent the message to.
+     *
+     * @return an ActiveMQ Topic instance that lands the send in the correct destinations.
+     */
+    public ActiveMQDestination onSend(String topicName);
+
+    /**
+     * Intercepts send operations from the broker and allows the strategy to map the
+     * target topic name so that the client sees a valid Topic name.
+     *
+     * @param destination
+     *        the destination that the message was dispatched from
+     *
+     * @return an Topic name that is valid for the receiving client.
+     */
+    public String onSend(ActiveMQDestination destination);
+
+    /**
+     * Allows the protocol handler to interrogate an destination name to determine if it
+     * is equivalent to the MQTT control topic (starts with $).  Since the mapped destinations
+     * that the strategy might alter the naming scheme the strategy must provide a way to
+     * reverse map and determine if the destination was originally an MQTT control topic.
+     *
+     * @param destination
+     *        the destination to query.
+     *
+     * @return true if the destination is an MQTT control topic.
+     */
+    public boolean isControlTopic(ActiveMQDestination destination);
+
+    /**
+     * Sets the {@link MQTTProtocolConverter} that is the parent of this strategy object.
+     *
+     * @param parent
+     *        the {@link MQTTProtocolConverter} that owns this strategy.
+     */
+    public void setProtocolConverter(MQTTProtocolConverter parent);
+
+    /**
+     * @return the {@link MQTTProtocolConverter} that owns this strategy.
+     */
+    public MQTTProtocolConverter getProtocolConverter();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
new file mode 100644
index 0000000..64995c6
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.strategy;
+
+import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
+import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
+import org.apache.activemq.transport.mqtt.MQTTProtocolException;
+import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.apache.activemq.transport.mqtt.ResponseHandler;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.codec.CONNECT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscription strategy that converts all MQTT subscribes that would be durable to
+ * Virtual Topic Queue subscriptions.  Also maps all publish requests to be prefixed
+ * with the VirtualTopic. prefix unless already present.
+ */
+public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
+
+    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
+    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
+
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);
+
+    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());
+
+    @Override
+    public void onConnect(CONNECT connect) throws MQTTProtocolException {
+        List<ActiveMQQueue> queues;
+        try {
+            queues = PersistenceAdapterSupport.listQueues(brokerService.getPersistenceAdapter(), new PersistenceAdapterSupport.DestinationMatcher() {
+
+                @Override
+                public boolean matches(ActiveMQDestination destination) {
+                    if (destination.getPhysicalName().startsWith("Consumer." + protocol.getClientId())) {
+                        LOG.debug("Recovered client sub: {}", destination.getPhysicalName());
+                        return true;
+                    }
+                    return false;
+                }
+            });
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Error restoring durable subscriptions", true, e);
+        }
+
+        if (connect.cleanSession()) {
+            deleteDurableQueues(queues);
+        } else {
+            restoreDurableQueue(queues);
+        }
+    }
+
+    @Override
+    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
+        ActiveMQDestination destination = null;
+        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
+            String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." +
+                               VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
+            destination = new ActiveMQQueue(converted);
+        } else {
+            String converted = convertMQTTToActiveMQ(topicName);
+            if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
+                converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
+            }
+            destination = new ActiveMQTopic(converted);
+        }
+
+        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        consumerInfo.setDestination(destination);
+        consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        consumerInfo.setRetroactive(true);
+        consumerInfo.setDispatchAsync(true);
+
+        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+    }
+
+    @Override
+    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
+
+        ActiveMQDestination destination = mqttSubscription.getDestination();
+
+        // check whether the Topic has been recovered in restoreDurableSubs
+        // mark subscription available for recovery for duplicate subscription
+        if (restoredQueues.remove(destination)) {
+            return;
+        }
+
+        if (mqttSubscription.getDestination().isTopic()) {
+            super.onReSubscribe(mqttSubscription);
+        } else {
+            protocol.doUnSubscribe(mqttSubscription);
+            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+            consumerInfo.setConsumerId(protocol.getNextConsumerId());
+            protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
+        }
+    }
+
+    @Override
+    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException {
+        if (subscription.getDestination().isQueue()) {
+            DestinationInfo remove = new DestinationInfo();
+            remove.setConnectionId(protocol.getConnectionId());
+            remove.setDestination(subscription.getDestination());
+            remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+
+            protocol.sendToActiveMQ(remove, new ResponseHandler() {
+                @Override
+                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                    // ignore failures..
+                }
+            });
+        }
+    }
+
+    @Override
+    public ActiveMQDestination onSend(String topicName) {
+        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
+            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
+        } else {
+            return new ActiveMQTopic(topicName);
+        }
+    }
+
+    @Override
+    public String onSend(ActiveMQDestination destination) {
+        String amqTopicName = destination.getPhysicalName();
+        if (amqTopicName.startsWith(VIRTUALTOPIC_PREFIX)) {
+            amqTopicName = amqTopicName.substring(VIRTUALTOPIC_PREFIX.length());
+        }
+        return amqTopicName;
+    }
+
+    @Override
+    public boolean isControlTopic(ActiveMQDestination destination) {
+        String destinationName = destination.getPhysicalName();
+        if (destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$")) {
+            return true;
+        }
+        return false;
+    }
+
+    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
+        try {
+            for (ActiveMQQueue queue : queues) {
+                DestinationInfo removeAction = new DestinationInfo();
+                removeAction.setConnectionId(protocol.getConnectionId());
+                removeAction.setDestination(queue);
+                removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+
+                protocol.sendToActiveMQ(removeAction, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                        // ignore failures..
+                    }
+                });
+            }
+        } catch (Throwable e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
+        try {
+            for (ActiveMQQueue queue : queues) {
+                String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length());
+                StringTokenizer tokenizer = new StringTokenizer(name);
+                tokenizer.nextToken(":.");
+                String qosString = tokenizer.nextToken();
+                tokenizer.nextToken();
+                String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
+                QoS qoS = QoS.valueOf(qosString);
+                LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
+
+                ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+                consumerInfo.setDestination(queue);
+                consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+                consumerInfo.setRetroactive(true);
+                consumerInfo.setDispatchAsync(true);
+
+                protocol.doSubscribe(consumerInfo, topicName, qoS);
+
+                // mark this durable subscription as restored by Broker
+                restoredQueues.add(queue);
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
new file mode 100644
index 0000000..006e0ba
--- /dev/null
+++ b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-default-subscriptions
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mqtt.strategy.MQTTDefaultSubscriptionStrategy
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
new file mode 100644
index 0000000..893b0b3
--- /dev/null
+++ b/activemq-mqtt/src/main/resources/META-INF/services/org/apache/activemq/transport/strategies/mqtt-virtual-topic-subscriptions
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.mqtt.strategy.MQTTVirtualTopicSubscriptionStrategy
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index c4571dc..3b4062d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -62,6 +62,7 @@ import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.fusesource.mqtt.codec.PUBLISH;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -153,7 +154,7 @@ public class MQTTTest extends MQTTTestSupport {
             publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
         }
 
-        latch.await(10, TimeUnit.SECONDS);
+        latch.await(20, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
         subscriptionProvider.disconnect();
         publishProvider.disconnect();
@@ -488,13 +489,83 @@ public class MQTTTest extends MQTTTestSupport {
     @Test(timeout = 120 * 1000)
     public void testRetainedMessage() throws Exception {
         MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive((short) 2);
+        mqtt.setKeepAlive((short) 60);
 
         final String RETAIN = "RETAIN";
         final String TOPICA = "TopicA";
 
         final String[] clientIds = { null, "foo", "durable" };
         for (String clientId : clientIds) {
+            LOG.info("Testing now with Client ID: {}", clientId);
+
+            mqtt.setClientId(clientId);
+            mqtt.setCleanSession(!"durable".equals(clientId));
+
+            BlockingConnection connection = mqtt.blockingConnection();
+            connection.connect();
+
+            // set retained message and check
+            connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull("No retained message for " + clientId, msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
+            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+            // test duplicate subscription
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+            assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
+            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+            connection.unsubscribe(new String[]{TOPICA});
+
+            // clear retained message and check that we don't receive it
+            connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(500, TimeUnit.MILLISECONDS);
+            assertNull("Retained message not cleared for " + clientId, msg);
+            connection.unsubscribe(new String[]{TOPICA});
+
+            // set retained message again and check
+            connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull("No reset retained message for " + clientId, msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
+            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+            // re-connect and check
+            connection.disconnect();
+            connection = mqtt.blockingConnection();
+            connection.connect();
+            connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            assertNotNull("No reset retained message for " + clientId, msg);
+            assertEquals(RETAIN, new String(msg.getPayload()));
+            msg.ack();
+            assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+            connection.unsubscribe(new String[]{TOPICA});
+            connection.disconnect();
+        }
+    }
+
+    @Ignore
+    @Test(timeout = 120 * 1000)
+    public void testRetainedMessageOnVirtualTopics() throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setKeepAlive((short) 60);
+
+        final String RETAIN = "RETAIN";
+        final String TOPICA = "VirtualTopic/TopicA";
+
+        final String[] clientIds = { null, "foo", "durable" };
+        for (String clientId : clientIds) {
+            LOG.info("Testing now with Client ID: {}", clientId);
 
             mqtt.setClientId(clientId);
             mqtt.setCleanSession(!"durable".equals(clientId));
@@ -547,6 +618,7 @@ public class MQTTTest extends MQTTTestSupport {
             msg.ack();
             assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
 
+            LOG.info("Test now unsubscribing from: {} for the last time", TOPICA);
             connection.unsubscribe(new String[]{TOPICA});
             connection.disconnect();
         }
@@ -914,9 +986,12 @@ public class MQTTTest extends MQTTTestSupport {
 
     @Test(timeout = 60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
+        doTestSendMQTTReceiveJMS("foo.*");
+    }
+
+    public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        final String DESTINATION_NAME = "foo.*";
 
         // send retained message
         final String RETAINED = "RETAINED";
@@ -927,7 +1002,7 @@ public class MQTTTest extends MQTTTestSupport {
         activeMQConnection.setUseRetroactiveConsumer(true);
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
+        javax.jms.Topic jmsTopic = s.createTopic(destinationName);
         MessageConsumer consumer = s.createConsumer(jmsTopic);
 
         // check whether we received retained message on JMS subscribe
@@ -952,6 +1027,10 @@ public class MQTTTest extends MQTTTestSupport {
 
     @Test(timeout = 2 * 60 * 1000)
     public void testSendJMSReceiveMQTT() throws Exception {
+        doTestSendJMSReceiveMQTT("foo.far");
+    }
+
+    public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
 
@@ -959,7 +1038,7 @@ public class MQTTTest extends MQTTTestSupport {
         activeMQConnection.setUseRetroactiveConsumer(true);
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+        javax.jms.Topic jmsTopic = s.createTopic(destinationName);
         MessageProducer producer = s.createProducer(jmsTopic);
 
         // send retained message from JMS
@@ -1130,10 +1209,14 @@ public class MQTTTest extends MQTTTestSupport {
 
     @Test(timeout = 30 * 10000)
     public void testJmsMapping() throws Exception {
+        doTestJmsMapping("test.foo");
+    }
+
+    public void doTestJmsMapping(String destinationName) throws Exception {
         // start up jms consumer
         Connection jmsConn = cf.createConnection();
         Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination dest = session.createTopic("test.foo");
+        Destination dest = session.createTopic(destinationName);
         MessageConsumer consumer = session.createConsumer(dest);
         jmsConn.start();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/413e4840/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
new file mode 100644
index 0000000..6605f53
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Run the basic tests with the NIO Transport.
+ */
+public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+        super.setUp();
+    }
+
+    // TODO - This currently fails on the durable case because we have a hard time
+    //        recovering the original Topic name when a client tries to subscribe
+    //        durable to a VirtualTopic.* type topic.
+    @Override
+    @Ignore
+    public void testRetainedMessageOnVirtualTopics() throws Exception {}
+
+    @Override
+    @Test(timeout = 60 * 1000)
+    public void testSendMQTTReceiveJMS() throws Exception {
+        doTestSendMQTTReceiveJMS("VirtualTopic.foo.*");
+    }
+
+    @Override
+    @Test(timeout = 2 * 60 * 1000)
+    public void testSendJMSReceiveMQTT() throws Exception {
+        doTestSendJMSReceiveMQTT("VirtualTopic.foo.far");
+    }
+
+    @Override
+    @Test(timeout = 30 * 10000)
+    public void testJmsMapping() throws Exception {
+        doTestJmsMapping("VirtualTopic.test.foo");
+    }
+}