You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/11 22:07:05 UTC

git commit: Adding an mqtt test that uses the eclipse paho client.

Updated Branches:
  refs/heads/trunk b0b3a169c -> bc9751ac2


Adding an mqtt test that uses the eclipse paho client.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bc9751ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bc9751ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bc9751ac

Branch: refs/heads/trunk
Commit: bc9751ac234d6db138b5b4f4dec786952f0b23ff
Parents: b0b3a16
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Mon Nov 11 16:06:59 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 11 16:06:59 2013 -0500

----------------------------------------------------------------------
 activemq-mqtt/pom.xml                           |   6 +-
 .../transport/mqtt/AbstractMQTTTest.java        | 295 ------------------
 .../activemq/transport/mqtt/MQTTTest.java       | 302 ++++++++++++++++++-
 .../activemq/transport/mqtt/PahoMQTTTest.java   |  59 ++++
 pom.xml                                         |  15 +-
 5 files changed, 378 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 60b1e39..a1205e2 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -134,7 +134,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.eclipse.paho</groupId>
+      <artifactId>mqtt-client</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
index ac6cfc5..b976056 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
@@ -77,300 +77,6 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
         super.tearDown();
     }
 
-
-    @Test(timeout=300000)
-    public void testSendAndReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
-        initializeConnection(subscriptionProvider);
-
-
-        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
-
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
-
-        Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                for (int i = 0; i < numberOfMessages; i++){
-                    try {
-                        byte[] payload = subscriptionProvider.receive(10000);
-                        assertNotNull("Should get a message", payload);
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        break;
-                    }
-
-                }
-            }
-        });
-        thread.start();
-
-        final MQTTClientProvider publishProvider = getMQTTClientProvider();
-        initializeConnection(publishProvider);
-
-        for (int i = 0; i < numberOfMessages; i++){
-            String payload = "Message " + i;
-            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
-        }
-
-        latch.await(10, TimeUnit.SECONDS);
-        assertEquals(0, latch.getCount());
-        subscriptionProvider.disconnect();
-        publishProvider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testUnsubscribeMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
-        initializeConnection(subscriptionProvider);
-
-        String topic = "foo/bah";
-
-        subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
-
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
-
-        Thread thread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                for (int i = 0; i < numberOfMessages; i++){
-                    try {
-                        byte[] payload = subscriptionProvider.receive(10000);
-                        assertNotNull("Should get a message", payload);
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        break;
-                    }
-
-                }
-            }
-        });
-        thread.start();
-
-        final MQTTClientProvider publishProvider = getMQTTClientProvider();
-        initializeConnection(publishProvider);
-
-        for (int i = 0; i < numberOfMessages; i++){
-            String payload = "Message " + i;
-            if (i == numberOfMessages/2){
-                subscriptionProvider.unsubscribe(topic);
-            }
-            publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
-        }
-
-        latch.await(10, TimeUnit.SECONDS);
-        assertEquals(0, latch.getCount());
-        subscriptionProvider.disconnect();
-        publishProvider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
-        /**
-         * Although subscribing with EXACTLY ONCE, the message gets published
-         * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
-         * as published - not the wish of the subscriber
-         */
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        provider.subscribe("foo",EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        provider.subscribe("foo",EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        provider.subscribe("foo",AT_MOST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAndReceiveAtMostOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        provider.subscribe("foo",AT_MOST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAndReceiveAtLeastOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        provider.subscribe("foo",AT_LEAST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAndReceiveExactlyOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        final MQTTClientProvider publisher = getMQTTClientProvider();
-        initializeConnection(publisher);
-
-        final MQTTClientProvider subscriber = getMQTTClientProvider();
-        initializeConnection(subscriber);
-
-        subscriber.subscribe("foo",EXACTLY_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
-            byte[] message = subscriber.receive(5000);
-            assertNotNull("Should get a message + ["+ i + "]", message);
-            assertEquals(payload, new String(message));
-        }
-        subscriber.disconnect();
-        publisher.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendAndReceiveLargeMessages() throws Exception {
-        byte[] payload = new byte[1024 * 32];
-        for (int i = 0; i < payload.length; i++){
-            payload[i] = '2';
-        }
-        addMQTTConnector();
-        brokerService.start();
-
-        final MQTTClientProvider publisher = getMQTTClientProvider();
-        initializeConnection(publisher);
-
-        final MQTTClientProvider subscriber = getMQTTClientProvider();
-        initializeConnection(subscriber);
-
-        subscriber.subscribe("foo",AT_LEAST_ONCE);
-        for (int i = 0; i < 10; i++) {
-            publisher.publish("foo", payload, AT_LEAST_ONCE);
-            byte[] message = subscriber.receive(5000);
-            assertNotNull("Should get a message", message);
-
-            assertArrayEquals(payload, message);
-        }
-        subscriber.disconnect();
-        publisher.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendMQTTReceiveJMS() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-        final String DESTINATION_NAME = "foo.*";
-
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
-        activeMQConnection.start();
-        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
-        MessageConsumer consumer = s.createConsumer(jmsTopic);
-
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
-            ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
-            assertNotNull("Should get a message", message);
-            ByteSequence bs = message.getContent();
-            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
-        }
-
-        activeMQConnection.close();
-        provider.disconnect();
-    }
-
-    @Test(timeout=300000)
-    public void testSendJMSReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-        final MQTTClientProvider provider = getMQTTClientProvider();
-        initializeConnection(provider);
-
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
-        activeMQConnection.start();
-        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic("foo.far");
-        MessageProducer producer = s.createProducer(jmsTopic);
-
-        provider.subscribe("foo/+",AT_MOST_ONCE);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "This is Test Message: " + i;
-            TextMessage sendMessage = s.createTextMessage(payload);
-            producer.send(sendMessage);
-            byte[] message = provider.receive(5000);
-            assertNotNull("Should get a message", message);
-
-            assertEquals(payload, new String(message));
-        }
-        provider.disconnect();
-        activeMQConnection.close();
-    }
-
     protected String getProtocolScheme() {
         return "mqtt";
     }
@@ -391,5 +97,4 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
         provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
     }
 
-    protected abstract MQTTClientProvider getMQTTClientProvider();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 221abf3..61934b6 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,9 +16,14 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
@@ -33,11 +38,306 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 
+import static org.junit.Assert.assertArrayEquals;
+
 public class MQTTTest extends AbstractMQTTTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
     @Test(timeout=300000)
+    public void testSendAndReceiveMQTT() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+        initializeConnection(subscriptionProvider);
+
+
+        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
+
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
+
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++){
+                    try {
+                        byte[] payload = subscriptionProvider.receive(10000);
+                        assertNotNull("Should get a message", payload);
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+
+                }
+            }
+        });
+        thread.start();
+
+        final MQTTClientProvider publishProvider = getMQTTClientProvider();
+        initializeConnection(publishProvider);
+
+        for (int i = 0; i < numberOfMessages; i++){
+            String payload = "Message " + i;
+            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
+        }
+
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+        subscriptionProvider.disconnect();
+        publishProvider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testUnsubscribeMQTT() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+        initializeConnection(subscriptionProvider);
+
+        String topic = "foo/bah";
+
+        subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
+
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
+
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < numberOfMessages; i++){
+                    try {
+                        byte[] payload = subscriptionProvider.receive(10000);
+                        assertNotNull("Should get a message", payload);
+                        latch.countDown();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+
+                }
+            }
+        });
+        thread.start();
+
+        final MQTTClientProvider publishProvider = getMQTTClientProvider();
+        initializeConnection(publishProvider);
+
+        for (int i = 0; i < numberOfMessages; i++){
+            String payload = "Message " + i;
+            if (i == numberOfMessages/2){
+                subscriptionProvider.unsubscribe(topic);
+            }
+            publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
+        }
+
+        latch.await(10, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+        subscriptionProvider.disconnect();
+        publishProvider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
+        /**
+         * Although subscribing with EXACTLY ONCE, the message gets published
+         * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
+         * as published - not the wish of the subscriber
+         */
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",EXACTLY_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",EXACTLY_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_MOST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAndReceiveAtMostOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_MOST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAndReceiveAtLeastOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_LEAST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAndReceiveExactlyOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
+
+        final MQTTClientProvider subscriber = getMQTTClientProvider();
+        initializeConnection(subscriber);
+
+        subscriber.subscribe("foo",EXACTLY_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+            byte[] message = subscriber.receive(5000);
+            assertNotNull("Should get a message + ["+ i + "]", message);
+            assertEquals(payload, new String(message));
+        }
+        subscriber.disconnect();
+        publisher.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendAndReceiveLargeMessages() throws Exception {
+        byte[] payload = new byte[1024 * 32];
+        for (int i = 0; i < payload.length; i++){
+            payload[i] = '2';
+        }
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
+
+        final MQTTClientProvider subscriber = getMQTTClientProvider();
+        initializeConnection(subscriber);
+
+        subscriber.subscribe("foo",AT_LEAST_ONCE);
+        for (int i = 0; i < 10; i++) {
+            publisher.publish("foo", payload, AT_LEAST_ONCE);
+            byte[] message = subscriber.receive(5000);
+            assertNotNull("Should get a message", message);
+
+            assertArrayEquals(payload, message);
+        }
+        subscriber.disconnect();
+        publisher.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendMQTTReceiveJMS() throws Exception {
+        addMQTTConnector();
+        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        final String DESTINATION_NAME = "foo.*";
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
+        MessageConsumer consumer = s.createConsumer(jmsTopic);
+
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+            ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+            assertNotNull("Should get a message", message);
+            ByteSequence bs = message.getContent();
+            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+        }
+
+        activeMQConnection.close();
+        provider.disconnect();
+    }
+
+    @Test(timeout=300000)
+    public void testSendJMSReceiveMQTT() throws Exception {
+        addMQTTConnector();
+        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        javax.jms.Topic jmsTopic = s.createTopic("foo.far");
+        MessageProducer producer = s.createProducer(jmsTopic);
+
+        provider.subscribe("foo/+",AT_MOST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "This is Test Message: " + i;
+            TextMessage sendMessage = s.createTextMessage(payload);
+            producer.send(sendMessage);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+        activeMQConnection.close();
+    }
+
+    @Test(timeout=300000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -287,8 +587,6 @@ public class MQTTTest extends AbstractMQTTTest {
         return "mqtt";
     }
 
-
-    @Override
     protected MQTTClientProvider getMQTTClientProvider() {
         return new FuseMQQTTClientProvider();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
new file mode 100644
index 0000000..5251567
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.TransportConnector;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+public class PahoMQTTTest extends AbstractMQTTTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
+
+
+    @Test(timeout=300000)
+    public void testSendAndReceiveMQTT() throws Exception {
+        addMQTTConnector();
+        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
+
+        MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid");
+        client.connect();
+        client.publish("test", "hello".getBytes(), 1, false);
+
+        Message msg = consumer.receive(100 * 5);
+        assertNotNull(msg);
+
+        client.disconnect();
+        client.close();
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b4cc16d..bd095f5 100755
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@
     <org-apache-derby-version>10.10.1.1</org-apache-derby-version>
     <org.osgi.core-version>4.3.1</org.osgi.core-version>
     <p2psockets-version>1.1.2</p2psockets-version>
+    <paho-version>0.4.0</paho-version>
     <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
     <zookeeper-version>3.4.5</zookeeper-version>
     <qpid-proton-version>0.5</qpid-proton-version>
@@ -366,6 +367,11 @@
         <scope>test</scope>
       </dependency>
       <dependency>
+        <groupId>org.eclipse.paho</groupId>
+        <artifactId>mqtt-client</artifactId>
+        <version>${paho-version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-jaas</artifactId>
         <version>${project.version}</version>
@@ -1591,12 +1597,19 @@
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>false</enabled></snapshots>
     </repository>
+    <!-- for the paho dependency -->
+    <repository>
+      <id>eclipse.m2</id>
+      <url>https://repo.eclipse.org/content/groups/releases/</url>
+      <releases><enabled>true</enabled></releases>
+      <snapshots><enabled>false</enabled></snapshots>
+    </repository>
     <repository>
         <id>com.fusesource.m2.snapshot</id>
         <url>http://repo.fusesource.com/nexus/content/repositories/snapshots/</url>
         <releases><enabled>false</enabled></releases>
         <snapshots><enabled>true</enabled></snapshots>
-      </repository>
+    </repository>
 
   </repositories>