You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/25 18:17:41 UTC

[1/3] activemq-artemis git commit: Artemis-233 Support JMS BytesMessage -> MQTT

Repository: activemq-artemis
Updated Branches:
  refs/heads/master a31355879 -> 0373abf25


Artemis-233 Support JMS BytesMessage -> MQTT


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0d8a5658
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0d8a5658
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0d8a5658

Branch: refs/heads/master
Commit: 0d8a5658369da566e9f2e3b2f47a9ae4cdf6b24d
Parents: 3c7c2ed
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed May 25 17:57:38 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed May 25 17:59:00 2016 +0100

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java  |   6 +-
 .../integration/mqtt/imported/MQTTTest.java     | 165 +++++++++----------
 2 files changed, 82 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0d8a5658/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 19bbff8..93d0bd2 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -223,7 +223,11 @@ public class MQTTPublishManager {
 
    private int decideQoS(ServerMessage message, ServerConsumer consumer) {
       int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
-      int qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
+
+      int qos = 2;
+      if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
+         qos = message.getIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY);
+      }
 
       /* Subscription QoS is the maximum QoS the client is willing to receive for this subscription.  If the message QoS
       is less than the subscription QoS then use it, otherwise use the subscription qos). */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0d8a5658/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 257cf8f..b305c80 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -18,7 +18,9 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 import java.lang.reflect.Field;
 import java.net.ProtocolException;
@@ -50,7 +52,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.vertx.java.core.impl.ConcurrentHashSet;
 
-/**QT
+/**
+ * QT
  * MQTT Test imported from ActiveMQ MQTT component.
  */
 public class MQTTTest extends MQTTTestSupport {
@@ -1001,10 +1004,6 @@ public class MQTTTest extends MQTTTestSupport {
       notClean.disconnect();
    }
 
-   /* TODO These Cross protocol tests were imported from ActiveMQ and need reworking to apply to Artemis.  There is an
-   outstanding task to add cross protocol support.  This task should rework these tests.  The tests are included here
-   and commented out to ensure ActiveMQ and Artemis tests are in sync. */
-
    @Test(timeout = 60 * 1000)
    public void testSendMQTTReceiveJMS() throws Exception {
       doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
@@ -1053,45 +1052,36 @@ public class MQTTTest extends MQTTTestSupport {
       provider.disconnect();
    }
 
-//   @Test(timeout = 2 * 60 * 1000)
-//   public void testSendJMSReceiveMQTT() throws Exception {
-//      doTestSendJMSReceiveMQTT("foo.far");
-//   }
-//
-//   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
-//      final MQTTClientProvider provider = getMQTTClientProvider();
-//      initializeConnection(provider);
-//
-//      Connection connection = cf.createConnection();
-//      connection.start();
-//
-//      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//      javax.jms.Queue queue = s.createQueue(destinationName);
-//      MessageProducer producer = s.createProducer(queue);
-//
-//      // send retained message from JMS
-//      final String RETAINED = "RETAINED";
-//      TextMessage sendMessage = s.createTextMessage(RETAINED);
-//      sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
-//      producer.send(sendMessage);
-//
-//      provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
-//      byte[] message = provider.receive(10000);
-//      assertNotNull("Should get retained message", message);
-//      assertEquals(RETAINED, new String(message));
-//
-//      for (int i = 0; i < NUM_MESSAGES; i++) {
-//         String payload = "This is Test Message: " + i;
-//         sendMessage = s.createTextMessage(payload);
-//         producer.send(sendMessage);
-//         message = provider.receive(5000);
-//         assertNotNull("Should get a message", message);
-//
-//         assertEquals(payload, new String(message));
-//      }
-//      provider.disconnect();
-//      connection.close();
-//   }
+   @Test(timeout = 2 * 60 * 1000)
+   public void testSendJMSReceiveMQTT() throws Exception {
+      doTestSendJMSReceiveMQTT("foo.far");
+   }
+
+   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      initializeConnection(provider);
+      provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
+
+      Connection connection = cf.createConnection();
+      connection.start();
+
+      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue queue = s.createQueue(destinationName);
+      MessageProducer producer = s.createProducer(queue);
+
+      // send retained message from JMS
+      final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+      BytesMessage bytesMessage = s.createBytesMessage();
+      bytesMessage.writeBytes(bytes);
+      producer.send(bytesMessage);
+
+      byte[] message = provider.receive(10000);
+      assertNotNull("Should get retained message", message);
+      assertArrayEquals(bytes, message);
+
+      provider.disconnect();
+      connection.close();
+   }
 
    @Test(timeout = 60 * 1000)
    public void testPingKeepsInactivityMonitorAlive() throws Exception {
@@ -1237,52 +1227,51 @@ public class MQTTTest extends MQTTTestSupport {
       connection2.disconnect();
    }
 
-   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   @Test(timeout = 30 * 10000)
-   //   public void testJmsMapping() throws Exception {
-   //      doTestJmsMapping("test.foo");
-   //   }
+   @Test(timeout = 30 * 10000)
+   public void testJmsMapping() throws Exception {
+      doTestJmsMapping("test.foo");
+   }
 
    // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   public void doTestJmsMapping(String destinationName) throws Exception {
-   //      // start up jms consumer
-   //      Connection jmsConn = cf.createConnection();
-   //      Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   //      Destination dest = session.createTopic(destinationName);
-   //      MessageConsumer consumer = session.createConsumer(dest);
-   //      jmsConn.start();
-   //
-   //      // set up mqtt producer
-   //      MQTT mqtt = createMQTTConnection();
-   //      mqtt.setClientId("foo3");
-   //      mqtt.setKeepAlive((short) 2);
-   //      final BlockingConnection connection = mqtt.blockingConnection();
-   //      connection.connect();
-   //
-   //      int messagesToSend = 5;
-   //
-   //      // publish
-   //      for (int i = 0; i < messagesToSend; ++i) {
-   //         connection.publish("test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
-   //      }
-   //
-   //      connection.disconnect();
-   //
-   //      for (int i = 0; i < messagesToSend; i++) {
-   //
-   //         javax.jms.Message message = consumer.receive(2 * 1000);
-   //         assertNotNull(message);
-   //         assertTrue(message instanceof BytesMessage);
-   //         BytesMessage bytesMessage = (BytesMessage) message;
-   //
-   //         int length = (int) bytesMessage.getBodyLength();
-   //         byte[] buffer = new byte[length];
-   //         bytesMessage.readBytes(buffer);
-   //         assertEquals("hello world", new String(buffer));
-   //      }
-   //
-   //      jmsConn.close();
-   //   }
+   public void doTestJmsMapping(String destinationName) throws Exception {
+      // start up jms consumer
+      Connection jmsConn = cf.createConnection();
+      Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination dest = session.createQueue(destinationName);
+      MessageConsumer consumer = session.createConsumer(dest);
+      jmsConn.start();
+
+      // set up mqtt producer
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId("foo3");
+      mqtt.setKeepAlive((short) 2);
+      final BlockingConnection connection = mqtt.blockingConnection();
+      connection.connect();
+
+      int messagesToSend = 5;
+
+      // publish
+      for (int i = 0; i < messagesToSend; ++i) {
+         connection.publish("jms/queue/test/foo", "hello world".getBytes(), QoS.AT_LEAST_ONCE, false);
+      }
+
+      connection.disconnect();
+
+      for (int i = 0; i < messagesToSend; i++) {
+
+         javax.jms.Message message = consumer.receive(2 * 1000);
+         assertNotNull(message);
+         assertTrue(message instanceof BytesMessage);
+         BytesMessage bytesMessage = (BytesMessage) message;
+
+         int length = (int) bytesMessage.getBodyLength();
+         byte[] buffer = new byte[length];
+         bytesMessage.readBytes(buffer);
+         assertEquals("hello world", new String(buffer));
+      }
+
+      jmsConn.close();
+   }
 
    @Test(timeout = 30 * 10000)
    public void testSubscribeMultipleTopics() throws Exception {


[3/3] activemq-artemis git commit: This closes #541

Posted by cl...@apache.org.
This closes #541


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0373abf2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0373abf2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0373abf2

Branch: refs/heads/master
Commit: 0373abf2560926afcf6e1acaa4f645da58dc4b81
Parents: a313558 0d8a565
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed May 25 14:17:35 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed May 25 14:17:35 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java  |   6 +-
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  10 +-
 .../integration/mqtt/imported/MQTTTest.java     | 253 +++++++++----------
 .../mqtt/imported/MQTTTestSupport.java          |   2 +
 4 files changed, 135 insertions(+), 136 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support

Posted by cl...@apache.org.
ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c7c2ed5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c7c2ed5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c7c2ed5

Branch: refs/heads/master
Commit: 3c7c2ed5d34d705651c6797c52029205c43b301a
Parents: a313558
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 25 14:47:23 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed May 25 17:59:00 2016 +0100

----------------------------------------------------------------------
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  10 +-
 .../integration/mqtt/imported/MQTTTest.java     | 172 ++++++++++---------
 .../mqtt/imported/MQTTTestSupport.java          |   2 +
 3 files changed, 95 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index aa2262a..2313248 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -52,8 +53,6 @@ public class MQTTUtil {
 
    public static final int MAX_MESSAGE_SIZE = 268435455;
 
-   public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
-
    public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
 
    public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
@@ -67,16 +66,13 @@ public class MQTTUtil {
    public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
 
    public static String convertMQTTAddressFilterToCore(String filter) {
-      return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+      return swapMQTTAndCoreWildCards(filter);
    }
 
    public static String convertCoreAddressFilterToMQTT(String filter) {
       if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
       }
-      else if (filter.startsWith(MQTT_ADDRESS_PREFIX)) {
-         filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
-      }
       return swapMQTTAndCoreWildCards(filter);
    }
 
@@ -117,6 +113,8 @@ public class MQTTUtil {
       message.setAddress(address);
       message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+      // For JMS Consumption
+      message.setType(Message.BYTES_TYPE);
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index f15af10..257cf8f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
 import java.lang.reflect.Field;
 import java.net.ProtocolException;
 import java.util.ArrayList;
@@ -46,7 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.vertx.java.core.impl.ConcurrentHashSet;
 
-/**
+/**QT
  * MQTT Test imported from ActiveMQ MQTT component.
  */
 public class MQTTTest extends MQTTTestSupport {
@@ -1001,91 +1005,93 @@ public class MQTTTest extends MQTTTestSupport {
    outstanding task to add cross protocol support.  This task should rework these tests.  The tests are included here
    and commented out to ensure ActiveMQ and Artemis tests are in sync. */
 
-   //   @Test(timeout = 60 * 1000)
-   //   public void testSendMQTTReceiveJMS() throws Exception {
-   //      doTestSendMQTTReceiveJMS("foo.*");
-   //   }
+   @Test(timeout = 60 * 1000)
+   public void testSendMQTTReceiveJMS() throws Exception {
+      doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
+   }
 
-   //   public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
-   //      final MQTTClientProvider provider = getMQTTClientProvider();
-   //      initializeConnection(provider);
-   //
-   //      // send retained message
-   //      final String RETAINED = "RETAINED";
-   //      provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
-   //
-   //      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
-   //      // MUST set to true to receive retained messages
-   //      activeMQConnection.setUseRetroactiveConsumer(true);
-   //      activeMQConnection.start();
-   //      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   //      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
-   //      MessageConsumer consumer = s.createConsumer(jmsTopic);
-   //
-   //      // check whether we received retained message on JMS subscribe
-   //      ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
-   //      assertNotNull("Should get retained message", message);
-   //      ByteSequence bs = message.getContent();
-   //      assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
-   //      assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
-   //
-   //      for (int i = 0; i < NUM_MESSAGES; i++) {
-   //         String payload = "Test Message: " + i;
-   //         provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
-   //         message = (ActiveMQMessage) consumer.receive(5000);
-   //         assertNotNull("Should get a message", message);
-   //         bs = message.getContent();
-   //         assertEquals(payload, new String(bs.data, bs.offset, bs.length));
-   //      }
-   //
-   //      activeMQConnection.close();
-   //      provider.disconnect();
-   //   }
+   public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      initializeConnection(provider);
 
-   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   @Test(timeout = 2 * 60 * 1000)
-   //   public void testSendJMSReceiveMQTT() throws Exception {
-   //      doTestSendJMSReceiveMQTT("foo.far");
-   //   }
+      // send retained message
+      final String address = "jms/queue/" + mqttAddress;
+      final String RETAINED = "RETAINED";
 
-   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
-   //      final MQTTClientProvider provider = getMQTTClientProvider();
-   //      initializeConnection(provider);
-   //
-   //      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
-   //      activeMQConnection.setUseRetroactiveConsumer(true);
-   //      activeMQConnection.start();
-   //      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   //      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
-   //      MessageProducer producer = s.createProducer(jmsTopic);
-   //
-   //      // send retained message from JMS
-   //      final String RETAINED = "RETAINED";
-   //      TextMessage sendMessage = s.createTextMessage(RETAINED);
-   //      // mark the message to be retained
-   //      sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
-   //      // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
-   //      sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
-   //      producer.send(sendMessage);
-   //
-   //      provider.subscribe("foo/+", AT_MOST_ONCE);
-   //      byte[] message = provider.receive(10000);
-   //      assertNotNull("Should get retained message", message);
-   //      assertEquals(RETAINED, new String(message));
-   //
-   //      for (int i = 0; i < NUM_MESSAGES; i++) {
-   //         String payload = "This is Test Message: " + i;
-   //         sendMessage = s.createTextMessage(payload);
-   //         producer.send(sendMessage);
-   //         message = provider.receive(5000);
-   //         assertNotNull("Should get a message", message);
-   //
-   //         assertEquals(payload, new String(message));
-   //      }
-   //      provider.disconnect();
-   //      activeMQConnection.close();
-   //   }
+      final byte[] payload = RETAINED.getBytes();
+
+      Connection connection = cf.createConnection();
+      // MUST set to true to receive retained messages
+      connection.start();
+
+      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
+      MessageConsumer consumer = s.createConsumer(jmsQueue);
+
+      provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+
+      // check whether we received retained message on JMS subscribe
+      BytesMessage message = (BytesMessage) consumer.receive(5000);
+      assertNotNull("Should get retained message", message);
+
+      byte[] b = new byte[8];
+      message.readBytes(b);
+      assertArrayEquals(payload, b);
+
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         String p = "Test Message: " + i;
+         provider.publish(address, p.getBytes(), AT_LEAST_ONCE);
+         message = (BytesMessage) consumer.receive(5000);
+         assertNotNull("Should get a message", message);
+
+         byte[] bytePayload = new byte[p.getBytes().length];
+         message.readBytes(bytePayload);
+         assertArrayEquals(payload, b);
+      }
+
+      connection.close();
+      provider.disconnect();
+   }
+
+//   @Test(timeout = 2 * 60 * 1000)
+//   public void testSendJMSReceiveMQTT() throws Exception {
+//      doTestSendJMSReceiveMQTT("foo.far");
+//   }
+//
+//   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+//      final MQTTClientProvider provider = getMQTTClientProvider();
+//      initializeConnection(provider);
+//
+//      Connection connection = cf.createConnection();
+//      connection.start();
+//
+//      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      javax.jms.Queue queue = s.createQueue(destinationName);
+//      MessageProducer producer = s.createProducer(queue);
+//
+//      // send retained message from JMS
+//      final String RETAINED = "RETAINED";
+//      TextMessage sendMessage = s.createTextMessage(RETAINED);
+//      sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
+//      producer.send(sendMessage);
+//
+//      provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
+//      byte[] message = provider.receive(10000);
+//      assertNotNull("Should get retained message", message);
+//      assertEquals(RETAINED, new String(message));
+//
+//      for (int i = 0; i < NUM_MESSAGES; i++) {
+//         String payload = "This is Test Message: " + i;
+//         sendMessage = s.createTextMessage(payload);
+//         producer.send(sendMessage);
+//         message = provider.receive(5000);
+//         assertNotNull("Should get a message", message);
+//
+//         assertEquals(payload, new String(message));
+//      }
+//      provider.disconnect();
+//      connection.close();
+//   }
 
    @Test(timeout = 60 * 1000)
    public void testPingKeepsInactivityMonitorAlive() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 61fcec0..73489af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -125,6 +125,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       addMQTTConnector();
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setMaxSizeBytes(999999999);
+      addressSettings.setAutoCreateJmsQueues(true);
+
       server.getAddressSettingsRepository().addMatch("#", addressSettings);
       server.start();
       server.waitForActivation(10, TimeUnit.SECONDS);