You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/07/09 16:18:07 UTC

[1/5] activemq-artemis git commit: Added Initial MQTT Protocol Support

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 077e9e266 -> 17cc62bca


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java
new file mode 100644
index 0000000..9a4c03f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/ResourceLoadingSslContext.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
+
+import javax.annotation.PostConstruct;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.broker.SslContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.UrlResource;
+import org.springframework.util.ResourceUtils;
+
+/**
+ * An {@link SslContext} whose key and trust material is described by plain
+ * strings (store location, type, algorithm and passwords), which makes it
+ * easier to configure from spring.
+ * <p>
+ * No store is read until {@link #afterPropertiesSet()} runs. Store locations
+ * are resolved by {@link #resourceFromString(String)}.
+ */
+public class ResourceLoadingSslContext extends SslContext
+{
+
+   private String keyStoreType = "jks";
+   private String trustStoreType = "jks";
+
+   private String secureRandomAlgorithm = "SHA1PRNG";
+   private String keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+   private String trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
+
+   private String keyStore;
+   private String trustStore;
+
+   private String keyStoreKeyPassword;
+   private String keyStorePassword;
+   private String trustStorePassword;
+
+   /**
+    * JSR-250 lifecycle hook. Delegates to {@link #afterPropertiesSet()} and
+    * rethrows any checked failure wrapped in a {@link RuntimeException}, so
+    * that the public method keeps its existing signature.
+    */
+   @PostConstruct
+   private void postConstruct()
+   {
+      try
+      {
+         afterPropertiesSet();
+      }
+      catch (Exception cause)
+      {
+         throw new RuntimeException(cause);
+      }
+   }
+
+   /**
+    * Loads the configured stores and appends the resulting key managers and
+    * trust managers to this context. A secure random is created only when
+    * none has been set yet.
+    *
+    * @throws Exception if a store cannot be read or an algorithm is unavailable
+    * @org.apache.xbean.InitMethod
+    */
+   public void afterPropertiesSet() throws Exception
+   {
+      keyManagers.addAll(createKeyManagers());
+      trustManagers.addAll(createTrustManagers());
+      if (secureRandom == null)
+      {
+         secureRandom = SecureRandom.getInstance(secureRandomAlgorithm);
+      }
+   }
+
+   /**
+    * Builds the trust managers backed by the configured trust store.
+    *
+    * @return the trust managers, or an empty collection when no trust store is configured
+    */
+   private Collection<TrustManager> createTrustManagers() throws Exception
+   {
+      if (trustStore == null)
+      {
+         return new ArrayList<TrustManager>(0);
+      }
+
+      KeyStore store = loadKeyStore(trustStoreType, trustStore, trustStorePassword);
+      TrustManagerFactory factory = TrustManagerFactory.getInstance(trustStoreAlgorithm);
+      factory.init(store);
+      return Arrays.asList(factory.getTrustManagers());
+   }
+
+   /**
+    * Builds the key managers backed by the configured key store. The key
+    * password is used to unlock the keys; when it is not set the store
+    * password is used instead.
+    *
+    * @return the key managers, or an empty collection when no key store is configured
+    */
+   private Collection<KeyManager> createKeyManagers() throws Exception
+   {
+      if (keyStore == null)
+      {
+         return new ArrayList<KeyManager>(0);
+      }
+
+      KeyStore store = loadKeyStore(keyStoreType, keyStore, keyStorePassword);
+      KeyManagerFactory factory = KeyManagerFactory.getInstance(keyStoreAlgorithm);
+      String effectiveKeyPassword = keyStoreKeyPassword != null ? keyStoreKeyPassword : keyStorePassword;
+      factory.init(store, toChars(effectiveKeyPassword));
+      return Arrays.asList(factory.getKeyManagers());
+   }
+
+   /**
+    * Reads one store of the given type from the given location.
+    *
+    * @param type     the store type passed to {@link KeyStore#getInstance(String)}
+    * @param location the store location, resolved by {@link #resourceFromString(String)}
+    * @param password the store password, may be {@code null}
+    * @return the loaded store
+    */
+   private static KeyStore loadKeyStore(String type, String location, String password) throws Exception
+   {
+      KeyStore store = KeyStore.getInstance(type);
+      InputStream in = resourceFromString(location).getInputStream();
+      try
+      {
+         store.load(in, toChars(password));
+      }
+      finally
+      {
+         in.close();
+      }
+      return store;
+   }
+
+   /**
+    * Null-tolerant {@link String#toCharArray()}.
+    */
+   private static char[] toChars(String text)
+   {
+      return text == null ? null : text.toCharArray();
+   }
+
+   // ---- key store settings ----
+
+   public String getKeyStore()
+   {
+      return keyStore;
+   }
+
+   public void setKeyStore(String keyStore) throws MalformedURLException
+   {
+      this.keyStore = keyStore;
+   }
+
+   public String getKeyStoreType()
+   {
+      return keyStoreType;
+   }
+
+   public void setKeyStoreType(String keyType)
+   {
+      this.keyStoreType = keyType;
+   }
+
+   public String getKeyStoreAlgorithm()
+   {
+      return keyStoreAlgorithm;
+   }
+
+   public void setKeyStoreAlgorithm(String keyAlgorithm)
+   {
+      this.keyStoreAlgorithm = keyAlgorithm;
+   }
+
+   public String getKeyStorePassword()
+   {
+      return keyStorePassword;
+   }
+
+   public void setKeyStorePassword(String keyPassword)
+   {
+      this.keyStorePassword = keyPassword;
+   }
+
+   public String getKeyStoreKeyPassword()
+   {
+      return keyStoreKeyPassword;
+   }
+
+   public void setKeyStoreKeyPassword(String keyPassword)
+   {
+      this.keyStoreKeyPassword = keyPassword;
+   }
+
+   // ---- trust store settings ----
+
+   public String getTrustStore()
+   {
+      return trustStore;
+   }
+
+   public void setTrustStore(String trustStore) throws MalformedURLException
+   {
+      this.trustStore = trustStore;
+   }
+
+   public String getTrustStoreType()
+   {
+      return trustStoreType;
+   }
+
+   public void setTrustStoreType(String trustType)
+   {
+      this.trustStoreType = trustType;
+   }
+
+   public String getTrustStoreAlgorithm()
+   {
+      return trustStoreAlgorithm;
+   }
+
+   public void setTrustStoreAlgorithm(String trustAlgorithm)
+   {
+      this.trustStoreAlgorithm = trustAlgorithm;
+   }
+
+   public String getTrustStorePassword()
+   {
+      return trustStorePassword;
+   }
+
+   public void setTrustStorePassword(String trustPassword)
+   {
+      this.trustStorePassword = trustPassword;
+   }
+
+   // ---- secure random settings ----
+
+   public String getSecureRandomAlgorithm()
+   {
+      return secureRandomAlgorithm;
+   }
+
+   public void setSecureRandomAlgorithm(String secureRandomAlgorithm)
+   {
+      this.secureRandomAlgorithm = secureRandomAlgorithm;
+   }
+
+   /**
+    * Turns a location string into a Spring {@link Resource}. The candidates
+    * are tried in this order: an existing file on the file system, then a
+    * URL, and finally a class path resource.
+    *
+    * @param uri the location to resolve
+    * @return the resource for the first candidate that applies
+    * @throws MalformedURLException if the string is treated as a URL but cannot be parsed
+    */
+   public static Resource resourceFromString(String uri) throws MalformedURLException
+   {
+      if (new File(uri).exists())
+      {
+         return new FileSystemResource(uri);
+      }
+      if (ResourceUtils.isUrl(uri))
+      {
+         return new UrlResource(uri);
+      }
+      return new ClassPathResource(uri);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java
new file mode 100644
index 0000000..84fc3a4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/util/Wait.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported.util;
+
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Polling helper for tests: repeatedly evaluates a {@link Condition} until it
+ * holds or a time budget is used up.
+ */
+public class Wait
+{
+
+   /** Default time budget, in milliseconds. */
+   public static final long MAX_WAIT_MILLIS = 30 * 1000;
+
+   /** Default pause between two evaluations, in milliseconds. */
+   public static final int SLEEP_MILLIS = 1000;
+
+   /**
+    * The predicate that is polled by the {@code waitFor} methods.
+    */
+   public interface Condition
+   {
+      /**
+       * @return {@code true} once the awaited state has been reached
+       * @throws Exception any failure; it is propagated to the caller of {@code waitFor}
+       */
+      boolean isSatisified() throws Exception;
+   }
+
+   /**
+    * Waits up to {@link #MAX_WAIT_MILLIS}, polling every {@link #SLEEP_MILLIS}.
+    *
+    * @param condition the condition to poll
+    * @return the result of the last evaluation of the condition
+    * @throws Exception if the condition throws or the waiting thread is interrupted
+    */
+   public static boolean waitFor(Condition condition) throws Exception
+   {
+      return waitFor(condition, MAX_WAIT_MILLIS);
+   }
+
+   /**
+    * Waits up to {@code duration} milliseconds, polling every {@link #SLEEP_MILLIS}.
+    *
+    * @param condition the condition to poll
+    * @param duration  the time budget in milliseconds
+    * @return the result of the last evaluation of the condition
+    * @throws Exception if the condition throws or the waiting thread is interrupted
+    */
+   public static boolean waitFor(final Condition condition, final long duration) throws Exception
+   {
+      return waitFor(condition, duration, SLEEP_MILLIS);
+   }
+
+   /**
+    * Evaluates the condition once and then, while it is not satisfied and the
+    * time budget is not used up, sleeps and evaluates it again.
+    *
+    * @param condition   the condition to poll
+    * @param duration    the time budget in milliseconds; zero or a negative value means a single evaluation
+    * @param sleepMillis the pause between two evaluations, in milliseconds
+    * @return the result of the last evaluation of the condition
+    * @throws Exception if the condition throws or the waiting thread is interrupted
+    */
+   public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception
+   {
+      // Compare elapsed time against the budget instead of computing an absolute
+      // expiry: "now + duration" overflows for very large durations (for example
+      // Long.MAX_VALUE) and would end the wait after the first check. toNanos()
+      // saturates rather than overflowing, and nanoTime() is not affected by
+      // wall-clock adjustments.
+      final long start = System.nanoTime();
+      final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(duration);
+
+      boolean conditionSatisified = condition.isSatisified();
+      while (!conditionSatisified && System.nanoTime() - start < budgetNanos)
+      {
+         TimeUnit.MILLISECONDS.sleep(sleepMillis);
+         conditionSatisified = condition.isSatisified();
+      }
+      return conditionSatisified;
+   }
+}


[3/5] activemq-artemis git commit: Added Initial MQTT Protocol Support

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
new file mode 100644
index 0000000..e7ac143
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps track of the subscriptions of one MQTT session: for each subscribed
+ * topic the server consumer created on the subscription queue, and for each
+ * consumer ID the QoS level the subscription was made with.
+ */
+public class MQTTSubscriptionManager
+{
+   private final MQTTSession session;
+
+   /** QoS level of the subscription, keyed by consumer ID. */
+   private final ConcurrentHashMap<Long, Integer> consumerQoSLevels;
+
+   /** Consumer created for each subscription, keyed by MQTT topic. */
+   private final ConcurrentHashMap<String, ServerConsumer> consumers;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   public MQTTSubscriptionManager(MQTTSession session)
+   {
+      this.session = session;
+
+      consumers = new ConcurrentHashMap<>();
+      consumerQoSLevels = new ConcurrentHashMap<>();
+   }
+
+   /**
+    * Creates the queue (if missing) and a consumer for every subscription
+    * recorded in the session state.
+    */
+   synchronized void start() throws Exception
+   {
+      for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions())
+      {
+         SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
+         createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
+      }
+   }
+
+   /**
+    * Stops, disconnects and closes every consumer held by this manager.
+    *
+    * @param clean when {@code true} the queue of each consumer is destroyed as well
+    */
+   synchronized void stop(boolean clean) throws Exception
+   {
+      for (ServerConsumer consumer : consumers.values())
+      {
+         consumer.setStarted(false);
+         consumer.disconnect();
+         consumer.getQueue().removeConsumer(consumer);
+         consumer.close(false);
+      }
+
+      if (clean)
+      {
+         for (ServerConsumer consumer : consumers.values())
+         {
+            session.getServer().destroyQueue(consumer.getQueue().getName());
+         }
+      }
+   }
+
+   /**
+    * Creates a Queue if it doesn't already exist, based on a topic and address.  Returning the queue name.
+    */
+   private SimpleString createQueueForSubscription(String topic, int qos) throws Exception
+   {
+      String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+      SimpleString queue = getQueueNameForTopic(address);
+
+      Queue q = session.getServer().locateQueue(queue);
+      if (q == null)
+      {
+         // NOTE(review): "qos >= 0" holds for every non-negative QoS value, so the last
+         // argument effectively follows MQTTUtil.DURABLE_MESSAGES alone - confirm
+         // whether QoS 0 subscriptions were meant to be treated differently.
+         session.getServerSession().createQueue(new SimpleString(address), queue, null, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
+      }
+      return queue;
+   }
+
+   /**
+    * Creates a new consumer for the queue associated with a subscription,
+    * starts it, and records it under the topic together with its QoS level.
+    */
+   private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception
+   {
+      long cid = session.getServer().getStorageManager().generateID();
+
+      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
+      consumer.setStarted(true);
+
+      consumers.put(topic, consumer);
+      consumerQoSLevels.put(cid, qos);
+   }
+
+
+   /**
+    * Records the subscription in the session state and makes sure a queue
+    * exists for it. A topic that was not subscribed before gets a new
+    * consumer; for an already subscribed topic only the stored QoS level of
+    * the existing consumer is replaced. Finally the retained messages for the
+    * topic are added to the queue.
+    */
+   private void addSubscription(MqttTopicSubscription subscription) throws Exception
+   {
+      // looked up before the session state is updated, to tell a new subscription from a re-subscription
+      MqttTopicSubscription s = session.getSessionState().getSubscription(subscription.topicName());
+
+      int qos = subscription.qualityOfService().value();
+      String topic = subscription.topicName();
+
+      session.getSessionState().addSubscription(subscription);
+
+      SimpleString q = createQueueForSubscription(topic, qos);
+
+      if (s == null)
+      {
+         createConsumerForSubscriptionQueue(q, topic, qos);
+      }
+      else
+      {
+         consumerQoSLevels.put(consumers.get(topic).getID(), qos);
+      }
+      session.getRetainMessageManager().addRetainedMessagesToQueue(q, topic);
+   }
+
+   /**
+    * Removes the subscription for each of the given topics.
+    */
+   void removeSubscriptions(List<String> topics) throws Exception
+   {
+      for (String topic : topics)
+      {
+         removeSubscription(topic);
+      }
+   }
+
+   /**
+    * Deletes the queue of a subscription and forgets the subscription, its
+    * consumer and its QoS level. A topic without a queue or without a
+    * consumer (for example one that was never subscribed) is tolerated; the
+    * parts that do exist are still cleaned up.
+    */
+   private synchronized void removeSubscription(String address) throws Exception
+   {
+      String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
+      SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
+
+      Queue queue = session.getServer().locateQueue(internalQueueName);
+      if (queue != null)
+      {
+         queue.deleteQueue(true);
+      }
+      session.getSessionState().removeSubscription(address);
+
+      // remove() hands back the consumer, if any, so its QoS entry can be dropped too
+      ServerConsumer consumer = consumers.remove(address);
+      if (consumer != null)
+      {
+         consumerQoSLevels.remove(consumer.getID());
+      }
+   }
+
+   /**
+    * Builds the queue name for a topic: the client ID of the session, a dot,
+    * and the given topic string.
+    */
+   private SimpleString getQueueNameForTopic(String topic)
+   {
+      return new SimpleString(session.getSessionState().getClientId() + "." + topic);
+   }
+
+   /**
+    * As per MQTT Spec.  Subscribes this client to a number of MQTT topics.
+    *
+    * @param subscriptions the subscriptions to add, processed in list order
+    * @return An array of integers representing the list of accepted QoS for each topic.
+    *
+    * @throws Exception if adding any of the subscriptions fails
+    */
+   int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception
+   {
+      int[] qos = new int[subscriptions.size()];
+
+      for (int i = 0; i < subscriptions.size(); i++)
+      {
+         addSubscription(subscriptions.get(i));
+         qos[i] = subscriptions.get(i).qualityOfService().value();
+      }
+      return qos;
+   }
+
+   /**
+    * @return the live map of QoS levels keyed by consumer ID (not a copy)
+    */
+   Map<Long, Integer> getConsumerQoSLevels()
+   {
+      return consumerQoSLevels;
+   }
+
+   /**
+    * @return the consumer recorded for the given topic, or {@code null} if there is none
+    */
+   ServerConsumer getConsumerForAddress(String address)
+   {
+      return consumers.get(address);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
new file mode 100644
index 0000000..c6f1a65
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+
+/**
+ * Static helpers that translate between MQTT concepts and their Artemis
+ * counterparts: address filters, server side messages and log output.
+ */
+
+public class MQTTUtil
+{
+   // TODO These settings should be configurable.
+   public static final int DEFAULT_SERVER_MESSAGE_BUFFER_SIZE = 512;
+
+   public static final boolean DURABLE_MESSAGES = true;
+
+   public static final boolean SESSION_AUTO_COMMIT_SENDS = true;
+
+   public static final boolean SESSION_AUTO_COMMIT_ACKS = false;
+
+   public static final boolean SESSION_PREACKNOWLEDGE = false;
+
+   public static final boolean SESSION_XA = false;
+
+   public static final boolean SESSION_AUTO_CREATE_QUEUE = false;
+
+   public static final int MAX_MESSAGE_SIZE = 268435455;
+
+   public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
+
+   public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
+
+   public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
+
+   public static final String MQTT_MESSAGE_ID_KEY = "mqtt.message.id";
+
+   public static final String MQTT_MESSAGE_TYPE_KEY = "mqtt.message.type";
+
+   public static final String MQTT_MESSAGE_RETAIN_KEY = "mqtt.message.retain";
+
+   public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
+
+   /**
+    * Converts an MQTT filter to a core address: separators and wildcards are
+    * swapped and {@link #MQTT_ADDRESS_PREFIX} is put in front.
+    */
+   public static String convertMQTTAddressFilterToCore(String filter)
+   {
+      return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+   }
+
+   /**
+    * Converts a core address back to an MQTT filter: a leading retain prefix
+    * or, failing that, a leading address prefix is stripped, then separators
+    * and wildcards are swapped.
+    */
+   public static String convertCoreAddressFilterToMQTT(String filter)
+   {
+      // the retain prefix starts with the plain prefix, so it has to be tested first
+      String stripped = filter;
+      if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX))
+      {
+         stripped = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length());
+      }
+      else if (filter.startsWith(MQTT_ADDRESS_PREFIX))
+      {
+         stripped = filter.substring(MQTT_ADDRESS_PREFIX.length());
+      }
+      return swapMQTTAndCoreWildCards(stripped);
+   }
+
+   /**
+    * Same as {@link #convertMQTTAddressFilterToCore(String)} but with
+    * {@link #MQTT_RETAIN_ADDRESS_PREFIX} in front.
+    */
+   public static String convertMQTTAddressFilterToCoreRetain(String filter)
+   {
+      return MQTT_RETAIN_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+   }
+
+   /**
+    * Exchanges '/' with '.' and '*' with '+' (in both directions) throughout
+    * the filter; every other character is kept. Applying the method twice
+    * yields the original string.
+    */
+   public static String swapMQTTAndCoreWildCards(String filter)
+   {
+      StringBuilder swapped = new StringBuilder(filter.length());
+      for (int i = 0; i < filter.length(); i++)
+      {
+         swapped.append(swapChar(filter.charAt(i)));
+      }
+      return swapped.toString();
+   }
+
+   /**
+    * Maps a single character to its counterpart in the other notation.
+    */
+   private static char swapChar(char c)
+   {
+      switch (c)
+      {
+         case '/':
+            return '.';
+         case '.':
+            return '/';
+         case '*':
+            return '+';
+         case '+':
+            return '*';
+         default:
+            return c;
+      }
+   }
+
+   /**
+    * Creates an empty server message with a freshly generated ID, the given
+    * address, and the retain flag and QoS level stored as properties.
+    */
+   private static ServerMessage createServerMessage(MQTTSession session, SimpleString address, boolean retain, int qos)
+   {
+      long messageId = session.getServer().getStorageManager().generateID();
+
+      ServerMessageImpl serverMessage = new ServerMessageImpl(messageId, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE);
+      serverMessage.setAddress(address);
+      serverMessage.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
+      serverMessage.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+      return serverMessage;
+   }
+
+   /**
+    * Creates a server message addressed to the core form of the MQTT topic
+    * and writes the readable bytes of the payload into its body.
+    */
+   public static ServerMessage createServerMessageFromByteBuf(MQTTSession session, String topic, boolean retain, int qos, ByteBuf payload)
+   {
+      SimpleString coreAddress = new SimpleString(convertMQTTAddressFilterToCore(topic));
+      ServerMessage serverMessage = createServerMessage(session, coreAddress, retain, qos);
+
+      // FIXME does this involve a copy?
+      serverMessage.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes());
+      return serverMessage;
+   }
+
+   /**
+    * Creates a server message with the string payload as its body. The topic
+    * is used as the address as given; it is not converted here.
+    */
+   public static ServerMessage createServerMessageFromString(MQTTSession session, String payload, String topic, int qos, boolean retain)
+   {
+      ServerMessage serverMessage = createServerMessage(session, new SimpleString(topic), retain, qos);
+      serverMessage.getBodyBuffer().writeString(payload);
+      return serverMessage;
+   }
+
+   /**
+    * Creates a non-retained QoS 1 server message that carries the MQTT
+    * message ID and the PUBREL message type as properties.
+    */
+   public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId)
+   {
+      ServerMessage pubRel = createServerMessage(session, address, false, 1);
+      pubRel.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId);
+      pubRel.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value());
+      return pubRel;
+   }
+
+   /**
+    * Writes a one-entry debug description of an MQTT message: direction,
+    * message type, the message ID when the variable header carries one, and
+    * for SUBSCRIBE each topic with its QoS. Messages without a fixed header
+    * are not logged.
+    */
+   public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound)
+   {
+      if (message.fixedHeader() == null)
+      {
+         return;
+      }
+
+      StringBuilder text = new StringBuilder(inbound ? "Received " : "Sent ");
+      text.append(message.fixedHeader().messageType().toString());
+
+      Object variableHeader = message.variableHeader();
+      if (variableHeader instanceof MqttPublishVariableHeader)
+      {
+         text.append("(")
+            .append(((MqttPublishVariableHeader) variableHeader).messageId())
+            .append(") ")
+            .append(message.fixedHeader().qosLevel());
+      }
+      else if (variableHeader instanceof MqttMessageIdVariableHeader)
+      {
+         text.append("(")
+            .append(((MqttMessageIdVariableHeader) variableHeader).messageId())
+            .append(")");
+      }
+
+      if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE)
+      {
+         for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions())
+         {
+            text.append("\n\t").append(sub.topicName()).append(" : ").append(sub.qualityOfService());
+         }
+      }
+
+      logger.debug(text.toString());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory b/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
new file mode 100644
index 0000000..2c1103f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/resources/META-INF/services/org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory
@@ -0,0 +1 @@
+org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/pom.xml b/artemis-protocols/pom.xml
index d052e43..5ef449b 100644
--- a/artemis-protocols/pom.xml
+++ b/artemis-protocols/pom.xml
@@ -36,6 +36,7 @@
       <module>artemis-openwire-protocol</module>
       <module>artemis-proton-plug</module>
       <module>artemis-hornetq-protocol</module>
+      <module>artemis-mqtt-protocol</module>
    </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 93e0efe..28d48ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,6 +166,23 @@
             <!-- License: CPL 1.0 -->
             <!-- There are newer versions of the JUnit but they break our tests -->
          </dependency>
+
+         <!-- ### For MQTT Tests -->
+         <dependency>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
+            <version>1.10</version>
+            <scope>test</scope>
+            <!-- Apache v2.0 License -->
+         </dependency>
+         <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>mqtt-client</artifactId>
+            <version>0.4.1-SNAPSHOT</version>
+            <scope>test</scope>
+            <!-- Eclipse Public License - v 1.0 -->
+         </dependency>
+
          <!-- ## End Test Dependencies ## -->
 
          <!-- ### Build Time Dependencies ### -->
@@ -302,6 +319,12 @@
             <!-- License: Apache 2.0 -->
          </dependency>
          <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-mqtt</artifactId>
+            <version>5.0.0.Alpha2</version>
+            <!-- License: Apache 2.0 -->
+         </dependency>
+         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>proton-j</artifactId>
             <version>${proton.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 8d580d9..3e90194 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -32,6 +32,17 @@
       <vertx.testtools.version>2.0.3-final</vertx.testtools.version>
    </properties>
 
+   <repositories>
+       <!-- for the paho dependency -->
+       <repository>
+           <id>eclipse.m2</id>
+           <url>https://repo.eclipse.org/content/groups/snapshots/</url>
+           <releases><enabled>false</enabled></releases>
+           <snapshots><enabled>true</enabled></snapshots>
+       </repository>
+   </repositories>
+
+
    <dependencies>
       <dependency>
          <groupId>org.apache.activemq</groupId>
@@ -123,6 +134,32 @@
       </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-hornetq-protocol</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+
+      <!-- MQTT Deps -->
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-mqtt-protocol</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.fusesource.mqtt-client</groupId>
+         <artifactId>mqtt-client</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.eclipse.paho</groupId>
+          <artifactId>mqtt-client</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-codec-mqtt</artifactId>
+      </dependency>
+      <!-- END MQTT Deps -->
+
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-aerogear-integration</artifactId>
          <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java
new file mode 100644
index 0000000..f5dbe30
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/FuseMQTTClientProvider.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+
+/**
+ * {@link MQTTClientProvider} implementation backed by the FuseSource MQTT client's
+ * {@link BlockingConnection}.
+ * <p>
+ * The setters write to a single {@link MQTT} configuration object; {@link #connect(String)} creates the
+ * connection from that object.
+ */
+public class FuseMQTTClientProvider implements MQTTClientProvider
+{
+   private final MQTT mqtt = new MQTT();
+   private BlockingConnection connection;
+
+   /**
+    * Configures the host, requests protocol version "3.1.1", shuts off connect/reconnect retries and opens a
+    * blocking connection.
+    *
+    * @param host the host value handed to the underlying MQTT configuration
+    * @throws Exception if the underlying client fails to connect
+    */
+   @Override
+   public void connect(String host) throws Exception
+   {
+      mqtt.setHost(host);
+      mqtt.setVersion("3.1.1");
+      // shut off connect retry
+      mqtt.setConnectAttemptsMax(0);
+      mqtt.setReconnectAttemptsMax(0);
+      connection = mqtt.blockingConnection();
+
+      connection.connect();
+   }
+
+   /**
+    * Disconnects the current connection; does nothing when no connection has been created yet.
+    *
+    * @throws Exception if the underlying client fails to disconnect
+    */
+   @Override
+   public void disconnect() throws Exception
+   {
+      if (this.connection != null)
+      {
+         this.connection.disconnect();
+      }
+   }
+
+   /**
+    * Publishes a non-retained message; delegates to {@link #publish(String, byte[], int, boolean)}.
+    */
+   @Override
+   public void publish(String topic, byte[] payload, int qos) throws Exception
+   {
+      publish(topic, payload, qos, false);
+   }
+
+   /**
+    * Publishes a message on the current connection.
+    *
+    * @param topic    topic to publish to
+    * @param payload  message body
+    * @param qos      index into {@code QoS.values()}; an out-of-range value fails with an
+    *                 {@link ArrayIndexOutOfBoundsException}
+    * @param retained retained flag passed to the client
+    * @throws Exception if the underlying client fails to publish
+    */
+   @Override
+   public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception
+   {
+      connection.publish(topic, payload, QoS.values()[qos], retained);
+   }
+
+   /**
+    * Subscribes the current connection to a single topic.
+    *
+    * @param topic topic filter to subscribe to
+    * @param qos   index into {@code QoS.values()}
+    * @throws Exception if the underlying client fails to subscribe
+    */
+   @Override
+   public void subscribe(String topic, int qos) throws Exception
+   {
+      Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
+      connection.subscribe(topics);
+   }
+
+   /**
+    * Unsubscribes the current connection from a single topic.
+    */
+   @Override
+   public void unsubscribe(String topic) throws Exception
+   {
+      connection.unsubscribe(new String[]{topic});
+   }
+
+   /**
+    * Waits for the next message and acknowledges it.
+    *
+    * @param timeout maximum wait, in milliseconds
+    * @return the payload of the received message, or {@code null} when no message was returned by the client
+    * @throws Exception if the underlying client fails while receiving
+    */
+   @Override
+   public byte[] receive(int timeout) throws Exception
+   {
+      byte[] result = null;
+      Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+      if (message != null)
+      {
+         result = message.getPayload();
+         message.ack();
+      }
+      return result;
+   }
+
+   @Override
+   public void setSslContext(SSLContext sslContext)
+   {
+      mqtt.setSslContext(sslContext);
+   }
+
+   @Override
+   public void setWillMessage(String string)
+   {
+      mqtt.setWillMessage(string);
+   }
+
+   @Override
+   public void setWillTopic(String topic)
+   {
+      mqtt.setWillTopic(topic);
+   }
+
+   @Override
+   public void setClientId(String clientId)
+   {
+      mqtt.setClientId(clientId);
+   }
+
+   /**
+    * Kills the current connection; does nothing when no connection has been created yet.
+    *
+    * @throws Exception if the underlying client fails while killing the connection
+    */
+   @Override
+   public void kill() throws Exception
+   {
+      // Same guard as disconnect(): a provider that never connected has nothing to kill, and callers
+      // (typically test tear-down) should not hit a NullPointerException for it.
+      if (this.connection != null)
+      {
+         this.connection.kill();
+      }
+   }
+
+   /**
+    * Sets the keep-alive value on the MQTT configuration.
+    *
+    * @param keepAlive keep-alive value; it is narrowed to a {@code short} before being handed to the client
+    */
+   @Override
+   public void setKeepAlive(int keepAlive) throws Exception
+   {
+      mqtt.setKeepAlive((short) keepAlive);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java
new file mode 100644
index 0000000..f26bfb8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTClientProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+/**
+ * Abstraction over an MQTT client library used by the imported MQTT integration tests, so that the same test
+ * can drive different client implementations (for example {@code FuseMQTTClientProvider}).
+ */
+public interface MQTTClientProvider
+{
+   /** Opens a connection to the given host. */
+   void connect(String host) throws Exception;
+
+   /** Closes the connection. */
+   void disconnect() throws Exception;
+
+   /** Publishes {@code payload} to {@code topic} with the given QoS level and retained flag. */
+   void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
+
+   /** Publishes {@code payload} to {@code topic} with the given QoS level. */
+   void publish(String topic, byte[] payload, int qos) throws Exception;
+
+   /** Subscribes to {@code topic} with the given QoS level. */
+   void subscribe(String topic, int qos) throws Exception;
+
+   /** Removes the subscription for {@code topic}. */
+   void unsubscribe(String topic) throws Exception;
+
+   /**
+    * Receives the payload of the next message.
+    * NOTE(review): FuseMQTTClientProvider treats {@code timeout} as milliseconds and returns {@code null} when
+    * no message arrives - confirm other implementations follow the same contract.
+    */
+   byte[] receive(int timeout) throws Exception;
+
+   /** Supplies the SSL context the client should use. */
+   void setSslContext(javax.net.ssl.SSLContext sslContext);
+
+   /** Sets the will message of the client. */
+   void setWillMessage(String string);
+
+   /** Sets the topic of the will message. */
+   void setWillTopic(String topic);
+
+   /** Sets the client identifier. */
+   void setClientId(String clientId);
+
+   /**
+    * Kills the connection.
+    * NOTE(review): presumably an abrupt termination, as opposed to {@link #disconnect()} - confirm per
+    * implementation.
+    */
+   void kill() throws Exception;
+
+   /** Sets the keep-alive value of the client. */
+   void setKeepAlive(int keepAlive) throws Exception;
+
+}


[4/5] activemq-artemis git commit: Added Initial MQTT Protocol Support

Posted by cl...@apache.org.
Added Initial MQTT Protocol Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0f82ca75
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0f82ca75
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0f82ca75

Branch: refs/heads/master
Commit: 0f82ca754bebcd739dfcce37707bc1d0c4b132ef
Parents: 077e9e2
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jul 6 17:01:08 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Jul 9 11:08:04 2015 +0100

----------------------------------------------------------------------
 artemis-distribution/pom.xml                    |    9 +
 artemis-distribution/src/main/assembly/dep.xml  |    2 +
 artemis-protocols/artemis-mqtt-protocol/pom.xml |   56 +
 .../core/protocol/mqtt/MQTTConnection.java      |  241 +++
 .../protocol/mqtt/MQTTConnectionManager.java    |  205 ++
 .../core/protocol/mqtt/MQTTFailureListener.java |   47 +
 .../artemis/core/protocol/mqtt/MQTTLogger.java  |   43 +
 .../core/protocol/mqtt/MQTTMessageInfo.java     |   57 +
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  362 ++++
 .../core/protocol/mqtt/MQTTProtocolManager.java |  140 ++
 .../mqtt/MQTTProtocolManagerFactory.java        |   58 +
 .../core/protocol/mqtt/MQTTPublishManager.java  |  270 +++
 .../protocol/mqtt/MQTTRetainMessageManager.java |   98 +
 .../artemis/core/protocol/mqtt/MQTTSession.java |  173 ++
 .../core/protocol/mqtt/MQTTSessionCallback.java |  111 ++
 .../core/protocol/mqtt/MQTTSessionState.java    |  250 +++
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  183 ++
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  178 ++
 ...mis.spi.core.protocol.ProtocolManagerFactory |    1 +
 artemis-protocols/pom.xml                       |    1 +
 pom.xml                                         |   23 +
 tests/integration-tests/pom.xml                 |   37 +
 .../mqtt/imported/FuseMQTTClientProvider.java   |  131 ++
 .../mqtt/imported/MQTTClientProvider.java       |   48 +
 .../integration/mqtt/imported/MQTTTest.java     | 1747 ++++++++++++++++++
 .../mqtt/imported/MQTTTestSupport.java          |  376 ++++
 .../integration/mqtt/imported/PahoMQTTTest.java |  175 ++
 .../util/ResourceLoadingSslContext.java         |  284 +++
 .../integration/mqtt/imported/util/Wait.java    |   56 +
 29 files changed, 5362 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-distribution/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index ebde324..d97f1a5 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -124,6 +124,11 @@
        </dependency>
       <dependency>
          <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-mqtt-protocol</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-native</artifactId>
          <version>${project.version}</version>
       </dependency>
@@ -176,6 +181,10 @@
          <version>${project.version}</version>
          <classifier>javadoc</classifier>
       </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-codec-mqtt</artifactId>
+      </dependency>
    </dependencies>
 
    <build>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-distribution/src/main/assembly/dep.xml
----------------------------------------------------------------------
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index febc8ec..084d248 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -60,6 +60,7 @@
             <include>org.apache.activemq:artemis-proton-plug</include>
             <include>org.apache.activemq:artemis-hornetq-protocol</include>
             <include>org.apache.activemq:artemis-stomp-protocol</include>
+            <include>org.apache.activemq:artemis-mqtt-protocol</include>
             <include>org.apache.activemq:artemis-ra</include>
             <include>org.apache.activemq:artemis-selector</include>
             <include>org.apache.activemq:artemis-server</include>
@@ -86,6 +87,7 @@
             <include>commons-collections:commons-collections</include>
             <include>org.fusesource.hawtbuf:hawtbuf</include>
             <include>org.jgroups:jgroups</include>
+            <include>io.netty:netty-codec-mqtt</include>
          </includes>
          <!--excludes>
             <exclude>org.apache.activemq:artemis-website</exclude>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml
new file mode 100644
index 0000000..d566d88
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml
@@ -0,0 +1,56 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>artemis-protocols</artifactId>
+      <groupId>org.apache.activemq</groupId>
+      <version>1.0.1-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>artemis-mqtt-protocol</artifactId>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-server</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-codec-mqtt</artifactId>
+      </dependency>
+       <dependency>
+           <groupId>junit</groupId>
+           <artifactId>junit</artifactId>
+       </dependency>
+   </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
new file mode 100644
index 0000000..08dd157
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+
+/**
+ * {@link RemotingConnection} used for MQTT clients.  It wraps the transport {@link Connection}, keeps the
+ * registered failure/close listeners and tracks whether data was received, whether the connection has been
+ * destroyed and whether the MQTT client is marked as connected.
+ */
+public class MQTTConnection implements RemotingConnection
+{
+   private final Connection transportConnection;
+
+   private final long creationTime;
+
+   private final AtomicBoolean dataReceived;
+
+   private boolean destroyed;
+
+   private boolean connected;
+
+   private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>());
+
+   private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>());
+
+   /**
+    * @param transportConnection the underlying transport connection all I/O related calls are delegated to
+    */
+   public MQTTConnection(Connection transportConnection) throws Exception
+   {
+      this.transportConnection = transportConnection;
+      this.creationTime = System.currentTimeMillis();
+      this.dataReceived = new AtomicBoolean();
+      this.destroyed = false;
+   }
+
+   /** @return the ID of the underlying transport connection */
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   /** @return the {@link System#currentTimeMillis()} value captured when this object was constructed */
+   @Override
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   @Override
+   public String getRemoteAddress()
+   {
+      return transportConnection.getRemoteAddress();
+   }
+
+   @Override
+   public void addFailureListener(FailureListener listener)
+   {
+      failureListeners.add(listener);
+   }
+
+   @Override
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return failureListeners.remove(listener);
+   }
+
+   @Override
+   public void addCloseListener(CloseListener listener)
+   {
+      closeListeners.add(listener);
+   }
+
+   @Override
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return closeListeners.remove(listener);
+   }
+
+   /**
+    * Removes every registered close listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<CloseListener> removeCloseListeners()
+   {
+      synchronized (closeListeners)
+      {
+         List<CloseListener> deletedCloseListeners = new ArrayList<CloseListener>(closeListeners);
+         closeListeners.clear();
+         return deletedCloseListeners;
+      }
+   }
+
+   /**
+    * Replaces the registered close listeners with the given ones.
+    */
+   @Override
+   public void setCloseListeners(List<CloseListener> listeners)
+   {
+      // "set" must replace, exactly like setFailureListeners(); appending would leave stale listeners behind.
+      // Clear and add under one lock so other threads never observe the intermediate empty list.
+      synchronized (closeListeners)
+      {
+         closeListeners.clear();
+         closeListeners.addAll(listeners);
+      }
+   }
+
+   /** @return the live (synchronized) list of failure listeners, not a copy */
+   @Override
+   public List<FailureListener> getFailureListeners()
+   {
+      return failureListeners;
+   }
+
+   /**
+    * Removes every registered failure listener.
+    *
+    * @return a copy of the listeners that were registered before the call
+    */
+   @Override
+   public List<FailureListener> removeFailureListeners()
+   {
+      synchronized (failureListeners)
+      {
+         List<FailureListener> deletedFailureListeners = new ArrayList<FailureListener>(failureListeners);
+         failureListeners.clear();
+         return deletedFailureListeners;
+      }
+   }
+
+   /**
+    * Replaces the registered failure listeners with the given ones.
+    */
+   @Override
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+      synchronized (failureListeners)
+      {
+         failureListeners.clear();
+         failureListeners.addAll(listeners);
+      }
+   }
+
+   @Override
+   public ActiveMQBuffer createTransportBuffer(int size)
+   {
+      return transportConnection.createTransportBuffer(size);
+   }
+
+   /**
+    * Notifies every registered failure listener of the failure (always with {@code failedOver == false}).
+    */
+   @Override
+   public void fail(ActiveMQException me)
+   {
+      // Notify from a snapshot taken under the lock.  A listener may deregister itself (or others) from this
+      // connection while it is being called; iterating the live list would then fail with a
+      // ConcurrentModificationException, and callbacks would also run while holding the list lock.
+      final List<FailureListener> listeners;
+      synchronized (failureListeners)
+      {
+         listeners = new ArrayList<FailureListener>(failureListeners);
+      }
+
+      for (FailureListener listener : listeners)
+      {
+         listener.connectionFailed(me, false);
+      }
+   }
+
+   /**
+    * Same as {@link #fail(ActiveMQException)}; the scale down target is currently ignored.
+    */
+   @Override
+   public void fail(ActiveMQException me, String scaleDownTargetNodeID)
+   {
+      //FIXME(mtaylor) How do we check if the node has failed over?
+      fail(me);
+   }
+
+   /**
+    * Marks this connection as destroyed and force-closes the transport connection.
+    */
+   @Override
+   public void destroy()
+   {
+      //TODO(mtaylor) ensure this properly destroys this connection.
+      destroyed = true;
+      disconnect(false);
+   }
+
+   @Override
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   /** @return always {@code false} */
+   @Override
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isDestroyed()
+   {
+      return destroyed;
+   }
+
+   /**
+    * Force-closes the transport connection; {@code criticalError} is not used.
+    */
+   @Override
+   public void disconnect(boolean criticalError)
+   {
+      transportConnection.forceClose();
+   }
+
+   /**
+    * Force-closes the transport connection; neither argument is used.
+    */
+   @Override
+   public void disconnect(String scaleDownNodeID, boolean criticalError)
+   {
+      transportConnection.forceClose();
+   }
+
+   /** Records that data has been received on this connection. */
+   protected void dataReceived()
+   {
+      dataReceived.set(true);
+   }
+
+   /**
+    * @return {@code true} if data was received since the previous call; the flag is reset atomically
+    */
+   @Override
+   public boolean checkDataReceived()
+   {
+      return dataReceived.compareAndSet(true, false);
+   }
+
+   @Override
+   public void flush()
+   {
+      transportConnection.checkFlushBatchBuffer();
+   }
+
+   /** Intentionally empty: this class does nothing with buffers passed to it. */
+   @Override
+   public void bufferReceived(Object connectionID, ActiveMQBuffer buffer)
+   {
+   }
+
+   public void setConnected(boolean connected)
+   {
+      this.connected = connected;
+   }
+
+   public boolean getConnected()
+   {
+      return connected;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
new file mode 100644
index 0000000..e4433d2
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * MQTTConnectionMananager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
+ * events.
+ */
+public class MQTTConnectionManager
+{
+   private MQTTSession session;
+
+   //TODO Read in a list of existing client IDs from stored Sessions.
+   public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<String>();
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   public MQTTConnectionManager(MQTTSession session)
+   {
+      this.session = session;
+      MQTTFailureListener failureListener = new MQTTFailureListener(this);
+      session.getConnection().addFailureListener(failureListener);
+   }
+
+   /**
+    * Handles the connect packet.  See spec for details on each of parameters.
+    */
+   synchronized void connect(String cId, String username, String password, boolean will, String willMessage, String willTopic,
+                boolean willRetain, int willQosLevel, boolean cleanSession) throws Exception
+   {
+      String clientId = validateClientId(cId, cleanSession);
+      if (clientId == null)
+      {
+         session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
+         session.getProtocolHandler().disconnect();
+         return;
+      }
+
+      session.setSessionState(getSessionState(clientId, cleanSession));
+
+      ServerSessionImpl serverSession = createServerSession(username, password);
+      serverSession.start();
+
+      session.setServerSession(serverSession);
+
+      if (will)
+      {
+         ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic, willQosLevel, willRetain);
+         session.getSessionState().setWillMessage(w);
+      }
+
+      session.getConnection().setConnected(true);
+      session.start();
+      session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED);
+   }
+
+   /**
+    * Creates an internal Server Session.
+    * @param username
+    * @param password
+    * @return
+    * @throws Exception
+    */
+   ServerSessionImpl createServerSession(String username, String password) throws Exception
+   {
+      String id = UUIDGenerator.getInstance().generateStringUUID();
+      ActiveMQServer server =  session.getServer();
+
+      ServerSession serverSession = server.createSession(id,
+            username,
+            password,
+            ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+            session.getConnection(),
+            MQTTUtil.SESSION_AUTO_COMMIT_SENDS,
+            MQTTUtil.SESSION_AUTO_COMMIT_ACKS,
+            MQTTUtil.SESSION_PREACKNOWLEDGE,
+            MQTTUtil.SESSION_XA,
+            null,
+            session.getSessionCallback(),
+            null, // Session factory
+            MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
+      return (ServerSessionImpl) serverSession;
+   }
+
+   void disconnect()
+   {
+      try
+      {
+         if (session != null && session.getSessionState() != null)
+         {
+            String clientId = session.getSessionState().getClientId();
+            if (clientId != null) CONNECTED_CLIENTS.remove(clientId);
+
+            if (session.getState().isWill())
+            {
+               session.getConnectionManager().sendWill();
+            }
+         }
+         session.stop();
+         session.getConnection().disconnect(false);
+         session.getConnection().destroy();
+      }
+      catch (Exception e)
+      {
+         /* FIXME Failure during disconnect would leave the session state in an unrecoverable state.  We should handle
+         errors more gracefully.
+          */
+         log.error("Error disconnecting client: " + e.getMessage());
+      }
+   }
+
+
+   private void sendWill() throws Exception
+   {
+      session.getServerSession().send(session.getSessionState().getWillMessage(), true);
+      session.getSessionState().deleteWillMessage();
+   }
+
+   private MQTTSessionState getSessionState(String clientId, boolean cleanSession) throws InterruptedException
+   {
+      synchronized (MQTTSession.SESSIONS)
+      {
+         /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
+          * start a new one  This Session lasts as long as the Network Connection. State data associated with this Session
+          * MUST NOT be reused in any subsequent Session */
+         if (cleanSession)
+         {
+            MQTTSession.SESSIONS.remove(clientId);
+            return new MQTTSessionState(clientId);
+         }
+         else
+         {
+            /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
+            a new one. */
+            MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
+            if (state != null)
+            {
+               // TODO Add a count down latch for handling wait during attached session state.
+               while (state.getAttached())
+               {
+                  Thread.sleep(1000);
+               }
+               return  state;
+            }
+            else
+            {
+               state = new MQTTSessionState(clientId);
+               MQTTSession.SESSIONS.put(clientId, state);
+               return state;
+            }
+         }
+      }
+   }
+
+   private String validateClientId(String clientId, boolean cleanSession)
+   {
+      if (clientId == null || clientId.isEmpty())
+      {
+         // [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it.
+         if (cleanSession)
+         {
+            clientId = UUID.randomUUID().toString();
+         }
+         else
+         {
+            // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
+            return null;
+         }
+      }
+      // If the client ID is not unique (i.e. it has already registered) then do not accept it.
+      else if (!CONNECTED_CLIENTS.add(clientId))
+      {
+         // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID
+         return null;
+      }
+      return clientId;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
new file mode 100644
index 0000000..b33bc5e
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+
+/**
+ * Failure listener that forwards every connection failure to the owning {@link MQTTConnectionManager}, which
+ * then disconnects the client and cleans up its connection data.
+ */
+public class MQTTFailureListener implements FailureListener
+{
+   private final MQTTConnectionManager manager;
+
+   /**
+    * @param connectionManager the manager to notify when the connection fails
+    */
+   public MQTTFailureListener(MQTTConnectionManager connectionManager)
+   {
+      manager = connectionManager;
+   }
+
+   /**
+    * Disconnects through the connection manager; neither argument is inspected.
+    */
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver)
+   {
+      manager.disconnect();
+   }
+
+   /**
+    * Same handling as the two-argument variant; the scale down target is not used.
+    */
+   @Override
+   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID)
+   {
+      connectionFailed(exception, failedOver);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
new file mode 100644
index 0000000..ab3b221
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * Logger Code 83
+ *
+ * Each message id must be 6 digits long starting with 10; the 3rd digit denotes the level, so
+ *
+ * INFO  1
+ * WARN  2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 101000 to 101999
+ *
+ * NOTE(review): the header names logger code 83 while the id ranges above start with 10 -- confirm which prefix
+ * applies before adding message ids.  No message methods are declared on this interface yet.
+ */
+
+@MessageLogger(projectCode = "AMQ")
+public interface MQTTLogger extends BasicLogger
+{
+   /** Shared logger instance, obtained from {@code Logger.getMessageLogger} using this interface's package name. */
+   MQTTLogger LOGGER = Logger.getMessageLogger(MQTTLogger.class, MQTTLogger.class.getPackage().getName());
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
new file mode 100644
index 0000000..e20119d
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+/**
+ * MQTT Acks only hold message ID information.  From this we must infer the internal message ID and consumer.
+ * <p>
+ * Immutable value holder: all three fields are assigned once in the constructor and only exposed through getters.
+ */
+class MQTTMessageInfo
+{
+   private final long serverMessageId;
+
+   private final long consumerId;
+
+   private final String address;
+
+   /**
+    * @param serverMessageId the server-side message id
+    * @param consumerId      the id of the consumer associated with the message
+    * @param address         the address associated with the message
+    */
+   MQTTMessageInfo(long serverMessageId, long consumerId, String address)
+   {
+      this.serverMessageId = serverMessageId;
+      this.consumerId = consumerId;
+      this.address = address;
+   }
+
+   /** @return the server message id given at construction */
+   long getServerMessageId()
+   {
+      return serverMessageId;
+   }
+
+   /** @return the consumer id given at construction */
+   long getConsumerId()
+   {
+      return consumerId;
+   }
+
+   /** @return the address given at construction */
+   String getAddress()
+   {
+      return address;
+   }
+
+   /** @return a single-line, human-readable rendering of all three fields */
+   @Override
+   public String toString()
+   {
+      return "ServerMessageId: " + serverMessageId + " ConsumerId: " + consumerId + " addr: " + address;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
new file mode 100644
index 0000000..37610f8
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.mqtt.MqttConnAckMessage;
+import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
+import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessage;
+import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttPubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+
+/**
+ * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
+ * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes.
+ */
+public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
+{
+   /**
+    * Milliseconds of connection TTL granted per second of client keep-alive.  The MQTT specification gives the
+    * client one and a half times its keep-alive period before the server must consider the connection dead.
+    */
+   private static final long TTL_MILLIS_PER_KEEP_ALIVE_SECOND = 1500L;
+
+   private ConnectionEntry connectionEntry;
+
+   private MQTTConnection connection;
+
+   private MQTTSession session;
+
+   private final ActiveMQServer server;
+
+   // This Channel Handler is not sharable, therefore it can only ever be associated with a single ctx.
+   private ChannelHandlerContext ctx;
+
+   private final MQTTLogger log = MQTTLogger.LOGGER;
+
+   private boolean stopped = false;
+
+   public MQTTProtocolHandler(ActiveMQServer server)
+   {
+      this.server = server;
+   }
+
+   /**
+    * Binds this handler to its connection and creates the MQTT session used by every handler method.
+    *
+    * @param connection the MQTT connection served by this handler
+    * @param entry      the connection entry whose ttl is adjusted when the client connects
+    */
+   void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception
+   {
+      this.connectionEntry = entry;
+      this.connection = connection;
+      this.session = new MQTTSession(this, connection);
+   }
+
+
+   /**
+    * Marks the handler as stopped; any message read afterwards triggers a disconnect instead of being processed.
+    *
+    * @param error currently unused
+    */
+   void stop(boolean error)
+   {
+      stopped = true;
+   }
+
+   /**
+    * Entry point for decoded MQTT control packets.  Dispatches on the packet type; undecodable packets, unknown
+    * types and any exception raised while handling a packet result in the client being disconnected.
+    */
+   @Override
+   public void channelRead(ChannelHandlerContext ctx, Object msg)
+   {
+      try
+      {
+         if (stopped)
+         {
+            disconnect();
+            return;
+         }
+
+         MqttMessage message = (MqttMessage) msg;
+
+         // Disconnect if Netty codec failed to decode the stream.
+         if (message.decoderResult().isFailure())
+         {
+            log.debug("Bad Message Disconnecting Client.");
+            disconnect();
+            return;
+         }
+
+         connection.dataReceived();
+
+         MQTTUtil.logMessage(log, message, true);
+
+         switch (message.fixedHeader().messageType())
+         {
+            case CONNECT:
+               handleConnect((MqttConnectMessage) message, ctx);
+               break;
+            case CONNACK:
+               handleConnack((MqttConnAckMessage) message);
+               break;
+            case PUBLISH:
+               handlePublish((MqttPublishMessage) message);
+               break;
+            case PUBACK:
+               handlePuback((MqttPubAckMessage) message);
+               break;
+            case PUBREC:
+               handlePubrec(message);
+               break;
+            case PUBREL:
+               handlePubrel(message);
+               break;
+            case PUBCOMP:
+               handlePubcomp(message);
+               break;
+            case SUBSCRIBE:
+               handleSubscribe((MqttSubscribeMessage) message, ctx);
+               break;
+            case SUBACK:
+               handleSuback((MqttSubAckMessage) message);
+               break;
+            case UNSUBSCRIBE:
+               handleUnsubscribe((MqttUnsubscribeMessage) message);
+               break;
+            case UNSUBACK:
+               handleUnsuback((MqttUnsubAckMessage) message);
+               break;
+            case PINGREQ:
+               handlePingreq(message, ctx);
+               break;
+            case PINGRESP:
+               handlePingresp(message);
+               break;
+            case DISCONNECT:
+               handleDisconnect(message);
+               break;
+            default:
+               disconnect();
+         }
+      }
+      catch (Exception e)
+      {
+         // Pass the throwable itself so the stack trace is kept, rather than appending only its message.
+         log.debug("Error processing Control Packet, Disconnecting Client", e);
+         disconnect();
+      }
+   }
+
+   /**
+    * Called during connection.  Remembers the channel context for later outbound packets, derives the connection
+    * ttl from the client's keep-alive and hands the CONNECT details to the connection manager.
+    *
+    * @param connect the CONNECT packet
+    * @param ctx     the channel context used for all subsequent writes
+    */
+   void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws Exception
+   {
+      this.ctx = ctx;
+      // Keep-alive is in seconds, ttl in milliseconds; allow 1.5x the keep-alive before timing the client out.
+      // NOTE(review): a keep-alive of 0 yields a ttl of 0 -- confirm how ConnectionEntry treats that value.
+      connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * TTL_MILLIS_PER_KEEP_ALIVE_SECOND;
+
+      String clientId = connect.payload().clientIdentifier();
+      session.getConnectionManager().connect(clientId,
+            connect.payload().userName(),
+            connect.payload().password(),
+            connect.variableHeader().isWillFlag(),
+            connect.payload().willMessage(),
+            connect.payload().willTopic(),
+            connect.variableHeader().isWillRetain(),
+            connect.variableHeader().willQos(),
+            connect.variableHeader().isCleanSession());
+   }
+
+   /** Delegates to the session's connection manager to disconnect the client. */
+   void disconnect()
+   {
+      session.getConnectionManager().disconnect();
+   }
+
+   /**
+    * Writes a CONNACK carrying the given return code to the client.
+    *
+    * @param returnCode the result of the connection attempt
+    */
+   void sendConnack(MqttConnectReturnCode returnCode)
+   {
+      MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,
+                                                        false,
+                                                        MqttQoS.AT_MOST_ONCE,
+                                                        false,
+                                                        0);
+      MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode);
+      MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
+
+      ctx.writeAndFlush(message);
+   }
+
+   /**
+    * The server does not instantiate connections therefore any CONNACK received over a connection is an invalid
+    * control message.
+    * @param message
+    */
+   void handleConnack(MqttConnAckMessage message)
+   {
+      log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
+      log.debug("Disconnecting client: " + session.getSessionState().getClientId());
+      disconnect();
+   }
+
+   /** Forwards an inbound PUBLISH (id, topic, QoS, payload, retain flag) to the publish manager. */
+   void handlePublish(MqttPublishMessage message) throws Exception
+   {
+      session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(),
+            message.variableHeader().topicName(),
+            message.fixedHeader().qosLevel().value(),
+            message.payload(),
+            message.fixedHeader().isRetain());
+   }
+
+   void sendPubAck(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBACK);
+   }
+
+   void sendPubRel(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREL);
+   }
+
+   void sendPubRec(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBREC);
+   }
+
+   void sendPubComp(int messageId)
+   {
+      sendPublishProtocolControlMessage(messageId, MqttMessageType.PUBCOMP);
+   }
+
+   /**
+    * Writes one of the publish-flow acknowledgement packets (PUBACK, PUBREC, PUBREL, PUBCOMP) for a message id.
+    *
+    * @param messageId   the MQTT message id being acknowledged
+    * @param messageType the control packet type to send
+    */
+   void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType)
+   {
+      MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE;
+      MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType,
+                                                        false,
+                                                        qos, // Spec requires 01 in header for rel
+                                                        false,
+                                                        0);
+      MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
+      ctx.writeAndFlush(rel);
+   }
+
+   void handlePuback(MqttPubAckMessage message) throws Exception
+   {
+      session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
+   }
+
+   void handlePubrec(MqttMessage message) throws Exception
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubRec(messageId);
+   }
+
+   void handlePubrel(MqttMessage message)
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubRel(messageId);
+   }
+
+   void handlePubcomp(MqttMessage message) throws Exception
+   {
+      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
+      session.getMqttPublishManager().handlePubComp(messageId);
+   }
+
+   /**
+    * Registers the requested subscriptions and answers with a SUBACK that echoes the request's variable header and
+    * carries the QoS values returned by the subscription manager.
+    */
+   void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception
+   {
+      MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
+      int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
+
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK,
+                                                   false,
+                                                   MqttQoS.AT_MOST_ONCE,
+                                                   false,
+                                                   0);
+      MqttSubAckMessage ack = new MqttSubAckMessage(header,
+                                                    message.variableHeader(),
+                                                    new MqttSubAckPayload(qos));
+      ctx.writeAndFlush(ack);
+   }
+
+   /** A SUBACK is not expected from a client; receiving one disconnects it. */
+   void handleSuback(MqttSubAckMessage message)
+   {
+      disconnect();
+   }
+
+   /** Removes the listed subscriptions and answers with an UNSUBACK echoing the request's variable header. */
+   void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception
+   {
+      session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK,
+                                                   false,
+                                                   MqttQoS.AT_MOST_ONCE,
+                                                   false,
+                                                   0);
+      MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
+      ctx.writeAndFlush(m);
+   }
+
+   /** An UNSUBACK is not expected from a client; receiving one disconnects it. */
+   void handleUnsuback(MqttUnsubAckMessage message)
+   {
+      disconnect();
+   }
+
+   /** Answers a PINGREQ with a PINGRESP. */
+   void handlePingreq(MqttMessage message, ChannelHandlerContext ctx)
+   {
+      ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP,
+                                                            false,
+                                                            MqttQoS.AT_MOST_ONCE,
+                                                            false,
+                                                            0)));
+   }
+
+   /** A PINGRESP is not expected from a client; receiving one disconnects it. */
+   void handlePingresp(MqttMessage message)
+   {
+      disconnect();
+   }
+
+   /**
+    * Handles a client-initiated DISCONNECT: deletes the will message when session state is available, then
+    * disconnects.
+    */
+   void handleDisconnect(MqttMessage message)
+   {
+      // Null-check and dereference through the same accessor.
+      if (session.getSessionState() != null)
+      {
+         session.getSessionState().deleteWillMessage();
+      }
+      disconnect();
+   }
+
+
+   /**
+    * Writes a PUBLISH packet to the client.
+    *
+    * @param messageId     the MQTT message id to put in the variable header
+    * @param topicName     the topic the message is published on
+    * @param qosLevel      numeric QoS level of the delivery
+    * @param payload       the message body
+    * @param deliveryCount number of earlier delivery attempts; any value above zero sets the DUP flag unless QoS is 0
+    * @return always 1
+    */
+   protected int send(int messageId, String topicName, int qosLevel, ByteBuf payload, int deliveryCount)
+   {
+      // The duplicate flag is never set on QoS 0 deliveries.
+      boolean redelivery = qosLevel != 0 && deliveryCount > 0;
+      MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH,
+                                                   redelivery,
+                                                   MqttQoS.valueOf(qosLevel),
+                                                   false,
+                                                   0);
+      MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
+      MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+
+      ctx.writeAndFlush(publish);
+
+      return 1;
+   }
+
+   ActiveMQServer getServer()
+   {
+      return server;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
new file mode 100644
index 0000000..b92a09f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.mqtt.MqttDecoder;
+import io.netty.handler.codec.mqtt.MqttEncoder;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+
+import java.util.List;
+
+/**
+ * MQTTProtocolManager
+ */
+class MQTTProtocolManager implements ProtocolManager, NotificationListener
+{
+   private ActiveMQServer server;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   public MQTTProtocolManager(ActiveMQServer server)
+   {
+      this.server = server;
+   }
+
+   @Override
+   public void onNotification(Notification notification)
+   {
+      // TODO handle notifications
+   }
+
+
+   @Override
+   public ProtocolManagerFactory getFactory()
+   {
+      return new MQTTProtocolManagerFactory();
+   }
+
+   @Override
+   public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors)
+   {
+      // TODO handle interceptors
+   }
+
+   @Override
+   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection)
+   {
+      try
+      {
+         MQTTConnection  mqttConnection = new MQTTConnection(connection);
+         ConnectionEntry entry = new ConnectionEntry(mqttConnection,
+                                                     null,
+                                                     System.currentTimeMillis(),
+                                                     MQTTUtil.DEFAULT_KEEP_ALIVE_FREQUENCY);
+
+         NettyServerConnection nettyConnection = ((NettyServerConnection) connection);
+         MQTTProtocolHandler protocolHandler = nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class);
+         protocolHandler.setConnection(mqttConnection, entry);
+         return entry;
+      }
+      catch (Exception e)
+      {
+         log.error(e);
+         return null;
+      }
+   }
+
+   @Override
+   public void removeHandler(String name)
+   {
+      // TODO add support for handlers
+   }
+
+   @Override
+   public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer)
+   {
+      connection.bufferReceived(connection.getID(), buffer);
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
+   {
+      pipeline.addLast(new MqttEncoder());
+      pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
+
+      pipeline.addLast(new MQTTProtocolHandler(server));
+   }
+
+   @Override
+   public boolean isProtocol(byte[] array)
+   {
+      boolean mqtt311 = array[4] == 77 && // M
+                        array[5] == 81 && // Q
+                        array[6] == 84 && // T
+                        array[7] == 84;   // T
+
+      // FIXME The actual protocol name is 'MQIsdp' (However we are only passed the first 4 bytes of the protocol name)
+      boolean mqtt31  = array[4] == 77  && // M
+                        array[5] == 81  && // Q
+                        array[6] == 73  && // I
+                        array[7] == 115;   // s
+      return mqtt311 || mqtt31;
+   }
+
+   @Override
+   public MessageConverter getConverter()
+   {
+      return null;
+   }
+
+   @Override
+   public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer)
+   {
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
new file mode 100644
index 0000000..7194d02
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import java.util.List;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * Factory that creates {@link MQTTProtocolManager} instances and advertises the protocol and module names under
+ * which MQTT support is registered.
+ */
+public class MQTTProtocolManagerFactory implements ProtocolManagerFactory
+{
+   public static final String MQTT_PROTOCOL_NAME = "MQTT";
+
+   private static final String MODULE_NAME = "artemis-mqtt-protocol";
+
+   private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
+
+   /**
+    * Creates a protocol manager for the given server.  The interceptor lists are currently ignored.
+    */
+   @Override
+   public ProtocolManager createProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors)
+   {
+      return new MQTTProtocolManager(server);
+   }
+
+   /**
+    * Interceptors are not supported yet, so nothing from the supplied list is retained.
+    *
+    * @return an empty list rather than {@code null}, so callers can use the result without a null check
+    */
+   @Override
+   public List filterInterceptors(List list)
+   {
+      // TODO Add support for interceptors.
+      return java.util.Collections.emptyList();
+   }
+
+   @Override
+   public String[] getProtocols()
+   {
+      return SUPPORTED_PROTOCOLS;
+   }
+
+   @Override
+   public String getModuleName()
+   {
+      return MODULE_NAME;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
new file mode 100644
index 0000000..aa3f9e0
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.EmptyByteBuf;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.journal.IOAsyncTask;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+
+/**
+ * Handles MQTT Exactly Once (QoS level 2) Protocol.
+ */
+/**
+ * Handles MQTT PUBLISH traffic for a single {@link MQTTSession}: publishes received from the client, delivery of
+ * server messages to the client, and the acknowledgement exchanges for QoS 1 (PubAck) and QoS 2
+ * (PubRec / PubRel / PubComp).
+ */
+public class MQTTPublishManager
+{
+   private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2.";
+
+   // Address (and queue name) used to store PubRel messages for this client.
+   private SimpleString managementAddress;
+
+   // Consumer on the management queue; messages delivered through it are re-sent as PubRel.
+   private ServerConsumer managementConsumer;
+
+   private MQTTSession session;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   // Guards handling of inbound publishes.
+   private final Object lock = new Object();
+
+   public MQTTPublishManager(MQTTSession session)
+   {
+      this.session = session;
+   }
+
+   /**
+    * Creates the per-client management address, its queue (if missing) and a started consumer on it.
+    */
+   synchronized void start() throws Exception
+   {
+      createManagementAddress();
+      createManagementQueue();
+      createManagementConsumer();
+   }
+
+   /**
+    * Detaches and closes the management consumer, if one was created.
+    *
+    * @param clean when {@code true} the management queue is destroyed as well
+    */
+   synchronized void stop(boolean clean) throws Exception
+   {
+      if (managementConsumer != null)
+      {
+         managementConsumer.removeItself();
+         managementConsumer.setStarted(false);
+         managementConsumer.close(false);
+         if (clean)
+         {
+            session.getServer().destroyQueue(managementAddress);
+         }
+      }
+   }
+
+   private void createManagementConsumer() throws Exception
+   {
+      long consumerId = session.getServer().getStorageManager().generateID();
+      managementConsumer = session.getServerSession().createConsumer(consumerId, managementAddress, null, false, false, -1);
+      managementConsumer.setStarted(true);
+   }
+
+   private void createManagementAddress()
+   {
+      String clientId = session.getSessionState().getClientId();
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
+   }
+
+   private void createManagementQueue() throws Exception
+   {
+      if (session.getServer().locateQueue(managementAddress) == null)
+      {
+         session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
+      }
+   }
+
+   /**
+    * @return {@code true} if the given consumer is this manager's management (PubRel) consumer
+    */
+   boolean isManagementConsumer(ServerConsumer consumer)
+   {
+      return consumer == managementConsumer;
+   }
+
+   /**
+    * Generates the MQTT packet id for an outbound message.
+    *
+    * @param qos the QoS of the outbound message; currently every QoS level uses the same id source
+    * @return a packet id obtained from the session state
+    */
+   private int generateMqttId(int qos)
+   {
+      // generateId() returns a primitive int, so the previous null check and storage-manager fallback could never run.
+      return session.getSessionState().generateId();
+   }
+
+   /** Since MQTT Subscriptions can overlap, a client may receive the same message twice.  When this happens the client
+    * returns a PubRec or PubAck with ID.  But we need to know which consumer to ack, since we only have the ID to go on we
+    * are not able to decide which consumer to ack.  Instead we send MQTT messages with different IDs and store a reference
+    * to original ID and consumer in the Session state.  This way we can look up the consumer Id and the message Id from
+    * the PubAck or PubRec message id. **/
+   protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception
+   {
+      // This is to allow retries of PubRel.
+      if (isManagementConsumer(consumer))
+      {
+         sendPubRelMessage(message);
+      }
+      else
+      {
+         int qos = decideQoS(message, consumer);
+         if (qos == 0)
+         {
+            // QoS 0 is never acknowledged by the client, so the message is acknowledged as soon as it is sent.
+            sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
+            session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
+         }
+         else
+         {
+            String consumerAddress = consumer.getQueue().getAddress().toString();
+            Integer mqttid = generateMqttId(qos);
+
+            // Remember which address/server message this packet id stands for so the later ack can be resolved.
+            session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
+            sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+         }
+      }
+   }
+
+   // INBOUND
+   /**
+    * Handles a PUBLISH received from the client: routes it through the server session, updates the retained message
+    * for the topic when {@code retain} is set, and schedules the acknowledgement matching {@code qos}.
+    *
+    * @param messageId the MQTT packet id of the publish
+    * @param topic     the MQTT topic the message was published to
+    * @param qos       the QoS level of the publish
+    * @param payload   the message payload
+    * @param retain    whether the publish carries the retain flag
+    */
+   void handleMessage(int messageId, String topic, int qos, ByteBuf payload, boolean retain) throws Exception
+   {
+      synchronized (lock)
+      {
+         ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload);
+
+         if (qos > 0)
+         {
+            serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
+         }
+
+         // A QoS 2 publish whose packet id is already recorded is not routed a second time.
+         if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId))
+         {
+            if (qos == 2)
+            {
+               session.getSessionState().getPubRec().add(messageId);
+            }
+            session.getServerSession().send(serverMessage, true);
+         }
+
+
+         if (retain)
+         {
+            // An empty payload is passed on as a "reset" request to the retain manager.
+            boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
+            session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset);
+         }
+
+         createMessageAck(messageId, qos);
+      }
+   }
+
+   /**
+    * Sends a PubRel to the client for a message delivered from the management queue, recording the message
+    * reference under its MQTT packet id first.  Messages that are not of type PUBREL are ignored.
+    */
+   void sendPubRelMessage(ServerMessage message)
+   {
+      if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value())
+      {
+         int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
+         MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString());
+         session.getSessionState().storeMessageRef(messageId, messageInfo, false);
+         session.getProtocolHandler().sendPubRel(messageId);
+      }
+   }
+
+   /**
+    * Registers a task with the storage manager that, once pending operations complete, sends a PubAck (QoS 1)
+    * or a PubRec (QoS 2) for the given packet id.  Nothing is sent for QoS 0.
+    */
+   private void createMessageAck(final int messageId, final int qos)
+   {
+      session.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
+      {
+         @Override
+         public void done()
+         {
+            if (qos == 1)
+            {
+               session.getProtocolHandler().sendPubAck(messageId);
+            }
+            else if (qos == 2)
+            {
+               session.getProtocolHandler().sendPubRec(messageId);
+            }
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage)
+         {
+            // Include the failure details; without them the log entry cannot be diagnosed.
+            log.error("Pub Sync Failed: messageId=" + messageId + ", errorCode=" + errorCode + ", errorMessage=" + errorMessage);
+         }
+      });
+   }
+
+   /**
+    * Handles a PubRec from the client: stores a PubRel message on the management address, acknowledges the
+    * original message and sends the PubRel.  Unknown packet ids are ignored.
+    */
+   void handlePubRec(int messageId) throws Exception
+   {
+      MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId);
+      if (messageRef != null)
+      {
+         ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+         session.getServerSession().send(pubRel, true);
+         session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
+         session.getProtocolHandler().sendPubRel(messageId);
+      }
+   }
+
+   /**
+    * Handles a PubComp from the client by acknowledging the stored PubRel message on the management consumer.
+    */
+   void handlePubComp(int messageId) throws Exception
+   {
+      MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId);
+
+      // Check to see if this message is stored if not just drop the packet.
+      if (messageInfo != null)
+      {
+         session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId());
+      }
+   }
+
+   /**
+    * Handles a PubRel from the client: forgets the packet id, replies with PubComp and drops the message reference.
+    */
+   void handlePubRel(int messageId)
+   {
+      // We don't check to see if a PubRel existed for this message.  We assume it did and so send PubComp.
+      session.getSessionState().getPubRec().remove(messageId);
+      session.getProtocolHandler().sendPubComp(messageId);
+      session.getSessionState().removeMessageRef(messageId);
+   }
+
+
+   /**
+    * Handles a PubAck from the client by acknowledging the referenced server message on the consumer that
+    * delivered it.  Unknown packet ids are ignored.
+    */
+   void handlePubAck(int messageId) throws Exception
+   {
+      Pair<String, Long> pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1);
+      if (pub1MessageInfo != null)
+      {
+         String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA());
+         ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
+         if (consumer == null)
+         {
+            // No consumer is registered for the address any more; there is nothing left to acknowledge.
+            log.warn("Received PubAck for message id " + messageId + " but no consumer exists for address " + mqttAddress);
+            return;
+         }
+         session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB());
+      }
+   }
+
+   /**
+    * Converts the message's core address to its MQTT form and hands a copy of the body to the protocol handler.
+    */
+   private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos)
+   {
+      String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString());
+
+      //FIXME should we be copying the body buffer here?
+      ByteBuf payload = message.getBodyBufferCopy().byteBuf();
+      session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount);
+   }
+
+   /**
+    * @return the QoS to deliver with: the lower of the message QoS and the subscription QoS
+    */
+   private int decideQoS(ServerMessage message, ServerConsumer consumer)
+   {
+      int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+      int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
+
+      /* Subscription QoS is the maximum QoS the client is willing to receive for this subscription.  If the message QoS
+      is less than the subscription QoS then use it, otherwise use the subscription qos). */
+      return Math.min(subscriptionQoS, qos);
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
new file mode 100644
index 0000000..c1fd17f
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import java.util.Iterator;
+
+
+public class MQTTRetainMessageManager
+{
+   private MQTTSession session;
+
+   public MQTTRetainMessageManager(MQTTSession session)
+   {
+      this.session = session;
+   }
+
+   /** FIXME
+   *  Retained messages should be handled in the core API.  There is currently no support for retained messages
+   *  at the time of writing.  Instead we handle retained messages here.  This method will create a new queue for
+   *  every address that is used to store retained messages.  THere should only ever be one message in the retained
+   *  message queue.  When a new subscription is created the queue should be browsed and the message copied onto
+   *  the subscription queue for the consumer.  When a new retained message is received the message will be sent to
+   *  the retained queue and the previous retain message consumed to remove it from the queue. */
+   void handleRetainedMessage(ServerMessage message, String address, boolean reset) throws Exception
+   {
+      SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
+
+      if (!session.getServerSession().executeQueueQuery(retainAddress).isExists())
+      {
+         session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
+      }
+      Queue queue = session.getServer().locateQueue(retainAddress);
+
+      // Set the address of this message to the retained queue.
+      message.setAddress(retainAddress);
+
+      Iterator<MessageReference> iterator = queue.iterator();
+      synchronized (iterator)
+      {
+         if (iterator.hasNext())
+         {
+            Long messageId = iterator.next().getMessage().getMessageID();
+            queue.deleteReference(messageId);
+         }
+
+         if (!reset)
+         {
+            session.getServerSession().send(message.copy(), true);
+         }
+      }
+   }
+
+   void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception
+   {
+      // Queue to add the retained messages to
+      Queue queue = session.getServer().locateQueue(queueName);
+
+      // The address filter that matches all retained message queues.
+      String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);
+      BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
+
+      // Iterate over all matching retain queues and add the head message to the original queue.
+      for (SimpleString retainedQueueName : bindingQueryResult.getQueueNames())
+      {
+         Queue retainedQueue = session.getServer().locateQueue(retainedQueueName);
+         synchronized (this)
+         {
+            Iterator<MessageReference> i = retainedQueue.iterator();
+            if (i.hasNext())
+            {
+               ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID());
+               queue.addTail(message.createReference(queue), true);
+            }
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
new file mode 100644
index 0000000..e3516f1
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Ties together the collaborators of one MQTT client connection: the protocol handler, the connection, the
+ * publish / subscription / retain / connection managers, the core server session and the session state.
+ */
+public class MQTTSession
+{
+   // Session states shared across sessions, keyed by String.
+   static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
+
+   // Random identifier, used only for logging in this class.
+   private final String id = UUID.randomUUID().toString();
+
+   private MQTTProtocolHandler protocolHandler;
+
+   private MQTTSubscriptionManager subscriptionManager;
+
+   private MQTTSessionCallback sessionCallback;
+
+   private ServerSessionImpl serverSession;
+
+   private MQTTPublishManager mqttPublishManager;
+
+   private MQTTConnectionManager mqttConnectionManager;
+
+   private MQTTRetainMessageManager retainMessageManager;
+
+   private MQTTConnection connection;
+
+   protected MQTTSessionState state;
+
+   private boolean stopped = false;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   /**
+    * Creates the session and its managers; the server session and the session state are supplied later through
+    * {@link #setServerSession(ServerSessionImpl)} and {@link #setSessionState(MQTTSessionState)}.
+    */
+   public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception
+   {
+      this.protocolHandler = protocolHandler;
+      this.connection = connection;
+
+      mqttConnectionManager = new MQTTConnectionManager(this);
+      mqttPublishManager = new MQTTPublishManager(this);
+      sessionCallback = new MQTTSessionCallback(this);
+      subscriptionManager = new MQTTSubscriptionManager(this);
+      retainMessageManager = new MQTTRetainMessageManager(this);
+
+      log.debug("SESSION CREATED: " + id);
+   }
+
+   // Called after the client has Connected.
+   /**
+    * Starts the publish manager and then the subscription manager, and marks the session as running.
+    */
+   synchronized void start() throws Exception
+   {
+      mqttPublishManager.start();
+      subscriptionManager.start();
+      stopped = false;
+   }
+
+   // TODO ensure resources are cleaned up for GC.
+   /**
+    * Stops the protocol handler and the managers, closes the server session and detaches the session state.
+    * Calling this on an already stopped session does nothing.
+    */
+   synchronized void stop() throws Exception
+   {
+      if (stopped)
+      {
+         return;
+      }
+
+      protocolHandler.stop(false);
+      // TODO this should pass in clean session.
+      subscriptionManager.stop(false);
+      mqttPublishManager.stop(false);
+
+      if (serverSession != null)
+      {
+         serverSession.stop();
+         serverSession.close(false);
+      }
+
+      if (state != null)
+      {
+         state.setAttached(false);
+      }
+
+      // Only reached when every step above completed without throwing.
+      stopped = true;
+   }
+
+   /** @return whether {@link #stop()} has completed for this session */
+   boolean getStopped()
+   {
+      return stopped;
+   }
+
+   /** @return the manager handling publishes for this session */
+   MQTTPublishManager getMqttPublishManager()
+   {
+      return mqttPublishManager;
+   }
+
+   /** @return the session state; same value as {@link #getSessionState()} */
+   MQTTSessionState getState()
+   {
+      return state;
+   }
+
+   /** @return the connection manager of this session */
+   MQTTConnectionManager getConnectionManager()
+   {
+      return mqttConnectionManager;
+   }
+
+   /** @return the session state; same value as {@link #getState()} */
+   MQTTSessionState getSessionState()
+   {
+      return state;
+   }
+
+   /** @return the core server session, or {@code null} if none has been set */
+   ServerSessionImpl getServerSession()
+   {
+      return serverSession;
+   }
+
+   /** @return the server, as reported by the protocol handler */
+   ActiveMQServer getServer()
+   {
+      return protocolHandler.getServer();
+   }
+
+   /** @return the subscription manager of this session */
+   MQTTSubscriptionManager getSubscriptionManager()
+   {
+      return subscriptionManager;
+   }
+
+   /** @return the protocol handler this session was created with */
+   MQTTProtocolHandler getProtocolHandler()
+   {
+      return protocolHandler;
+   }
+
+   /** @return the callback created for this session */
+   SessionCallback getSessionCallback()
+   {
+      return sessionCallback;
+   }
+
+   /** Sets the core server session used by this MQTT session. */
+   void setServerSession(ServerSessionImpl serverSession)
+   {
+      this.serverSession = serverSession;
+   }
+
+   /** Sets the session state and marks it as attached. */
+   void setSessionState(MQTTSessionState state)
+   {
+      this.state = state;
+      state.setAttached(true);
+   }
+
+   /** @return the retained-message manager of this session */
+   MQTTRetainMessageManager getRetainMessageManager()
+   {
+      return retainMessageManager;
+   }
+
+   /** @return the connection this session was created with */
+   MQTTConnection getConnection()
+   {
+      return connection;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
new file mode 100644
index 0000000..63e19a5
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+
+public class MQTTSessionCallback implements SessionCallback
+{
+   private MQTTSession session;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   public MQTTSessionCallback(MQTTSession session) throws Exception
+   {
+      this.session = session;
+   }
+
+   @Override
+   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
+   {
+      try
+      {
+         session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage());
+      }
+      return 1;
+   }
+
+   @Override
+   public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      log.warn("Sending LARGE MESSAGE");
+      return 1;
+   }
+
+   @Override
+   public void addReadyListener(ReadyListener listener)
+   {
+      session.getConnection().getTransportConnection().addReadyListener(listener);
+   }
+
+   @Override
+   public void removeReadyListener(ReadyListener listener)
+   {
+      session.getConnection().getTransportConnection().removeReadyListener(listener);
+   }
+
+   @Override
+   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
+   {
+      return sendMessage(message, consumer, deliveryCount);
+   }
+
+   @Override
+   public void disconnect(ServerConsumer consumer, String queueName)
+   {
+      try
+      {
+         consumer.removeItself();
+      }
+      catch (Exception e)
+      {
+         log.error(e.getMessage());
+      }
+   }
+
+   @Override
+   public boolean hasCredits(ServerConsumer consumerID)
+   {
+      return true;
+   }
+
+   @Override
+   public void sendProducerCreditsMessage(int credits, SimpleString address)
+   {
+   }
+
+   @Override
+   public void sendProducerCreditsFailMessage(int credits, SimpleString address)
+   {
+   }
+
+   @Override
+   public void closed()
+   {
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
new file mode 100644
index 0000000..b7fa436
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MQTTSessionState
+{
+   private String clientId;
+
+   private ServerMessage willMessage;
+
+   private final ConcurrentHashMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
+
+   // Used to store Packet ID of Publish QoS1 and QoS2 message.  See spec: 4.3.3 QoS 2: Exactly once delivery.  Method B.
+   private Map<Integer, MQTTMessageInfo> messageRefStore;
+
+   private Map<String, Map<Long, Integer>> addressMessageMap;
+
+   private Set<Integer> pubRec;
+
+   private Set<Integer> pub;
+
+   private boolean attached = false;
+
+   private MQTTLogger log = MQTTLogger.LOGGER;
+
+   // Objects track the Outbound message references
+   private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
+
+   private ConcurrentHashMap<String, ConcurrentHashMap<Long, Integer>> reverseOutboundReferenceStore;
+
+   private final Object outboundLock = new Object();
+
+   // FIXME We should use a better mechanism for creating packet IDs.
+   private AtomicInteger lastId = new AtomicInteger(0);
+
+   public MQTTSessionState(String clientId)
+   {
+      this.clientId = clientId;
+
+      pubRec = new HashSet<>();
+      pub = new HashSet<>();
+
+      outboundMessageReferenceStore = new ConcurrentHashMap<>();
+      reverseOutboundReferenceStore = new ConcurrentHashMap<>();
+
+      messageRefStore = new ConcurrentHashMap<>();
+      addressMessageMap = new ConcurrentHashMap<>();
+   }
+
+   int generateId()
+   {
+      lastId.compareAndSet(Short.MAX_VALUE, 1);
+      return lastId.addAndGet(1);
+   }
+
+   void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos)
+   {
+      synchronized (outboundLock)
+      {
+         outboundMessageReferenceStore.put(mqttId, new Pair<String, Long>(address, serverMessageId));
+         if (qos == 2)
+         {
+            if (reverseOutboundReferenceStore.containsKey(address))
+            {
+               reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId);
+            }
+            else
+            {
+               ConcurrentHashMap<Long, Integer> serverToMqttId = new ConcurrentHashMap<Long, Integer>();
+               serverToMqttId.put(serverMessageId, mqttId);
+               reverseOutboundReferenceStore.put(address, serverToMqttId);
+            }
+         }
+      }
+   }
+
+   Pair<String, Long> removeOutbandMessageRef(int mqttId, int qos)
+   {
+      synchronized (outboundLock)
+      {
+         Pair<String, Long> messageInfo = outboundMessageReferenceStore.remove(mqttId);
+         if (qos == 1)
+         {
+            return messageInfo;
+         }
+
+         Map<Long, Integer> map = reverseOutboundReferenceStore.get(messageInfo.getA());
+         if (map != null)
+         {
+            map.remove(messageInfo.getB());
+            if (map.isEmpty())
+            {
+               reverseOutboundReferenceStore.remove(messageInfo.getA());
+            }
+            return messageInfo;
+         }
+         return null;
+      }
+   }
+
+   Set<Integer> getPubRec()
+   {
+      return pubRec;
+   }
+
+   Set<Integer> getPub()
+   {
+      return pub;
+   }
+
+   boolean getAttached()
+   {
+      return attached;
+   }
+
+   void setAttached(boolean attached)
+   {
+      this.attached = attached;
+   }
+
+   boolean isWill()
+   {
+      return willMessage != null;
+   }
+
+   ServerMessage getWillMessage()
+   {
+      return willMessage;
+   }
+
+   void setWillMessage(ServerMessage willMessage)
+   {
+      this.willMessage = willMessage;
+   }
+
+   void deleteWillMessage()
+   {
+      willMessage = null;
+   }
+
+   Collection<MqttTopicSubscription> getSubscriptions()
+   {
+      return subscriptions.values();
+   }
+
+   boolean addSubscription(MqttTopicSubscription subscription)
+   {
+      synchronized (subscriptions)
+      {
+         addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName()), new ConcurrentHashMap<Long, Integer>());
+
+         MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName());
+         if (existingSubscription != null)
+         {
+            if (subscription.qualityOfService().value() > existingSubscription.qualityOfService().value())
+            {
+               subscriptions.put(subscription.topicName(), subscription);
+               return true;
+            }
+         }
+         else
+         {
+            subscriptions.put(subscription.topicName(), subscription);
+            return true;
+         }
+      }
+      return false;
+   }
+
+   void removeSubscription(String address)
+   {
+      synchronized (subscriptions)
+      {
+         subscriptions.remove(address);
+         addressMessageMap.remove(address);
+      }
+   }
+
+   MqttTopicSubscription getSubscription(String address)
+   {
+      return subscriptions.get(address);
+   }
+
+   String getClientId()
+   {
+      return clientId;
+   }
+
+   void setClientId(String clientId)
+   {
+      this.clientId = clientId;
+   }
+
+   void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress)
+   {
+      messageRefStore.put(mqttId, messageInfo);
+      if (storeAddress)
+      {
+         Map<Long, Integer> addressMap = addressMessageMap.get(messageInfo.getAddress());
+         if (addressMap != null)
+         {
+            addressMap.put(messageInfo.getServerMessageId(), mqttId);
+         }
+      }
+   }
+
+   void removeMessageRef(Integer mqttId)
+   {
+      MQTTMessageInfo info = messageRefStore.remove(mqttId);
+      if (info != null)
+      {
+         Map<Long, Integer> addressMap = addressMessageMap.get(info.getAddress());
+         if (addressMap != null)
+         {
+            addressMap.remove(info.getServerMessageId());
+         }
+      }
+   }
+
+   MQTTMessageInfo getMessageInfo(Integer mqttId)
+   {
+      return messageRefStore.get(mqttId);
+   }
+}


[5/5] activemq-artemis git commit: This closes #62 MQTT support

Posted by cl...@apache.org.
This closes #62 MQTT support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17cc62bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17cc62bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17cc62bc

Branch: refs/heads/master
Commit: 17cc62bca2a0659c39bec026bb0b5ea45292dd68
Parents: 077e9e2 0f82ca7
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 9 10:17:48 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 9 10:17:48 2015 -0400

----------------------------------------------------------------------
 artemis-distribution/pom.xml                    |    9 +
 artemis-distribution/src/main/assembly/dep.xml  |    2 +
 artemis-protocols/artemis-mqtt-protocol/pom.xml |   56 +
 .../core/protocol/mqtt/MQTTConnection.java      |  241 +++
 .../protocol/mqtt/MQTTConnectionManager.java    |  205 ++
 .../core/protocol/mqtt/MQTTFailureListener.java |   47 +
 .../artemis/core/protocol/mqtt/MQTTLogger.java  |   43 +
 .../core/protocol/mqtt/MQTTMessageInfo.java     |   57 +
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  362 ++++
 .../core/protocol/mqtt/MQTTProtocolManager.java |  140 ++
 .../mqtt/MQTTProtocolManagerFactory.java        |   58 +
 .../core/protocol/mqtt/MQTTPublishManager.java  |  270 +++
 .../protocol/mqtt/MQTTRetainMessageManager.java |   98 +
 .../artemis/core/protocol/mqtt/MQTTSession.java |  173 ++
 .../core/protocol/mqtt/MQTTSessionCallback.java |  111 ++
 .../core/protocol/mqtt/MQTTSessionState.java    |  250 +++
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  183 ++
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  178 ++
 ...mis.spi.core.protocol.ProtocolManagerFactory |    1 +
 artemis-protocols/pom.xml                       |    1 +
 pom.xml                                         |   23 +
 tests/integration-tests/pom.xml                 |   37 +
 .../mqtt/imported/FuseMQTTClientProvider.java   |  131 ++
 .../mqtt/imported/MQTTClientProvider.java       |   48 +
 .../integration/mqtt/imported/MQTTTest.java     | 1747 ++++++++++++++++++
 .../mqtt/imported/MQTTTestSupport.java          |  376 ++++
 .../integration/mqtt/imported/PahoMQTTTest.java |  175 ++
 .../util/ResourceLoadingSslContext.java         |  284 +++
 .../integration/mqtt/imported/util/Wait.java    |   56 +
 29 files changed, 5362 insertions(+)
----------------------------------------------------------------------



[2/5] activemq-artemis git commit: Added Initial MQTT Protocol Support

Posted by cl...@apache.org.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
new file mode 100644
index 0000000..2a790e8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -0,0 +1,1747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.lang.reflect.Field;
+import java.net.ProtocolException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.fusesource.mqtt.codec.PUBLISH;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.impl.ConcurrentHashSet;
+
+/**
+ * MQTT Test imported from ActiveMQ MQTT component.
+ */
+public class MQTTTest extends MQTTTestSupport
+{
+
+   private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
+
+   private static final int NUM_MESSAGES = 250;
+
+   /**
+    * Replaces the static {@code SESSIONS} field of {@link MQTTSession} and the static
+    * {@code CONNECTED_CLIENTS} field of {@link MQTTConnectionManager} with fresh, empty
+    * collections via reflection, then runs the parent set-up.
+    */
+   @Before
+   public void setUp() throws Exception
+   {
+      // A null target object is used because both fields are static.
+      final Field sessionsField = MQTTSession.class.getDeclaredField("SESSIONS");
+      sessionsField.setAccessible(true);
+      sessionsField.set(null, new ConcurrentHashMap<>());
+
+      final Field clientsField = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
+      clientsField.setAccessible(true);
+      clientsField.set(null, new ConcurrentHashSet<>());
+
+      super.setUp();
+   }
+
+   /**
+    * Publishes {@code NUM_MESSAGES} messages to "foo/bah" from one client while a background
+    * thread receives them on a second, subscribed client. The test passes when the latch shared
+    * with the receiver has been counted down to zero after waiting up to 10 seconds.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveMQTT() throws Exception
+   {
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+
+      subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+      final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+
+      Thread thread = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               try
+               {
+                  byte[] payload = subscriptionProvider.receive(10000);
+                  if (payload == null)
+                  {
+                     // An assertion here would only kill this helper thread without failing the test;
+                     // stop receiving and let the latch check on the test thread report the shortfall.
+                     LOG.error("No message received for index {}; {} still outstanding", i, latch.getCount());
+                     break;
+                  }
+                  latch.countDown();
+               }
+               catch (Exception e)
+               {
+                  // Use the class logger instead of stderr so the failure shows up in the test log.
+                  LOG.error("Receiving message " + i + " failed", e);
+                  break;
+               }
+
+            }
+         }
+      });
+      thread.start();
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         String payload = "Message " + i;
+         publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+      }
+
+      latch.await(10, TimeUnit.SECONDS);
+      assertEquals("Messages still outstanding", 0, latch.getCount());
+      subscriptionProvider.disconnect();
+      publishProvider.disconnect();
+   }
+
+   /**
+    * Publishes {@code NUM_MESSAGES} messages to "foo/bah" and unsubscribes the receiving client
+    * just before message number {@code NUM_MESSAGES / 2} is published. The test passes when the
+    * background receiver has counted {@code NUM_MESSAGES / 2} messages within 20 seconds.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testUnsubscribeMQTT() throws Exception
+   {
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+
+      String topic = "foo/bah";
+
+      subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
+
+      final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES / 2);
+
+      Thread thread = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               try
+               {
+                  byte[] payload = subscriptionProvider.receive(10000);
+                  if (payload == null)
+                  {
+                     // Expected once the subscription is removed: nothing more arrives. Asserting here
+                     // would only raise an error on this helper thread, so just stop receiving.
+                     LOG.info("No further message received after {} iterations, stopping receiver", i);
+                     break;
+                  }
+                  latch.countDown();
+               }
+               catch (Exception e)
+               {
+                  // Use the class logger instead of stderr so the failure shows up in the test log.
+                  LOG.warn("Receiving message " + i + " failed", e);
+                  break;
+               }
+
+            }
+         }
+      });
+      thread.start();
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         String payload = "Message " + i;
+         if (i == NUM_MESSAGES / 2)
+         {
+            subscriptionProvider.unsubscribe(topic);
+         }
+         publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
+      }
+
+      latch.await(20, TimeUnit.SECONDS);
+      assertEquals("Messages still outstanding", 0, latch.getCount());
+      subscriptionProvider.disconnect();
+      publishProvider.disconnect();
+   }
+
+   /**
+    * Subscribes to "foo" with EXACTLY_ONCE but publishes with AT_MOST_ONCE, then checks that each
+    * of the {@code NUM_MESSAGES} payloads is received unchanged. Per the original author's note,
+    * the delivered QoS follows the message as published rather than the subscriber's request.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAtMostOnceReceiveExactlyOnce() throws Exception
+   {
+      final MQTTClientProvider client = getMQTTClientProvider();
+      initializeConnection(client);
+      client.subscribe("foo", EXACTLY_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         client.publish("foo", expected.getBytes(), AT_MOST_ONCE);
+
+         // Each publish is followed immediately by a receive on the same client.
+         final byte[] received = client.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      client.disconnect();
+   }
+
+   /**
+    * Subscribes to "foo" with EXACTLY_ONCE, publishes {@code NUM_MESSAGES} messages with
+    * AT_LEAST_ONCE on the same client and checks that each payload is received unchanged.
+    */
+   @Test(timeout = 2 * 60 * 1000)
+   public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception
+   {
+      final MQTTClientProvider client = getMQTTClientProvider();
+      initializeConnection(client);
+      client.subscribe("foo", EXACTLY_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         client.publish("foo", expected.getBytes(), AT_LEAST_ONCE);
+
+         final byte[] received = client.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      client.disconnect();
+   }
+
+   /**
+    * Subscribes to "foo" with AT_MOST_ONCE, publishes {@code NUM_MESSAGES} messages with
+    * AT_LEAST_ONCE on the same client and checks that each payload is received unchanged.
+    */
+   @Test(timeout = 2 * 60 * 1000)
+   public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception
+   {
+      final MQTTClientProvider client = getMQTTClientProvider();
+      initializeConnection(client);
+      client.subscribe("foo", AT_MOST_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         client.publish("foo", expected.getBytes(), AT_LEAST_ONCE);
+
+         final byte[] received = client.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      client.disconnect();
+   }
+
+   /**
+    * Subscribes to and publishes on "foo" with AT_MOST_ONCE using a single client and checks
+    * that each of the {@code NUM_MESSAGES} payloads is received unchanged.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveAtMostOnce() throws Exception
+   {
+      final MQTTClientProvider client = getMQTTClientProvider();
+      initializeConnection(client);
+      client.subscribe("foo", AT_MOST_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         client.publish("foo", expected.getBytes(), AT_MOST_ONCE);
+
+         final byte[] received = client.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      client.disconnect();
+   }
+
+   /**
+    * Subscribes to and publishes on "foo" with AT_LEAST_ONCE using a single client and checks
+    * that each of the {@code NUM_MESSAGES} payloads is received unchanged.
+    */
+   @Test(timeout = 2 * 60 * 1000)
+   public void testSendAndReceiveAtLeastOnce() throws Exception
+   {
+      final MQTTClientProvider client = getMQTTClientProvider();
+      initializeConnection(client);
+      client.subscribe("foo", AT_LEAST_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         client.publish("foo", expected.getBytes(), AT_LEAST_ONCE);
+
+         final byte[] received = client.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      client.disconnect();
+   }
+
+   /**
+    * Uses separate publisher and subscriber clients on "foo", both with EXACTLY_ONCE, and
+    * checks that each of the {@code NUM_MESSAGES} payloads is received unchanged.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveExactlyOnce() throws Exception
+   {
+      final MQTTClientProvider sender = getMQTTClientProvider();
+      initializeConnection(sender);
+
+      final MQTTClientProvider receiver = getMQTTClientProvider();
+      initializeConnection(receiver);
+      receiver.subscribe("foo", EXACTLY_ONCE);
+
+      int index = 0;
+      while (index < NUM_MESSAGES)
+      {
+         final String expected = "Test Message: " + index;
+         sender.publish("foo", expected.getBytes(), EXACTLY_ONCE);
+
+         final byte[] received = receiver.receive(5000);
+         assertNotNull("Should get a message + [" + index + "]", received);
+         assertEquals(expected, new String(received));
+         index++;
+      }
+
+      // Disconnect in the same order as before: subscriber first, then publisher.
+      receiver.disconnect();
+      sender.disconnect();
+   }
+
+   /**
+    * Sends a 32 KiB payload filled with the character '2' ten times from a publisher client
+    * to a subscriber client on "foo" (both AT_LEAST_ONCE) and checks that every received
+    * payload equals the one sent.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveLargeMessages() throws Exception
+   {
+      final byte[] body = new byte[1024 * 32];
+      Arrays.fill(body, (byte) '2');
+
+      final MQTTClientProvider sender = getMQTTClientProvider();
+      initializeConnection(sender);
+
+      final MQTTClientProvider receiver = getMQTTClientProvider();
+      initializeConnection(receiver);
+      receiver.subscribe("foo", AT_LEAST_ONCE);
+
+      int round = 0;
+      while (round < 10)
+      {
+         sender.publish("foo", body, AT_LEAST_ONCE);
+
+         final byte[] received = receiver.receive(5000);
+         assertNotNull("Should get a message", received);
+         assertArrayEquals(body, received);
+         round++;
+      }
+
+      receiver.disconnect();
+      sender.disconnect();
+   }
+
+   /**
+    * Publishes one retained message to "foo" before the subscriber subscribes, then ten ordinary
+    * messages afterwards. The subscriber is expected to receive the retained message first,
+    * followed by the ten ordinary messages in publication order.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveRetainedMessages() throws Exception
+   {
+      final MQTTClientProvider sender = getMQTTClientProvider();
+      initializeConnection(sender);
+
+      final MQTTClientProvider receiver = getMQTTClientProvider();
+      initializeConnection(receiver);
+
+      final String retainedText = "retained";
+      sender.publish("foo", retainedText.getBytes(), AT_LEAST_ONCE, true);
+
+      final List<String> expectedTexts = new ArrayList<String>();
+      for (int n = 0; n < 10; n++)
+      {
+         expectedTexts.add("TEST MESSAGE:" + n);
+      }
+
+      receiver.subscribe("foo", AT_LEAST_ONCE);
+
+      for (String text : expectedTexts)
+      {
+         sender.publish("foo", text.getBytes(), AT_LEAST_ONCE);
+      }
+
+      // The retained message must be the first delivery on the new subscription.
+      byte[] received = receiver.receive(5000);
+      assertNotNull(received);
+      assertEquals(retainedText, new String(received));
+
+      for (String text : expectedTexts)
+      {
+         received = receiver.receive(5000);
+         assertNotNull(received);
+         assertEquals(text, new String(received));
+      }
+
+      receiver.disconnect();
+      sender.disconnect();
+   }
+
+   /**
+    * Checks that a client with an empty client id and cleanSession=true can connect and
+    * disconnect without an exception.
+    */
+   @Test(timeout = 30 * 1000)
+   public void testValidZeroLengthClientId() throws Exception
+   {
+      final MQTT clientConfig = createMQTTConnection();
+      clientConfig.setClientId("");
+      clientConfig.setCleanSession(true);
+
+      final BlockingConnection session = clientConfig.blockingConnection();
+      session.connect();
+      session.disconnect();
+   }
+
+   /**
+    * Exercises retained and non-retained delivery on topic names that contain leading, trailing
+    * and repeated '/' separators: first with exact-match subscriptions, one topic at a time, then
+    * with '#' and '+' wildcard subscriptions whose matches are verified against an equivalent regex.
+    */
+   @Test(timeout = 2 * 60 * 1000)
+   public void testMQTTPathPatterns() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("");
+      mqtt.setCleanSession(true);
+
+      BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      final String RETAINED = "RETAINED";
+      String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
+      for (String topic : topics)
+      {
+         // test retained message
+         // The retained payload is the RETAINED prefix followed by the topic name, so the wildcard
+         // phase below can recognise retained deliveries by that prefix.
+         connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
+
+         connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
+         Message msg = connection.receive(5, TimeUnit.SECONDS);
+         assertNotNull("No message for " + topic, msg);
+         assertEquals(RETAINED + topic, new String(msg.getPayload()));
+         msg.ack();
+
+         // test non-retained message
+         connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+         msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+         assertNotNull(msg);
+         assertEquals(topic, new String(msg.getPayload()));
+         msg.ack();
+
+         connection.unsubscribe(new String[]{topic});
+      }
+      connection.disconnect();
+
+      // test wildcard patterns with above topics
+      String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"};
+      for (String wildcard : wildcards)
+      {
+         // Translate the MQTT filter into a regex: '#' (with an optional preceding '/') becomes
+         // "(/?.*)*" and each '+' becomes "[^/]*", i.e. any run of characters without a separator.
+         final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*"));
+
+         // A fresh connection is opened for every wildcard and closed at the end of the iteration.
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         final byte[] qos = connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+         // 0x80 in the subscribe result is treated as the failure marker.
+         assertNotEquals("Subscribe failed " + wildcard, (byte) 0x80, qos[0]);
+
+         // test retained messages
+         // At least one retained message is required (asserted on the first pass); the loop then
+         // drains further deliveries until receive() returns null.
+         Message msg = connection.receive(5, TimeUnit.SECONDS);
+         do
+         {
+            assertNotNull("RETAINED null " + wildcard, msg);
+            assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
+            assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
+            msg.ack();
+            msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         } while (msg != null);
+
+         // test non-retained message
+         // Publish to every topic; only the ones matching the wildcard should come back.
+         for (String topic : topics)
+         {
+            connection.publish(topic, topic.getBytes(), QoS.AT_LEAST_ONCE, false);
+         }
+         msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+         do
+         {
+            assertNotNull("Non-retained Null " + wildcard, msg);
+            assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
+            msg.ack();
+            msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+         } while (msg != null);
+
+         connection.unsubscribe(new String[]{wildcard});
+         connection.disconnect();
+      }
+   }
+
+   /**
+    * For each QoS name, publishes a retained EXACTLY_ONCE message to a topic named after that
+    * QoS, subscribes with the matching QoS and checks that the PUBLISH frame observed by the
+    * client tracer carries that QoS.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testMQTTRetainQoS() throws Exception
+   {
+      // Listed in QoS ordinal order: the loop index doubles as the expected QoS ordinal below.
+      String[] topics = {"AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE"};
+      for (int i = 0; i < topics.length; i++)
+      {
+         final String topic = topics[i];
+
+         MQTT mqtt = createMQTTConnection();
+         mqtt.setClientId("foo");
+         mqtt.setKeepAlive((short) 2);
+
+         // Written by the tracer callback and polled by this method, so an atomic is used to
+         // guarantee the update becomes visible; -1 means "no PUBLISH frame seen yet".
+         final AtomicInteger actualQoS = new AtomicInteger(-1);
+         mqtt.setTracer(new Tracer()
+         {
+            @Override
+            public void onReceive(MQTTFrame frame)
+            {
+               // validate the QoS
+               if (frame.messageType() == PUBLISH.TYPE)
+               {
+                  actualQoS.set(frame.qos().ordinal());
+               }
+            }
+         });
+
+         final BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});
+
+         final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull(msg);
+         assertEquals(topic, new String(msg.getPayload()));
+
+         // Give the tracer up to ten seconds to record the frame before asserting.
+         int waitCount = 0;
+         while (actualQoS.get() == -1 && waitCount < 10)
+         {
+            Thread.sleep(1000);
+            waitCount++;
+         }
+         assertEquals(i, actualQoS.get());
+         msg.ack();
+
+         connection.unsubscribe(new String[]{topic});
+         connection.disconnect();
+      }
+
+   }
+
+   /**
+    * Publishes one retained message to "TopicA" and then subscribes to that topic four times in a
+    * row with varying QoS. Every subscribe is expected to deliver the retained message again, in a
+    * PUBLISH frame whose QoS equals the QoS of that subscription.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testDuplicateSubscriptions() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("foo");
+      mqtt.setKeepAlive((short) 20);
+
+      // Written by the tracer callback and polled by this method, so an atomic is used to
+      // guarantee the update becomes visible; -1 means "no PUBLISH frame seen yet".
+      final AtomicInteger actualQoS = new AtomicInteger(-1);
+      mqtt.setTracer(new Tracer()
+      {
+         @Override
+         public void onReceive(MQTTFrame frame)
+         {
+            // validate the QoS
+            if (frame.messageType() == PUBLISH.TYPE)
+            {
+               actualQoS.set(frame.qos().ordinal());
+            }
+         }
+      });
+
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      final String RETAIN = "RETAIN";
+      connection.publish("TopicA", RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+
+      QoS[] qoss = {QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
+      for (QoS qos : qoss)
+      {
+         connection.subscribe(new Topic[]{new Topic("TopicA", qos)});
+
+         final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No message for " + qos, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+
+         // Give the tracer up to ten seconds to record the frame before asserting.
+         int waitCount = 0;
+         while (actualQoS.get() == -1 && waitCount < 10)
+         {
+            Thread.sleep(1000);
+            waitCount++;
+         }
+         assertEquals(qos.ordinal(), actualQoS.get());
+         // Reset the marker so the next subscription is judged on its own frame.
+         actualQoS.set(-1);
+      }
+
+      connection.unsubscribe(new String[]{"TopicA"});
+      connection.disconnect();
+
+   }
+
+   /**
+    * Runs the retained-message scenario on "TopicA" once per client id (null, "foo", "durable");
+    * only "durable" connects with cleanSession=false. Each run checks that a retained message is
+    * delivered on subscribe, delivered again on a duplicate subscribe, removed by publishing an
+    * empty retained payload, deliverable again after being re-published, and still delivered
+    * after a reconnect.
+    */
+   @Test(timeout = 120 * 1000)
+   public void testRetainedMessage() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setKeepAlive((short) 60);
+
+      final String RETAIN = "RETAIN";
+      final String TOPICA = "TopicA";
+
+      final String[] clientIds = {null, "foo", "durable"};
+      for (String clientId : clientIds)
+      {
+         LOG.info("Testing now with Client ID: {}", clientId);
+
+         mqtt.setClientId(clientId);
+         // cleanSession is true for every client id except "durable".
+         mqtt.setCleanSession(!"durable".equals(clientId));
+
+         BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+
+         // set retained message and check
+         connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         // Exactly one copy is expected: nothing else may arrive within 500 ms.
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         // test duplicate subscription
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+         assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+         connection.unsubscribe(new String[]{TOPICA});
+
+         // clear retained message and check that we don't receive it
+         // (an empty payload published with the retain flag set is what clears it)
+         connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(500, TimeUnit.MILLISECONDS);
+         assertNull("Retained message not cleared for " + clientId, msg);
+         connection.unsubscribe(new String[]{TOPICA});
+
+         // set retained message again and check
+         connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No reset retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         // re-connect and check
+         // The subscription is issued again on the new connection before receiving.
+         connection.disconnect();
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No reset retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         connection.unsubscribe(new String[]{TOPICA});
+         connection.disconnect();
+      }
+   }
+
+   /**
+    * Same retained-message scenario as the plain-topic variant, but on the topic
+    * "VirtualTopic/TopicA"; currently disabled via {@code @Ignore}. It runs once per client id
+    * (null, "foo", "durable"), with cleanSession=false only for "durable", and checks delivery on
+    * subscribe, on duplicate subscribe, after clearing, after re-publishing and after a reconnect.
+    */
+   @Ignore
+   @Test(timeout = 120 * 1000)
+   public void testRetainedMessageOnVirtualTopics() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setKeepAlive((short) 60);
+
+      final String RETAIN = "RETAIN";
+      final String TOPICA = "VirtualTopic/TopicA";
+
+      final String[] clientIds = {null, "foo", "durable"};
+      for (String clientId : clientIds)
+      {
+         LOG.info("Testing now with Client ID: {}", clientId);
+
+         mqtt.setClientId(clientId);
+         // cleanSession is true for every client id except "durable".
+         mqtt.setCleanSession(!"durable".equals(clientId));
+
+         BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+
+         // set retained message and check
+         connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         // Exactly one copy is expected: nothing else may arrive within 500 ms.
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         // test duplicate subscription
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(15000, TimeUnit.MILLISECONDS);
+         assertNotNull("No retained message on duplicate subscription for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+         connection.unsubscribe(new String[]{TOPICA});
+
+         // clear retained message and check that we don't receive it
+         // (an empty payload published with the retain flag set is what clears it)
+         connection.publish(TOPICA, "".getBytes(), QoS.AT_MOST_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(500, TimeUnit.MILLISECONDS);
+         assertNull("Retained message not cleared for " + clientId, msg);
+         connection.unsubscribe(new String[]{TOPICA});
+
+         // set retained message again and check
+         connection.publish(TOPICA, RETAIN.getBytes(), QoS.EXACTLY_ONCE, true);
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No reset retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         // re-connect and check
+         // The subscription is issued again on the new connection before receiving.
+         connection.disconnect();
+         connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.subscribe(new Topic[]{new Topic(TOPICA, QoS.AT_LEAST_ONCE)});
+         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+         assertNotNull("No reset retained message for " + clientId, msg);
+         assertEquals(RETAIN, new String(msg.getPayload()));
+         msg.ack();
+         assertNull(connection.receive(500, TimeUnit.MILLISECONDS));
+
+         LOG.info("Test now unsubscribing from: {} for the last time", TOPICA);
+         connection.unsubscribe(new String[]{TOPICA});
+         connection.disconnect();
+      }
+   }
+
+   /**
+    * Creates three overlapping subscriptions with different QoS levels, publishes one retained and
+    * one non-retained message to "TopicA/" and checks that six deliveries arrive. It then checks
+    * the message ids of the PUBLISH frames recorded by the tracer: 0 for AT_MOST_ONCE frames,
+    * pairwise distinct for all others.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testUniqueMessageIds() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("foo");
+      mqtt.setKeepAlive((short) 2);
+      mqtt.setCleanSession(true);
+
+      // The tracer appends to this list while the test method polls size(), so it has to be
+      // safe for concurrent use; a plain ArrayList is not.
+      final List<PUBLISH> publishList = Collections.synchronizedList(new ArrayList<PUBLISH>());
+      mqtt.setTracer(new Tracer()
+      {
+         @Override
+         public void onReceive(MQTTFrame frame)
+         {
+            LOG.info("Client received:\n" + frame);
+            if (frame.messageType() == PUBLISH.TYPE)
+            {
+               PUBLISH publish = new PUBLISH();
+               try
+               {
+                  publish.decode(frame);
+               }
+               catch (ProtocolException e)
+               {
+                  // NOTE(review): this runs on whatever thread invokes the tracer - presumably not
+                  // the JUnit thread, in which case fail() would not fail the test; confirm.
+                  fail("Error decoding publish " + e.getMessage());
+               }
+               publishList.add(publish);
+            }
+         }
+
+         @Override
+         public void onSend(MQTTFrame frame)
+         {
+            LOG.info("Client sent:\n" + frame);
+         }
+      });
+
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      // create overlapping subscriptions with different QoSs
+      QoS[] qoss = {QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
+      final String TOPIC = "TopicA/";
+
+      // publish retained message
+      connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
+
+      String[] subs = {TOPIC, "TopicA/#", "TopicA/+"};
+      for (int i = 0; i < qoss.length; i++)
+      {
+         connection.subscribe(new Topic[]{new Topic(subs[i], qoss[i])});
+      }
+
+      // publish non-retained message
+      connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+      int received = 0;
+
+      // 'received' is only incremented in the loop condition when another message follows, so
+      // after the loop it is one less than the number of messages processed (hence the +1 below).
+      Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+      do
+      {
+         assertNotNull(msg);
+         assertEquals(TOPIC, new String(msg.getPayload()));
+         msg.ack();
+         // Wait up to ten seconds for the tracer to have recorded the frame of this message.
+         int waitCount = 0;
+         while (publishList.size() <= received && waitCount < 10)
+         {
+            Thread.sleep(1000);
+            waitCount++;
+         }
+         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+      } while (msg != null && received++ < subs.length * 2);
+      assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
+
+      // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for
+      // AT_MOST_ONCE
+      for (int i = 0; i < publishList.size(); i++)
+      {
+         final PUBLISH publish1 = publishList.get(i);
+         for (int j = i + 1; j < publishList.size(); j++)
+         {
+            final PUBLISH publish2 = publishList.get(j);
+            boolean qos0 = false;
+            if (publish1.qos() == QoS.AT_MOST_ONCE)
+            {
+               qos0 = true;
+               assertEquals(0, publish1.messageId());
+            }
+            if (publish2.qos() == QoS.AT_MOST_ONCE)
+            {
+               qos0 = true;
+               assertEquals(0, publish2.messageId());
+            }
+            if (!qos0)
+            {
+               assertNotEquals(publish1.messageId(), publish2.messageId());
+            }
+         }
+      }
+
+      connection.unsubscribe(subs);
+      connection.disconnect();
+   }
+
+   /**
+    * Connects with the client id "resend" (second argument {@code false} to
+    * {@code createMQTTConnection}), creates two overlapping subscriptions and publishes one
+    * message, expecting the tracer to record two PUBLISH frames. After a disconnect and
+    * reconnect, two more PUBLISH frames are expected, four in total.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testResendMessageId() throws Exception
+   {
+      final MQTT mqtt = createMQTTConnection("resend", false);
+      mqtt.setKeepAlive((short) 5);
+
+      // The tracer appends to this list while the wait conditions below poll size(), so it has
+      // to be safe for concurrent use; a plain ArrayList is not.
+      final List<PUBLISH> publishList = Collections.synchronizedList(new ArrayList<PUBLISH>());
+      mqtt.setTracer(new Tracer()
+      {
+         @Override
+         public void onReceive(MQTTFrame frame)
+         {
+            LOG.info("Client received:\n" + frame);
+            if (frame.messageType() == PUBLISH.TYPE)
+            {
+               PUBLISH publish = new PUBLISH();
+               try
+               {
+                  publish.decode(frame);
+               }
+               catch (ProtocolException e)
+               {
+                  // NOTE(review): this runs on whatever thread invokes the tracer - presumably not
+                  // the JUnit thread, in which case fail() would not fail the test; confirm.
+                  fail("Error decoding publish " + e.getMessage());
+               }
+               publishList.add(publish);
+            }
+         }
+
+         @Override
+         public void onSend(MQTTFrame frame)
+         {
+            LOG.info("Client sent:\n" + frame);
+         }
+      });
+
+      BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      final String TOPIC = "TopicA/";
+      final String[] topics = new String[]{TOPIC, "TopicA/+"};
+      connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)});
+
+      // publish non-retained message
+      connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+
+      // One copy per overlapping subscription is expected. The messages are never received or
+      // acked by this test, only observed through the tracer.
+      Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return publishList.size() == 2;
+         }
+      }, 5000);
+      assertEquals(2, publishList.size());
+
+      connection.disconnect();
+
+      connection = mqtt.blockingConnection();
+      connection.connect();
+
+      // After the reconnect both messages are expected to be sent again.
+      Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return publishList.size() == 4;
+         }
+      }, 5000);
+      assertEquals(4, publishList.size());
+
+      // TODO Investigate if receiving the same ID for overlapping subscriptions is actually spec compliant.
+      // In Artemis we send a new ID for every copy of the message.
+
+      // make sure we received duplicate message ids
+//      assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || publishList.get(0).messageId() == publishList.get(3).messageId());
+//      assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || publishList.get(1).messageId() == publishList.get(2).messageId());
+//      assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
+
+      connection.unsubscribe(topics);
+      connection.disconnect();
+   }
+
+   /**
+    * Verifies packet id handling across a resumed (non-clean) session: ten EXACTLY_ONCE messages are
+    * published, half are consumed, the client reconnects and drains the remainder, and finally a
+    * PUBLISH frame must have been traced for every packet id from 1 to 10.
+    */
+   @Test(timeout = 90 * 1000)
+   public void testPacketIdGeneratorNonCleanSession() throws Exception
+   {
+      final String topicName = "TopicA/";
+      final int messageCount = 10;
+
+      final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
+      mqtt.setKeepAlive((short) 15);
+
+      // Every PUBLISH frame seen by the client, keyed by its packet id.
+      final Map<Short, PUBLISH> framesById = new ConcurrentHashMap<Short, PUBLISH>();
+      mqtt.setTracer(new Tracer()
+      {
+         @Override
+         public void onSend(MQTTFrame frame)
+         {
+            LOG.info("Client sent:\n" + frame);
+         }
+
+         @Override
+         public void onReceive(MQTTFrame frame)
+         {
+            LOG.info("Client received:\n" + frame);
+            if (frame.messageType() != PUBLISH.TYPE)
+            {
+               return;
+            }
+
+            final PUBLISH decoded = new PUBLISH();
+            try
+            {
+               decoded.decode(frame);
+               LOG.info("PUBLISH " + decoded);
+            }
+            catch (ProtocolException e)
+            {
+               fail("Error decoding publish " + e.getMessage());
+            }
+
+            // A packet id that has been seen before may only come back flagged as a duplicate.
+            // NOTE(review): these assertions run on whichever thread invokes the tracer - confirm a failure here reaches JUnit.
+            final PUBLISH earlier = framesById.get(decoded.messageId());
+            if (earlier != null)
+            {
+               assertTrue(decoded.dup());
+            }
+            framesById.put(decoded.messageId(), decoded);
+         }
+      });
+
+      BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      connection.subscribe(new Topic[]{new Topic(topicName, QoS.EXACTLY_ONCE)});
+
+      // publish non-retained messages
+      int published = 0;
+      while (published < messageCount)
+      {
+         connection.publish(topicName, topicName.getBytes(), QoS.EXACTLY_ONCE, false);
+         published++;
+      }
+
+      // first session: consume only half of what was published
+      for (int outstanding = messageCount / 2; outstanding > 0; outstanding--)
+      {
+         final Message delivered = connection.receive(1000, TimeUnit.MILLISECONDS);
+         assertNotNull(delivered);
+         assertEquals(topicName, new String(delivered.getPayload()));
+         delivered.ack();
+      }
+
+      connection.disconnect();
+
+      // second session: resume and drain until a receive times out
+      connection = mqtt.blockingConnection();
+      connection.connect();
+      while (true)
+      {
+         final Message pending = connection.receive(1000, TimeUnit.MILLISECONDS);
+         if (pending == null)
+         {
+            break;
+         }
+         assertEquals(topicName, new String(pending.getPayload()));
+         pending.ack();
+      }
+
+      // make sure we received all message ids
+      for (short id = 1; id <= messageCount; id++)
+      {
+         assertNotNull("No message for id " + id, framesById.get(id));
+      }
+
+      connection.unsubscribe(new String[]{topicName});
+      connection.disconnect();
+   }
+
+   /**
+    * Checks the packet ids handed out to clean-session clients. Ten short-lived connections, each
+    * taken at random from three clients with different client ids, publish and consume a single
+    * EXACTLY_ONCE message; the PUBLISH traced in iteration {@code i} is expected to carry packet id
+    * {@code i + 1}.
+    */
+   @Ignore
+   @Test(timeout = 90 * 1000)
+   // TODO ActiveMQ 5.x does not reset the message id generator even after a clean session.  In Artemis we always reset.
+   // If there is a good reason for this we should follow ActiveMQ.
+   public void testPacketIdGeneratorCleanSession() throws Exception
+   {
+      final String[] cleanClientIds = new String[]{"", "clean-packetid", null};
+      // PUBLISH frames traced during the current iteration, keyed by packet id; shared by all clients.
+      final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
+      MQTT[] mqtts = new MQTT[cleanClientIds.length];
+      for (int i = 0; i < cleanClientIds.length; i++)
+      {
+         // Give each client its own id so the empty, named and absent client-id cases are all exercised.
+         // NOTE(review): assumes createMQTTConnection accepts a null client id - confirm.
+         mqtts[i] = createMQTTConnection(cleanClientIds[i], true);
+         mqtts[i].setKeepAlive((short) 15);
+
+         mqtts[i].setTracer(new Tracer()
+         {
+            @Override
+            public void onReceive(MQTTFrame frame)
+            {
+               LOG.info("Client received:\n" + frame);
+               if (frame.messageType() == PUBLISH.TYPE)
+               {
+                  PUBLISH publish = new PUBLISH();
+                  try
+                  {
+                     publish.decode(frame);
+                     LOG.info("PUBLISH " + publish);
+                  }
+                  catch (ProtocolException e)
+                  {
+                     fail("Error decoding publish " + e.getMessage());
+                  }
+                  // A packet id that was already recorded may only reappear as a duplicate delivery.
+                  if (publishMap.get(publish.messageId()) != null)
+                  {
+                     assertTrue(publish.dup());
+                  }
+                  publishMap.put(publish.messageId(), publish);
+               }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame)
+            {
+               LOG.info("Client sent:\n" + frame);
+            }
+         });
+      }
+
+      final Random random = new Random();
+      for (short i = 0; i < 10; i++)
+      {
+         BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection();
+         connection.connect();
+         final String TOPIC = "TopicA/";
+         connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+
+         // publish non-retained message
+         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+         Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+         assertNotNull(msg);
+         assertEquals(TOPIC, new String(msg.getPayload()));
+         msg.ack();
+
+         // Exactly one PUBLISH per iteration, carrying the next packet id in sequence.
+         assertEquals(1, publishMap.size());
+         final short id = (short) (i + 1);
+         assertNotNull("No message for id " + id, publishMap.get(id));
+         publishMap.clear();
+
+         connection.disconnect();
+      }
+
+   }
+
+   /**
+    * Simulates a client transport failure on a non-clean session: a message is published, the
+    * transport is killed, and a new connection created from the same {@code MQTT} settings must
+    * still receive that message.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testClientConnectionFailure() throws Exception
+   {
+      MQTT mqtt = createMQTTConnection("reconnect", false);
+      mqtt.setKeepAlive((short) 1);
+
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      // Fail right away if the connection never comes up instead of continuing on a dead link.
+      assertTrue("Client never connected", Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection.isConnected();
+         }
+      }));
+
+      final String TOPIC = "TopicA";
+      final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+      assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+      connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+      // kill transport
+      connection.kill();
+
+      // FIXME Wait for the previous connection to time out.  This is not required in ActiveMQ.  Needs investigating.
+      Thread.sleep(10000);
+
+      final BlockingConnection newConnection = mqtt.blockingConnection();
+      newConnection.connect();
+      assertTrue("Client never reconnected", Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return newConnection.isConnected();
+         }
+      }));
+
+      assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+      // The message published before the kill must be delivered on the new connection.
+      Message msg = newConnection.receive(1000, TimeUnit.MILLISECONDS);
+      assertNotNull(msg);
+      assertEquals(TOPIC, new String(msg.getPayload()));
+      msg.ack();
+      newConnection.disconnect();
+   }
+
+   /**
+    * Exercises clean versus non-clean sessions for one client id: a message left behind by a
+    * non-clean session must be delivered when that session resumes, while a clean session must see
+    * nothing from before, and a later non-clean session must see nothing from the clean one.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testCleanSession() throws Exception
+   {
+      final String clientId = "cleansession";
+      final String topic = "TopicA";
+      final byte[] body = topic.getBytes();
+
+      final MQTT persistentSettings = createMQTTConnection(clientId, false);
+
+      // Session 1 (not clean): subscribe, publish to self, leave without receiving.
+      BlockingConnection persistent = persistentSettings.blockingConnection();
+      persistent.connect();
+      persistent.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
+      persistent.publish(topic, body, QoS.EXACTLY_ONCE, false);
+      persistent.disconnect();
+
+      // Session 2 (not clean): MUST receive message from previous not clean session
+      persistent = persistentSettings.blockingConnection();
+      persistent.connect();
+      Message received = persistent.receive(10000, TimeUnit.MILLISECONDS);
+      assertNotNull(received);
+      assertEquals(topic, new String(received.getPayload()));
+      received.ack();
+      persistent.publish(topic, body, QoS.EXACTLY_ONCE, false);
+      persistent.disconnect();
+
+      // Session 3 (clean): MUST NOT receive message from previous not clean session
+      final MQTT cleanSettings = createMQTTConnection(clientId, true);
+      final BlockingConnection cleanConnection = cleanSettings.blockingConnection();
+      cleanConnection.connect();
+      received = cleanConnection.receive(10000, TimeUnit.MILLISECONDS);
+      assertNull(received);
+      cleanConnection.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
+      cleanConnection.publish(topic, body, QoS.EXACTLY_ONCE, false);
+      cleanConnection.disconnect();
+
+      // Session 4 (not clean): MUST NOT receive message from previous clean session
+      persistent = persistentSettings.blockingConnection();
+      persistent.connect();
+      received = persistent.receive(1000, TimeUnit.MILLISECONDS);
+      assertNull(received);
+      persistent.disconnect();
+   }
+
+   /* TODO These cross-protocol tests were imported from ActiveMQ and need reworking to apply to Artemis.  There is an
+   outstanding task to add cross-protocol support; that task should rework these tests.  They are included here,
+   commented out, to keep the ActiveMQ and Artemis test suites in sync. */
+
+//   @Test(timeout = 60 * 1000)
+//   public void testSendMQTTReceiveJMS() throws Exception {
+//      doTestSendMQTTReceiveJMS("foo.*");
+//   }
+
+//   public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
+//      final MQTTClientProvider provider = getMQTTClientProvider();
+//      initializeConnection(provider);
+//
+//      // send retained message
+//      final String RETAINED = "RETAINED";
+//      provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
+//
+//      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+//      // MUST set to true to receive retained messages
+//      activeMQConnection.setUseRetroactiveConsumer(true);
+//      activeMQConnection.start();
+//      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
+//      MessageConsumer consumer = s.createConsumer(jmsTopic);
+//
+//      // check whether we received retained message on JMS subscribe
+//      ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+//      assertNotNull("Should get retained message", message);
+//      ByteSequence bs = message.getContent();
+//      assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+//      assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+//      for (int i = 0; i < NUM_MESSAGES; i++) {
+//         String payload = "Test Message: " + i;
+//         provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+//         message = (ActiveMQMessage) consumer.receive(5000);
+//         assertNotNull("Should get a message", message);
+//         bs = message.getContent();
+//         assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+//      }
+//
+//      activeMQConnection.close();
+//      provider.disconnect();
+//   }
+
+   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+//   @Test(timeout = 2 * 60 * 1000)
+//   public void testSendJMSReceiveMQTT() throws Exception {
+//      doTestSendJMSReceiveMQTT("foo.far");
+//   }
+
+   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+//   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+//      final MQTTClientProvider provider = getMQTTClientProvider();
+//      initializeConnection(provider);
+//
+//      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
+//      activeMQConnection.setUseRetroactiveConsumer(true);
+//      activeMQConnection.start();
+//      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
+//      MessageProducer producer = s.createProducer(jmsTopic);
+//
+//      // send retained message from JMS
+//      final String RETAINED = "RETAINED";
+//      TextMessage sendMessage = s.createTextMessage(RETAINED);
+//      // mark the message to be retained
+//      sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
+//      // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
+//      sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
+//      producer.send(sendMessage);
+//
+//      provider.subscribe("foo/+", AT_MOST_ONCE);
+//      byte[] message = provider.receive(10000);
+//      assertNotNull("Should get retained message", message);
+//      assertEquals(RETAINED, new String(message));
+//
+//      for (int i = 0; i < NUM_MESSAGES; i++) {
+//         String payload = "This is Test Message: " + i;
+//         sendMessage = s.createTextMessage(payload);
+//         producer.send(sendMessage);
+//         message = provider.receive(5000);
+//         assertNotNull("Should get a message", message);
+//
+//         assertEquals(payload, new String(message));
+//      }
+//      provider.disconnect();
+//      activeMQConnection.close();
+//   }
+
+   /**
+    * Connects client "foo" with a keep-alive of 2 and checks that the connection is reported as
+    * connected.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testPingKeepsInactivityMonitorAlive() throws Exception
+   {
+      final MQTT client = createMQTTConnection();
+      client.setClientId("foo");
+      client.setKeepAlive((short) 2);
+
+      final BlockingConnection connection = client.blockingConnection();
+      connection.connect();
+
+      final Wait.Condition stillConnected = new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection.isConnected();
+         }
+      };
+      final boolean alive = Wait.waitFor(stillConnected);
+      assertTrue("KeepAlive didn't work properly", alive);
+
+      connection.disconnect();
+   }
+
+   /**
+    * Restarts the broker with {@code transport.useInactivityMonitor=false} and checks that client
+    * "foo3", using a keep-alive of 2, is reported as connected.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testTurnOffInactivityMonitor() throws Exception
+   {
+      // Restart the broker after changing the protocol configuration.
+      stopBroker();
+      protocolConfig = "transport.useInactivityMonitor=false";
+      startBroker();
+
+      final MQTT client = createMQTTConnection();
+      client.setClientId("foo3");
+      client.setKeepAlive((short) 2);
+
+      final BlockingConnection connection = client.blockingConnection();
+      connection.connect();
+
+      final Wait.Condition stillConnected = new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection.isConnected();
+         }
+      };
+      final boolean alive = Wait.waitFor(stillConnected);
+      assertTrue("KeepAlive didn't work properly", alive);
+
+      connection.disconnect();
+   }
+
+   /**
+    * Checks publishing to a topic whose name starts with {@code $}: with the default broker
+    * configuration nothing must be delivered, and after restarting the broker with
+    * {@code transport.publishDollarTopics=true} the message must arrive.
+    */
+   @Ignore
+   @Test(timeout = 60 * 1000)
+   // TODO Make dollar topics configurable in code base.
+   public void testPublishDollarTopics() throws Exception
+   {
+      final String clientId = "publishDollar";
+      final String dollarTopic = "$TopicA";
+      final byte[] body = dollarTopic.getBytes();
+
+      // Phase 1: default configuration, the publish must not be delivered.
+      MQTT client = createMQTTConnection();
+      client.setClientId(clientId);
+      client.setKeepAlive((short) 2);
+      BlockingConnection connection = client.blockingConnection();
+      connection.connect();
+
+      connection.subscribe(new Topic[]{new Topic(dollarTopic, QoS.EXACTLY_ONCE)});
+      connection.publish(dollarTopic, body, QoS.EXACTLY_ONCE, true);
+
+      Message received = connection.receive(10, TimeUnit.SECONDS);
+      assertNull("Publish enabled for $ Topics by default", received);
+      connection.disconnect();
+
+      // Phase 2: restart the broker with dollar topics switched on.
+      stopBroker();
+      protocolConfig = "transport.publishDollarTopics=true";
+      startBroker();
+
+      client = createMQTTConnection();
+      client.setClientId(clientId);
+      client.setKeepAlive((short) 2);
+      connection = client.blockingConnection();
+      connection.connect();
+
+      connection.subscribe(new Topic[]{new Topic(dollarTopic, QoS.EXACTLY_ONCE)});
+      connection.publish(dollarTopic, body, QoS.EXACTLY_ONCE, true);
+
+      received = connection.receive(10, TimeUnit.SECONDS);
+      assertNotNull(received);
+      received.ack();
+      assertEquals("Message body", dollarTopic, new String(received.getPayload()));
+
+      connection.disconnect();
+   }
+
+   /**
+    * Connects two clients with the same client id. With the default broker configuration the second
+    * connection is expected to succeed and the first to be dropped; after restarting the broker with
+    * {@code allowLinkStealing=false} the second connect is expected to fail and the first to stay
+    * connected.
+    */
+   @Ignore
+   @Test(timeout = 60 * 1000)
+   // TODO We currently do not support link stealing.  This needs to be enabled for this test to pass.
+   public void testDuplicateClientId() throws Exception
+   {
+      // Phase 1: link stealing is expected to be enabled by default
+      final String clientId = "duplicateClient";
+      MQTT mqtt = createMQTTConnection(clientId, false);
+      mqtt.setKeepAlive((short) 2);
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+      final String TOPICA = "TopicA";
+      connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+      // Second client re-uses the same client id while the first is still connected.
+      MQTT mqtt1 = createMQTTConnection(clientId, false);
+      mqtt1.setKeepAlive((short) 2);
+      final BlockingConnection connection1 = mqtt1.blockingConnection();
+      connection1.connect();
+
+      // The assertion messages describe the failure case: the newcomer must end up connected ...
+      assertTrue("Duplicate client disconnected", Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection1.isConnected();
+         }
+      }));
+
+      // ... and the original connection must have been dropped.
+      assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return !connection.isConnected();
+         }
+      }));
+
+      connection1.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+      connection1.disconnect();
+
+      // Phase 2: disable link stealing and restart the broker
+      stopBroker();
+      protocolConfig = "allowLinkStealing=false";
+      startBroker();
+
+      mqtt = createMQTTConnection(clientId, false);
+      mqtt.setKeepAlive((short) 2);
+      final BlockingConnection connection2 = mqtt.blockingConnection();
+      connection2.connect();
+      connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+
+      mqtt1 = createMQTTConnection(clientId, false);
+      mqtt1.setKeepAlive((short) 2);
+      final BlockingConnection connection3 = mqtt1.blockingConnection();
+      try
+      {
+         connection3.connect();
+         fail("Duplicate client connected");
+      }
+      catch (Exception e)
+      {
+         // expected: the duplicate connect is rejected (the AssertionError raised by fail() is not an Exception, so it is not caught here)
+      }
+
+      // The first client of this phase must be unaffected by the rejected duplicate.
+      assertTrue("Old client disconnected", connection2.isConnected());
+      connection2.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true);
+      connection2.disconnect();
+   }
+
+   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+//   @Test(timeout = 30 * 10000)
+//   public void testJmsMapping() throws Exception {
+//      doTestJmsMapping("test.foo");
+//   }
+
+   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+//   public void doTestJmsMapping(String destinationName) throws Exception {
+//      // start up jms consumer
+//      Connection jmsConn = cf.createConnection();
+//      Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      Destination dest = session.createTopic(destinationName);
+//      MessageConsumer consumer = session.createConsumer(dest);
+//      jmsConn.start();
+//
+//      // set up mqtt producer
+//      MQTT mqtt = createMQTTConnection();
+//      mqtt.setClientId("foo3");
+//      mqtt.setKeepAlive((short) 2);
+//      final BlockingConnection connection = mqtt.blockingConnection();
+//      connection.connect();
+//
+//      int messagesToSend = 5;
+//
+//      // publish
+//      for (int i = 0; i < messagesToSend; ++i) {
+//         connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
+//      }
+//
+//      connection.disconnect();
+//
+//      for (int i = 0; i < messagesToSend; i++) {
+//
+//         javax.jms.Message message = consumer.receive(2 * 1000);
+//         assertNotNull(message);
+//         assertTrue(message instanceof BytesMessage);
+//         BytesMessage bytesMessage = (BytesMessage) message;
+//
+//         int length = (int) bytesMessage.getBodyLength();
+//         byte[] buffer = new byte[length];
+//         bytesMessage.readBytes(buffer);
+//         assertEquals("hello world", new String(buffer));
+//      }
+//
+//      jmsConn.close();
+//   }
+
+   /**
+    * Subscribes to the wildcard filter {@code Topic/#}, publishes one 32 KiB message to each of
+    * {@code Topic/A} and {@code Topic/B}, and expects both messages to be delivered.
+    */
+   @Test(timeout = 30 * 10000)
+   public void testSubscribeMultipleTopics() throws Exception
+   {
+
+      final byte[] payload = new byte[1024 * 32];
+      Arrays.fill(payload, (byte) '2');
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("MQTT-Client");
+      mqtt.setCleanSession(false);
+
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      Topic[] topics = {new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE)};
+      Topic[] wildcardTopic = {new Topic("Topic/#", QoS.AT_LEAST_ONCE)};
+      connection.subscribe(wildcardTopic);
+
+      for (Topic topic : topics)
+      {
+         connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+      }
+
+      int received = 0;
+      for (int i = 0; i < topics.length; ++i)
+      {
+         // Bounded wait: an untimed receive() would block until the test timeout rather than let the
+         // assertion below report the missing message.
+         Message message = connection.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Missing message " + i, message);
+         received++;
+         String messageContent = new String(message.getPayload());
+         LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
+         message.ack();
+      }
+
+      assertEquals("Should have received " + topics.length + " messages", topics.length, received);
+
+      connection.disconnect();
+   }
+
+   /**
+    * Checks delivery of messages published while a non-clean subscriber is offline. After one
+    * online round, the publisher sends two 32 KiB messages per run while the subscriber is
+    * disconnected; the subscriber then reconnects and must receive both. This is repeated 100 times.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testReceiveMessageSentWhileOffline() throws Exception
+   {
+      final byte[] payload = new byte[1024 * 32];
+      Arrays.fill(payload, (byte) '2');
+
+      int numberOfRuns = 100;
+      int messagesPerRun = 2;
+
+      final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
+      final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+
+      final BlockingConnection connectionPub = mqttPub.blockingConnection();
+      connectionPub.connect();
+
+      BlockingConnection connectionSub = mqttSub.blockingConnection();
+      connectionSub.connect();
+
+      Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+      connectionSub.subscribe(topics);
+
+      // Initial round with the subscriber online.
+      for (int i = 0; i < messagesPerRun; ++i)
+      {
+         connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+      }
+
+      int received = 0;
+      for (int i = 0; i < messagesPerRun; ++i)
+      {
+         Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+         assertNotNull(message);
+         received++;
+         assertTrue(Arrays.equals(payload, message.getPayload()));
+         message.ack();
+      }
+      connectionSub.disconnect();
+
+      for (int j = 0; j < numberOfRuns; j++)
+      {
+
+         // Publish while the subscriber is disconnected ...
+         for (int i = 0; i < messagesPerRun; ++i)
+         {
+            connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+         }
+
+         // ... then bring it back and expect everything that was sent in the meantime.
+         connectionSub = mqttSub.blockingConnection();
+         connectionSub.connect();
+         connectionSub.subscribe(topics);
+
+         for (int i = 0; i < messagesPerRun; ++i)
+         {
+            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            received++;
+            assertTrue(Arrays.equals(payload, message.getPayload()));
+            message.ack();
+         }
+         connectionSub.disconnect();
+      }
+      assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
+
+      // Release the publisher connection as well; only the subscriber was being closed.
+      connectionPub.disconnect();
+   }
+
+   /**
+    * Restarts the broker with {@code transport.defaultKeepAlive=2000} and checks that a client
+    * which requests a keep-alive of 0 is reported as connected.
+    */
+   @Test(timeout = 30 * 1000)
+   public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception
+   {
+      stopBroker();
+      protocolConfig = "transport.defaultKeepAlive=2000";
+      startBroker();
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("foo");
+      mqtt.setKeepAlive((short) 0);
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition()
+      {
+
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection.isConnected();
+         }
+      }));
+
+      // Close the connection like the sibling keep-alive tests do.
+      connection.disconnect();
+   }
+
+   /**
+    * Opens and closes two connections in a row from the same {@code MQTT} settings object, pausing
+    * one second after each disconnect.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testReuseConnection() throws Exception
+   {
+      final MQTT settings = createMQTTConnection();
+      settings.setClientId("Test-Client");
+
+      for (int attempt = 0; attempt < 2; attempt++)
+      {
+         final BlockingConnection connection = settings.blockingConnection();
+         connection.connect();
+         connection.disconnect();
+         Thread.sleep(1000);
+      }
+   }
+
+   /**
+    * A non-clean subscriber must receive the five messages published while it was offline, and
+    * must receive nothing more once it has unsubscribed from the topic.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception
+   {
+      final int batchSize = 5;
+      Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+
+      MQTT publisherSettings = createMQTTConnection("MQTTPub-Client", true);
+      // publisherSettings.setVersion("3.1.1");
+
+      MQTT subscriberSettings = createMQTTConnection("MQTTSub-Client", false);
+      // subscriberSettings.setVersion("3.1.1");
+
+      BlockingConnection publisher = publisherSettings.blockingConnection();
+      publisher.connect();
+
+      // Register the subscription, then go offline straight away.
+      BlockingConnection subscriber = subscriberSettings.blockingConnection();
+      subscriber.connect();
+      subscriber.subscribe(topics);
+      subscriber.disconnect();
+
+      int sent = 0;
+      while (sent < batchSize)
+      {
+         String payload = "Message " + sent;
+         publisher.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+         sent++;
+      }
+
+      subscriber = subscriberSettings.blockingConnection();
+      subscriber.connect();
+
+      int received = 0;
+      for (int i = 0; i < batchSize; ++i)
+      {
+         Message delivered = subscriber.receive(5, TimeUnit.SECONDS);
+         assertNotNull("Missing message " + i, delivered);
+         LOG.info("Message is " + new String(delivered.getPayload()));
+         received++;
+         delivered.ack();
+      }
+      assertEquals(5, received);
+
+      // unsubscribe from topic
+      subscriber.unsubscribe(new String[]{"TopicA"});
+
+      // send more messages
+      sent = 0;
+      while (sent < batchSize)
+      {
+         String payload = "Message " + sent;
+         publisher.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+         sent++;
+      }
+
+      // these should not be received
+      Message unexpected = subscriber.receive(5, TimeUnit.SECONDS);
+      assertNull(unexpected);
+
+      subscriber.disconnect();
+      publisher.disconnect();
+   }
+
+   /**
+    * Connects and disconnects client "foo" with the protocol version set to "3.1.1"; the test
+    * passes as long as neither call throws.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testMQTT311Connection() throws Exception
+   {
+      final MQTT settings = createMQTTConnection();
+      settings.setClientId("foo");
+      settings.setVersion("3.1.1");
+
+      final BlockingConnection session = settings.blockingConnection();
+      session.connect();
+      session.disconnect();
+   }
+
+   // TODO This should be reworked to align with Artemis recovery.
+//   @Test(timeout = 60 * 1000)
+//   public void testActiveMQRecoveryPolicy() throws Exception {
+//      // test with ActiveMQ LastImageSubscriptionRecoveryPolicy
+//      final PolicyMap policyMap = new PolicyMap();
+//      final PolicyEntry policyEntry = new PolicyEntry();
+//      policyEntry.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
+//      policyMap.put(new ActiveMQTopic(">"), policyEntry);
+//      brokerService.setDestinationPolicy(policyMap);
+//
+//      MQTT mqtt = createMQTTConnection("pub-sub", true);
+//      final int[] retain = new int[1];
+//      final int[] nonretain  = new int[1];
+//      mqtt.setTracer(new Tracer() {
+//         @Override
+//         public void onReceive(MQTTFrame frame) {
+//            if (frame.messageType() == PUBLISH.TYPE) {
+//               LOG.info("Received message with retain=" + frame.retain());
+//               if (frame.retain()) {
+//                  retain[0]++;
+//               } else {
+//                  nonretain[0]++;
+//               }
+//            }
+//         }
+//      });
+//
+//      BlockingConnection connection = mqtt.blockingConnection();
+//      connection.connect();
+//      final String RETAINED = "RETAINED";
+//      connection.publish("one", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+//      connection.publish("two", RETAINED.getBytes(), QoS.AT_LEAST_ONCE, true);
+//
+//      final String NONRETAINED = "NONRETAINED";
+//      connection.publish("one", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+//      connection.publish("two", NONRETAINED.getBytes(), QoS.AT_LEAST_ONCE, false);
+//
+//      connection.subscribe(new Topic[]{new Topic("#", QoS.AT_LEAST_ONCE)});
+//      for (int i = 0; i < 4; i++) {
+//         final Message message = connection.receive(30, TimeUnit.SECONDS);
+//         assertNotNull("Should receive 4 messages", message);
+//         message.ack();
+//      }
+//      assertEquals("Should receive 2 retained messages", 2, retain[0]);
+//      assertEquals("Should receive 2 non-retained messages", 2, nonretain[0]);
+//   }
+
+   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
+//   @Test(timeout = 60 * 1000)
+//   public void testSendMQTTReceiveJMSVirtualTopic() throws Exception {
+//
+//      final MQTTClientProvider provider = getMQTTClientProvider();
+//      initializeConnection(provider);
+//      final String DESTINATION_NAME = "Consumer.jms.VirtualTopic.TopicA";
+//
+//      // send retained message
+//      final String RETAINED = "RETAINED";
+//      final String MQTT_DESTINATION_NAME = "VirtualTopic/TopicA";
+//      provider.publish(MQTT_DESTINATION_NAME, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+//
+//      ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
+//      // MUST set to true to receive retained messages
+//      activeMQConnection.setUseRetroactiveConsumer(true);
+//      activeMQConnection.start();
+//      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      Queue jmsQueue = s.createQueue(DESTINATION_NAME);
+//      MessageConsumer consumer = s.createConsumer(jmsQueue);
+//
+//      // check whether we received retained message on JMS subscribe
+//      ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+//      assertNotNull("Should get retained message", message);
+//      ByteSequence bs = message.getContent();
+//      assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+//      assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+//      for (int i = 0; i < NUM_MESSAGES; i++) {
+//         String payload = "Test Message: " + i;
+//         provider.publish(MQTT_DESTINATION_NAME, payload.getBytes(), AT_LEAST_ONCE);
+//         message = (ActiveMQMessage) consumer.receive(5000);
+//         assertNotNull("Should get a message", message);
+//         bs = message.getContent();
+//         assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+//      }
+//
+//      // re-create consumer and check we received retained message again
+//      consumer.close();
+//      consumer = s.createConsumer(jmsQueue);
+//      message = (ActiveMQMessage) consumer.receive(5000);
+//      assertNotNull("Should get retained message", message);
+//      bs = message.getContent();
+//      assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+//      assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+//
+//      activeMQConnection.close();
+//      provider.disconnect();
+//   }
+
+   /**
+    * Restarts the broker with {@code maxInactivityDuration=-1} and checks that client "test-mqtt",
+    * using a keep-alive of 2, is reported as connected.
+    */
+   @Test(timeout = 60 * 1000)
+   public void testPingOnMQTT() throws Exception
+   {
+      // Restart the broker after changing the protocol configuration.
+      stopBroker();
+      protocolConfig = "maxInactivityDuration=-1";
+      startBroker();
+
+      final MQTT client = createMQTTConnection();
+      client.setClientId("test-mqtt");
+      client.setKeepAlive((short) 2);
+
+      final BlockingConnection connection = client.blockingConnection();
+      connection.connect();
+
+      final Wait.Condition stillConnected = new Wait.Condition()
+      {
+         @Override
+         public boolean isSatisified() throws Exception
+         {
+            return connection.isConnected();
+         }
+      };
+      final boolean alive = Wait.waitFor(stillConnected);
+      assertTrue("KeepAlive didn't work properly", alive);
+
+      connection.disconnect();
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
new file mode 100644
index 0000000..fd802ec
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for the MQTT integration tests in this package.
+ * <p>
+ * Before each test it points the {@code javax.net.ssl.*} system properties at the test
+ * key stores and starts an embedded broker with two Netty acceptors: a CORE acceptor on
+ * port 5445 and an MQTT acceptor on {@link #port}. The broker is stopped again after each
+ * test. Helper methods create fusesource MQTT clients (plain TCP or SSL) that are
+ * configured to talk to that broker.
+ */
+public class MQTTTestSupport extends ActiveMQTestBase
+{
+   private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
+
+   public static final int AT_MOST_ONCE = 0;
+   public static final int AT_LEAST_ONCE = 1;
+   public static final int EXACTLY_ONCE = 2;
+
+   /** Port of the CORE acceptor registered by {@link #addCoreConnector()}. */
+   private static final int CORE_PORT = 5445;
+
+   /** MQTT protocol version that every client created by this class speaks. */
+   private static final String MQTT_VERSION = "3.1.1";
+
+   /** The embedded broker; {@code null} while no broker is running. */
+   private ActiveMQServer server;
+
+   /** Port of the primary MQTT acceptor; subclasses may change it before the broker starts. */
+   protected int port = 1883;
+   protected ActiveMQConnectionFactory cf;
+   protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
+   protected boolean persistent;
+   /**
+    * Protocol options a test may assign before (re)starting the broker.
+    * NOTE(review): nothing in this class reads the value, so it currently has no effect on
+    * the acceptors - confirm whether addMQTTConnector() is meant to apply it.
+    */
+   protected String protocolConfig;
+   protected String protocolScheme;
+   protected boolean useSSL;
+
+   @Rule
+   public TestName name = new TestName();
+
+   /**
+    * Creates a test that uses the {@code mqtt} scheme without SSL.
+    */
+   public MQTTTestSupport()
+   {
+      this("mqtt", false);
+   }
+
+   /**
+    * Creates a test for the given connector scheme.
+    *
+    * @param connectorScheme scheme reported by {@link #getProtocolScheme()}
+    * @param useSSL          whether clients created by this class connect over SSL
+    */
+   public MQTTTestSupport(String connectorScheme, boolean useSSL)
+   {
+      this.protocolScheme = connectorScheme;
+      this.useSSL = useSSL;
+      // Initialised on every construction path so that subclasses using this constructor
+      // do not end up with a null connection factory.
+      this.cf = new ActiveMQConnectionFactory(false, new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY));
+   }
+
+   /**
+    * Resolves the directory two levels above the location this class was loaded from.
+    *
+    * @return the canonical base directory
+    * @throws IOException if the canonical path cannot be computed
+    */
+   public File basedir() throws IOException
+   {
+      ProtectionDomain protectionDomain = getClass().getProtectionDomain();
+      java.net.URL location = protectionDomain.getCodeSource().getLocation();
+      return new File(toFile(location), "../..").getCanonicalFile();
+   }
+
+   /**
+    * Converts a code-source URL to a file. Going through the URI decodes percent-escapes
+    * (for example {@code %20} for a space); the raw URL path is used as a fallback when
+    * the URL is not a well-formed hierarchical file URI.
+    */
+   private static File toFile(java.net.URL location)
+   {
+      try
+      {
+         return new File(location.toURI());
+      }
+      catch (java.net.URISyntaxException e)
+      {
+         return new File(location.getPath());
+      }
+      catch (IllegalArgumentException e)
+      {
+         return new File(location.getPath());
+      }
+   }
+
+   /**
+    * @return the name of the currently running test method
+    */
+   public String getName()
+   {
+      return name.getMethodName();
+   }
+
+   /**
+    * Points the JVM-wide SSL system properties at the test key stores, clears the recorded
+    * exceptions and starts the broker. The base class set-up is run by
+    * {@link #startBroker()}.
+    *
+    * @throws Exception if the broker cannot be started
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      String basedir = basedir().getPath();
+      System.setProperty("javax.net.ssl.trustStore", basedir + "/src/test/resources/client.keystore");
+      System.setProperty("javax.net.ssl.trustStorePassword", "password");
+      System.setProperty("javax.net.ssl.trustStoreType", "jks");
+      System.setProperty("javax.net.ssl.keyStore", basedir + "/src/test/resources/server.keystore");
+      System.setProperty("javax.net.ssl.keyStorePassword", "password");
+      System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+      exceptions.clear();
+      startBroker();
+   }
+
+   /**
+    * Stops the broker and then runs the base class tear-down. Because this method
+    * overrides the inherited one, the base class clean-up would otherwise never run.
+    *
+    * @throws Exception if stopping the broker or the base class tear-down fails
+    */
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      try
+      {
+         stopBroker();
+      }
+      finally
+      {
+         super.tearDown();
+      }
+   }
+
+   /**
+    * Runs the base class set-up, creates a broker with the CORE and MQTT acceptors and
+    * waits up to ten seconds for it to activate.
+    *
+    * @throws Exception if the broker cannot be created or started
+    */
+   public void startBroker() throws Exception
+   {
+      // TODO Add SSL
+      super.setUp();
+      server = createServer(true, true);
+      addCoreConnector();
+      addMQTTConnector();
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxSizeBytes(999999999);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+      server.start();
+      server.waitForActivation(10, TimeUnit.SECONDS);
+   }
+
+   /**
+    * Registers the CORE protocol acceptor on port 5445. Overrides of this method can add
+    * additional configuration options.
+    *
+    * @throws Exception if the acceptor cannot be configured
+    */
+   protected void addCoreConnector() throws Exception
+   {
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(CORE_PORT));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+      // Log the protocol that was actually added, not the MQTT scheme.
+      LOG.info("Added connector {} to broker", "core");
+   }
+
+   /**
+    * Registers the MQTT acceptor on {@link #port}.
+    * <p>
+    * Overrides of this method can add additional configuration options or add multiple
+    * MQTT transport connectors as needed, the port variable is always supposed to be
+    * assigned the primary MQTT connector's port.
+    *
+    * @throws Exception if the acceptor cannot be configured
+    */
+   protected void addMQTTConnector() throws Exception
+   {
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+      LOG.info("Added connector {} to broker", getProtocolScheme());
+   }
+
+   /**
+    * Stops the broker if one is running. Safe to call repeatedly, for example when a test
+    * has already stopped the broker itself before {@link #tearDown()} runs.
+    *
+    * @throws Exception if the broker fails to stop
+    */
+   public void stopBroker() throws Exception
+   {
+      if (server == null)
+      {
+         return;
+      }
+      try
+      {
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+      finally
+      {
+         server = null;
+      }
+   }
+
+   /**
+    * @return a queue name unique to the running test: class name plus test method name
+    */
+   protected String getQueueName()
+   {
+      return getClass().getName() + "." + name.getMethodName();
+   }
+
+   /**
+    * @return a topic name unique to the running test: class name plus test method name
+    */
+   protected String getTopicName()
+   {
+      return getClass().getName() + "." + name.getMethodName();
+   }
+
+   /**
+    * Initialize an MQTTClientProvider instance.  By default this method uses the port that's
+    * assigned to be the TCP based port using the base version of addMQTTConnector.  A subclass
+    * can either change the value of port or override this method to assign the correct port.
+    *
+    * @param provider the MQTTClientProvider instance to initialize.
+    * @throws Exception if an error occurs during initialization.
+    */
+   protected void initializeConnection(MQTTClientProvider provider) throws Exception
+   {
+      if (isUseSSL())
+      {
+         provider.setSslContext(createTrustAllSslContext());
+         provider.connect("ssl://localhost:" + port);
+      }
+      else
+      {
+         provider.connect("tcp://localhost:" + port);
+      }
+   }
+
+   public String getProtocolScheme()
+   {
+      return protocolScheme;
+   }
+
+   public void setProtocolScheme(String scheme)
+   {
+      this.protocolScheme = scheme;
+   }
+
+   public boolean isUseSSL()
+   {
+      return this.useSSL;
+   }
+
+   public void setUseSSL(boolean useSSL)
+   {
+      this.useSSL = useSSL;
+   }
+
+   public boolean isPersistent()
+   {
+      return persistent;
+   }
+
+   public int getPort()
+   {
+      return this.port;
+   }
+
+   public boolean isSchedulerSupportEnabled()
+   {
+      return false;
+   }
+
+   /**
+    * A unit of work for {@link #within(int, TimeUnit, Task)} that may throw.
+    */
+   protected interface Task
+   {
+      void run() throws Exception;
+   }
+
+   /**
+    * Runs {@code task} repeatedly until it completes without throwing or the time budget
+    * is used up, pausing a tenth of the budget (or the remaining time, if shorter) between
+    * attempts.
+    *
+    * @param time maximum time to keep retrying
+    * @param unit unit of {@code time}
+    * @param task the work to attempt, typically a block of assertions
+    * @throws InterruptedException if the thread is interrupted while running the task or
+    *                              while pausing between attempts
+    * @throws RuntimeException     the last failure once the deadline has passed; checked
+    *                              exceptions are wrapped, errors are rethrown unchanged
+    */
+   protected void within(int time, TimeUnit unit, Task task) throws InterruptedException
+   {
+      long timeMS = unit.toMillis(time);
+      long deadline = System.currentTimeMillis() + timeMS;
+      while (true)
+      {
+         try
+         {
+            task.run();
+            return;
+         }
+         catch (InterruptedException e)
+         {
+            // An interrupt is a request to stop, not a failed attempt worth retrying.
+            throw e;
+         }
+         catch (Throwable e)
+         {
+            long remaining = deadline - System.currentTimeMillis();
+            if (remaining <= 0)
+            {
+               if (e instanceof RuntimeException)
+               {
+                  throw (RuntimeException) e;
+               }
+               if (e instanceof Error)
+               {
+                  throw (Error) e;
+               }
+               throw new RuntimeException(e);
+            }
+            Thread.sleep(Math.min(timeMS / 10, remaining));
+         }
+      }
+   }
+
+   protected MQTTClientProvider getMQTTClientProvider()
+   {
+      return new FuseMQTTClientProvider();
+   }
+
+   /**
+    * Creates a client without a client id and with a non-clean session.
+    *
+    * @return a configured but not yet connected MQTT client
+    * @throws Exception if the client cannot be configured
+    */
+   protected MQTT createMQTTConnection() throws Exception
+   {
+      MQTT client = createMQTTConnection(null, false);
+      client.setVersion(MQTT_VERSION);
+      return client;
+   }
+
+   /**
+    * Creates a client for the broker started by this test, over SSL or plain TCP depending
+    * on {@link #isUseSSL()}.
+    *
+    * @param clientId client id to set, or {@code null} to leave it unset
+    * @param clean    value of the clean-session flag
+    * @return a configured but not yet connected MQTT client
+    * @throws Exception if the client cannot be configured
+    */
+   protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception
+   {
+      return isUseSSL() ? createMQTTSslConnection(clientId, clean) : createMQTTTcpConnection(clientId, clean);
+   }
+
+   private MQTT createMQTTTcpConnection(String clientId, boolean clean) throws Exception
+   {
+      MQTT mqtt = newClient(clientId, clean);
+      mqtt.setHost("localhost", port);
+      return mqtt;
+   }
+
+   private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exception
+   {
+      MQTT mqtt = newClient(clientId, clean);
+      mqtt.setHost("ssl://localhost:" + port);
+      mqtt.setSslContext(createTrustAllSslContext());
+      return mqtt;
+   }
+
+   /**
+    * Builds a client with the settings shared by the TCP and SSL variants: a single
+    * connect attempt, no reconnects, frame tracing and protocol version 3.1.1.
+    */
+   private MQTT newClient(String clientId, boolean clean)
+   {
+      MQTT mqtt = new MQTT();
+      mqtt.setConnectAttemptsMax(1);
+      mqtt.setReconnectAttemptsMax(0);
+      mqtt.setTracer(createTracer());
+      mqtt.setVersion(MQTT_VERSION);
+      if (clientId != null)
+      {
+         mqtt.setClientId(clientId);
+      }
+      mqtt.setCleanSession(clean);
+      return mqtt;
+   }
+
+   /**
+    * Creates a TLS context with no key managers and a trust manager that accepts every
+    * certificate. Only suitable for tests.
+    */
+   private static SSLContext createTrustAllSslContext() throws Exception
+   {
+      SSLContext ctx = SSLContext.getInstance("TLS");
+      ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+      return ctx;
+   }
+
+   /**
+    * @return a tracer that logs every frame the client sends or receives, and the client's
+    *         debug messages, at INFO level
+    */
+   protected Tracer createTracer()
+   {
+      return new Tracer()
+      {
+         @Override
+         public void onReceive(MQTTFrame frame)
+         {
+            LOG.info("Client Received:\n{}", frame);
+         }
+
+         @Override
+         public void onSend(MQTTFrame frame)
+         {
+            LOG.info("Client Sent:\n{}", frame);
+         }
+
+         @Override
+         public void debug(String message, Object... args)
+         {
+            LOG.info(String.format(message, args));
+         }
+      };
+   }
+
+   /**
+    * Trust manager that performs no validation at all: every client and server certificate
+    * chain is accepted. For test use only.
+    */
+   static class DefaultTrustManager implements X509TrustManager
+   {
+
+      @Override
+      public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
+      {
+         // Intentionally empty: accept any client certificate.
+      }
+
+      @Override
+      public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException
+      {
+         // Intentionally empty: accept any server certificate.
+      }
+
+      @Override
+      public X509Certificate[] getAcceptedIssuers()
+      {
+         return new X509Certificate[0];
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0f82ca75/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java
new file mode 100644
index 0000000..9a1313b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/PahoMQTTTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTLogger;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.Test;
+
+/**
+ * Exercises the broker's MQTT acceptor with the Eclipse Paho client: one test with many
+ * concurrent publishers and one simple send/receive round trip.
+ */
+public class PahoMQTTTest extends MQTTTestSupport
+{
+
+   private static final MQTTLogger LOG = MQTTLogger.LOGGER;
+
+   /** Topic shared by all publishers and the consumer. */
+   private static final String TOPIC = "test";
+
+   /** Number of messages each publisher sends in {@link #testLotsOfClients()}. */
+   private static final int MESSAGES_PER_CLIENT = 10;
+
+   /**
+    * Starts {@code PahoMQTTTest.CLIENTS} (default 100) publisher threads, each publishing
+    * ten QoS 1 messages one second apart, and checks that a single subscriber eventually
+    * receives all of them.
+    *
+    * @throws Exception if a client fails or not all messages arrive in time
+    */
+   @Test(timeout = 300000)
+   public void testLotsOfClients() throws Exception
+   {
+
+      final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
+      LOG.info("Using: " + CLIENTS + " clients");
+
+      final AtomicInteger receiveCounter = new AtomicInteger();
+      final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>();
+      final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS);
+      final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS);
+      final CountDownLatch sendBarrier = new CountDownLatch(1);
+
+      MqttClient consumer = createPahoClient("consumer");
+      try
+      {
+         consumer.setCallback(new MqttCallback()
+         {
+            @Override
+            public void connectionLost(Throwable cause)
+            {
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) throws Exception
+            {
+               receiveCounter.incrementAndGet();
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token)
+            {
+            }
+         });
+         consumer.connect();
+         consumer.subscribe(TOPIC);
+
+         for (int i = 0; i < CLIENTS; i++)
+         {
+            Thread.sleep(10);
+            new Thread(null, null, "client:" + i)
+            {
+               @Override
+               public void run()
+               {
+                  MqttClient publisher = null;
+                  boolean connectCounted = false;
+                  try
+                  {
+                     publisher = createPahoClient(Thread.currentThread().getName());
+                     publisher.connect();
+                     connectedDoneLatch.countDown();
+                     connectCounted = true;
+                     sendBarrier.await();
+                     for (int sent = 0; sent < MESSAGES_PER_CLIENT; sent++)
+                     {
+                        Thread.sleep(1000);
+                        publisher.publish(TOPIC, "hello".getBytes(), 1, false);
+                     }
+                     publisher.disconnect();
+                     publisher.close();
+                  }
+                  catch (Throwable e)
+                  {
+                     e.printStackTrace();
+                     // Keep the first failure; later ones are usually follow-on errors.
+                     asyncError.compareAndSet(null, e);
+                     closeQuietly(publisher);
+                  }
+                  finally
+                  {
+                     // A client that failed before or during connect must still release
+                     // the main thread, otherwise the test blocks until its timeout.
+                     if (!connectCounted)
+                     {
+                        connectedDoneLatch.countDown();
+                     }
+                     disconnectDoneLatch.countDown();
+                  }
+               }
+            }.start();
+         }
+
+         connectedDoneLatch.await();
+         assertNull("Async error: " + asyncError.get(), asyncError.get());
+         sendBarrier.countDown();
+
+         LOG.info("All clients connected... waiting to receive sent messages...");
+
+         // We should eventually get all the messages.
+         within(30, TimeUnit.SECONDS, new Task()
+         {
+            @Override
+            public void run() throws Exception
+            {
+               assertEquals(CLIENTS * MESSAGES_PER_CLIENT, receiveCounter.get());
+            }
+         });
+
+         LOG.info("All messages received.");
+
+         disconnectDoneLatch.await();
+         assertNull("Async error: " + asyncError.get(), asyncError.get());
+      }
+      finally
+      {
+         // Release publishers still parked on the barrier so their threads can end, and
+         // always give the consumer connection back.
+         sendBarrier.countDown();
+         closeQuietly(consumer);
+      }
+   }
+
+   /**
+    * Publishes one QoS 1 message and waits for a subscriber on the same topic to receive
+    * it.
+    *
+    * @throws Exception if a client fails or the message does not arrive
+    */
+   @Test(timeout = 300000)
+   public void testSendAndReceiveMQTT() throws Exception
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      MqttClient consumer = createPahoClient("consumerId");
+      MqttClient producer = createPahoClient("producerId");
+      try
+      {
+         // Install the callback before subscribing so no delivery can be missed.
+         consumer.setCallback(new MqttCallback()
+         {
+            @Override
+            public void connectionLost(Throwable cause)
+            {
+
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) throws Exception
+            {
+               latch.countDown();
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token)
+            {
+
+            }
+         });
+         consumer.connect();
+         consumer.subscribe(TOPIC);
+
+         producer.connect();
+         producer.publish(TOPIC, "hello".getBytes(), 1, false);
+
+         waitForLatch(latch);
+         producer.disconnect();
+         producer.close();
+      }
+      finally
+      {
+         closeQuietly(producer);
+         closeQuietly(consumer);
+      }
+   }
+
+   /**
+    * Creates an unconnected Paho client for the MQTT acceptor of this test, using
+    * in-memory persistence.
+    *
+    * @param clientId the MQTT client id
+    * @return the new client
+    * @throws MqttException if the client cannot be created
+    */
+   private MqttClient createPahoClient(String clientId) throws MqttException
+   {
+      return new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
+   }
+
+   /**
+    * Best-effort clean-up: disconnects the client if it is still connected and closes it.
+    * Failures are only logged so that they cannot mask the original test failure.
+    *
+    * @param client the client to release, may be {@code null}
+    */
+   private static void closeQuietly(MqttClient client)
+   {
+      if (client == null)
+      {
+         return;
+      }
+      try
+      {
+         if (client.isConnected())
+         {
+            client.disconnect();
+         }
+         client.close();
+      }
+      catch (MqttException e)
+      {
+         LOG.info("Ignoring error while closing MQTT client: " + e);
+      }
+   }
+
+}
\ No newline at end of file