You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/01/08 21:33:17 UTC

svn commit: r1430496 - in /activemq/trunk/activemq-mqtt/src: main/java/org/apache/activemq/transport/mqtt/ test/java/org/apache/activemq/transport/mqtt/

Author: rajdavies
Date: Tue Jan  8 20:33:16 2013
New Revision: 1430496

URL: http://svn.apache.org/viewvc?rev=1430496&view=rev
Log:
Refactored to make it easier to test with multiple MQTT client providers

Added:
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java
      - copied, changed from r1429809, activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java   (with props)
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java   (with props)
Modified:
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
    activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java Tue Jan  8 20:33:16 2013
@@ -305,7 +305,6 @@ class MQTTProtocolConverter {
             //by default subscribers are persistent
             consumerInfo.setSubscriptionName(connect.clientId().toString());
         }
-
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 
         subscriptionsByConsumerId.put(id, mqttSubscription);
@@ -364,13 +363,13 @@ class MQTTProtocolConverter {
             if (sub != null) {
                 MessageAck ack = sub.createMessageAck(md);
                 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
-                if (ack != null && sub.expectAck()) {
+                if (ack != null && sub.expectAck(publish)) {
                     synchronized (consumerAcks) {
                         consumerAcks.put(publish.messageId(), ack);
                     }
                 }
                 getMQTTTransport().sendToMQTT(publish.encode());
-                if (ack != null && !sub.expectAck()) {
+                if (ack != null && !sub.expectAck(publish)) {
                     getMQTTTransport().sendToActiveMQ(ack);
                 }
             }
@@ -683,11 +682,10 @@ class MQTTProtocolConverter {
     /**
      * Set the default keep alive time (in milliseconds) that would be used if configured on server side
      * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
-     *
-     * @param defaultKeepAlive the keepAlive in milliseconds
+     * @param keepAlive the default keep-alive time in milliseconds
      */
-    public void setDefaultKeepAlive(long defaultKeepAlive) {
-        this.defaultKeepAlive = defaultKeepAlive;
+    public void setDefaultKeepAlive(long keepAlive) {
+        this.defaultKeepAlive = keepAlive;
     }
 
     public int getActiveMQSubscriptionPrefetch() {

Modified: activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java (original)
+++ activemq/trunk/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java Tue Jan  8 20:33:16 2013
@@ -56,16 +56,12 @@ class MQTTSubscription {
         return publish;
     }
 
-    public boolean expectAck() {
-        return qos != QoS.AT_MOST_ONCE;
-    }
-
-    public void setDestination(ActiveMQDestination destination) {
-        this.destination = destination;
-    }
-
-    public ActiveMQDestination getDestination() {
-        return destination;
+    public boolean expectAck(PUBLISH publish) {
+        QoS publishQoS = publish.qos();
+        if (publishQoS.compareTo(this.qos) > 0){
+            publishQoS = this.qos;
+        }
+        return !publishQoS.equals(QoS.AT_MOST_ONCE);
     }
 
     public ConsumerInfo getConsumerInfo() {

Copied: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java (from r1429809, activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java?p2=activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java&p1=activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java&r1=1429809&r2=1430496&rev=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java Tue Jan  8 20:33:16 2013
@@ -18,8 +18,6 @@ package org.apache.activemq.transport.mq
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.security.ProtectionDomain;
 import java.util.LinkedList;
 import java.util.concurrent.CountDownLatch;
@@ -29,42 +27,31 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.net.SocketFactory;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.Wait;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.client.BlockingConnection;
-import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNECT;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
 import static org.junit.Assert.*;
 
 
-public class MQTTTest {
+public abstract class AbstractMQTTTest {
+    protected TransportConnector mqttConnector;
+
+    public static final int AT_MOST_ONCE =0;
+    public static final int AT_LEAST_ONCE = 1;
+    public static final int EXACTLY_ONCE =2;
 
     public File basedir() throws IOException {
         ProtectionDomain protectionDomain = getClass().getProtectionDomain();
         return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
     }
 
-    protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
     protected BrokerService brokerService;
     protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
     protected int numberOfMessages;
@@ -93,21 +80,20 @@ public class MQTTTest {
     public void testSendAndReceiveMQTT() throws Exception {
         addMQTTConnector();
         brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        final BlockingConnection subscribeConnection = mqtt.blockingConnection();
-        subscribeConnection.connect();
-        Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
-        Topic[] topics = {topic};
-        subscribeConnection.subscribe(topics);
+        final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+        initializeConnection(subscriptionProvider);
+
+
+        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
+
         final CountDownLatch latch = new CountDownLatch(numberOfMessages);
 
         Thread thread = new Thread(new Runnable() {
             public void run() {
                 for (int i = 0; i < numberOfMessages; i++){
                     try {
-                        Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
-                        assertNotNull("Should get a message", message);
-                        message.ack();
+                        byte[] payload = subscriptionProvider.receive(10000);
+                        assertNotNull("Should get a message", payload);
                         latch.countDown();
                     } catch (Exception e) {
                         e.printStackTrace();
@@ -119,92 +105,137 @@ public class MQTTTest {
         });
         thread.start();
 
-        BlockingConnection publisherConnection = mqtt.blockingConnection();
-        publisherConnection.connect();
+        final MQTTClientProvider publishProvider = getMQTTClientProvider();
+        initializeConnection(publishProvider);
+
         for (int i = 0; i < numberOfMessages; i++){
             String payload = "Message " + i;
-            publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
+            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
         }
 
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(0, latch.getCount());
-        subscribeConnection.disconnect();
-        publisherConnection.disconnect();
+        subscriptionProvider.disconnect();
+        publishProvider.disconnect();
     }
 
     @Test
-    public void testSendAndReceiveAtMostOnce() throws Exception {
+    public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
+        /**
+         * Although subscribing with EXACTLY_ONCE, the message gets published
+         * with AT_MOST_ONCE - in MQTT the delivered QoS is the lower of the
+         * published QoS and the subscribed QoS, so it stays AT_MOST_ONCE here
+         */
         addMQTTConnector();
         brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
 
-        connection.connect();
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",EXACTLY_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
 
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
-        connection.subscribe(topics);
+    @Test
+    public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",EXACTLY_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
             assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message.getPayload()));
+            assertEquals(payload, new String(message));
         }
-        connection.disconnect();
+        provider.disconnect();
     }
 
     @Test
-    public void testSendAndReceiveAtLeastOnce() throws Exception {
+    public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
 
-        connection.connect();
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_MOST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
 
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
-        connection.subscribe(topics);
+
+    @Test
+    public void testSendAndReceiveAtMostOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_MOST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
+            provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
+            byte[] message = provider.receive(5000);
             assertNotNull("Should get a message", message);
-            message.ack();
-            assertEquals(payload, new String(message.getPayload()));
+            assertEquals(payload, new String(message));
         }
-        connection.disconnect();
+        provider.disconnect();
     }
 
     @Test
-    public void testSendAndReceiveExactlyOnce() throws Exception {
+    public void testSendAndReceiveAtLeastOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
-        MQTT publisher = createMQTTConnection();
-        BlockingConnection pubConnection = publisher.blockingConnection();
 
-        pubConnection.connect();
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        provider.subscribe("foo",AT_LEAST_ONCE);
+        for (int i = 0; i < numberOfMessages; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
+            byte[] message = provider.receive(5000);
+            assertNotNull("Should get a message", message);
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+    }
+
+    @Test
+    public void testSendAndReceiveExactlyOnce() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
 
-        MQTT subscriber = createMQTTConnection();
-        BlockingConnection subConnection = subscriber.blockingConnection();
+        final MQTTClientProvider subscriber = getMQTTClientProvider();
+        initializeConnection(subscriber);
 
-        subConnection.connect();
 
-        Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
-        subConnection.subscribe(topics);
+            subscriber.subscribe("foo",EXACTLY_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
-            Message message = subConnection.receive(5, TimeUnit.SECONDS);
+            publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+            byte[] message = subscriber.receive(5000);
             assertNotNull("Should get a message", message);
-            LOG.debug(payload);
-            message.ack();
-            //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
-            assertEquals(payload, new String(message.getPayload()));
+            assertEquals(payload, new String(message));
         }
-        subConnection.disconnect();
-        pubConnection.disconnect();
+        subscriber.disconnect();
+        publisher.disconnect();
     }
 
     @Test
@@ -216,27 +247,22 @@ public class MQTTTest {
         addMQTTConnector();
         brokerService.start();
 
-        MQTT publisher = createMQTTConnection();
-        BlockingConnection pubConnection = publisher.blockingConnection();
-
-        pubConnection.connect();
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
 
-        MQTT subscriber = createMQTTConnection();
-        BlockingConnection subConnection = subscriber.blockingConnection();
+        final MQTTClientProvider subscriber = getMQTTClientProvider();
+        initializeConnection(subscriber);
 
-        subConnection.connect();
-
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
-        subConnection.subscribe(topics);
+        subscriber.subscribe("foo",AT_LEAST_ONCE);
         for (int i = 0; i < 10; i++) {
-            pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
-            Message message = subConnection.receive(5, TimeUnit.SECONDS);
+            publisher.publish("foo", payload, AT_LEAST_ONCE);
+            byte[] message = subscriber.receive(5000);
             assertNotNull("Should get a message", message);
-            message.ack();
-            assertArrayEquals(payload, message.getPayload());
+
+            assertArrayEquals(payload, message);
         }
-        subConnection.disconnect();
-        pubConnection.disconnect();
+        subscriber.disconnect();
+        publisher.disconnect();
     }
 
 
@@ -245,10 +271,11 @@ public class MQTTTest {
         addMQTTConnector();
         TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
         brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        BlockingConnection connection = mqtt.blockingConnection();
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
         final String DESTINATION_NAME = "foo.*";
-        connection.connect();
+
 
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
         activeMQConnection.start();
@@ -258,7 +285,7 @@ public class MQTTTest {
 
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
-            connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
+            provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
             ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
             assertNotNull("Should get a message", message);
             ByteSequence bs = message.getContent();
@@ -267,7 +294,7 @@ public class MQTTTest {
 
 
         activeMQConnection.close();
-        connection.disconnect();
+        provider.disconnect();
     }
 
     @Test
@@ -275,10 +302,8 @@ public class MQTTTest {
         addMQTTConnector();
         TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
         brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
 
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
         activeMQConnection.start();
@@ -286,141 +311,20 @@ public class MQTTTest {
         javax.jms.Topic jmsTopic = s.createTopic("foo.far");
         MessageProducer producer = s.createProducer(jmsTopic);
 
-        Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
-        connection.subscribe(topics);
+        provider.subscribe("foo/+",AT_MOST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "This is Test Message: " + i;
             TextMessage sendMessage = s.createTextMessage(payload);
             producer.send(sendMessage);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
+            byte[] message = provider.receive(5000);
             assertNotNull("Should get a message", message);
-            message.ack();
-            assertEquals(payload, new String(message.getPayload()));
-        }
-        connection.disconnect();
-    }
-
-    public void testInactivityTimeoutDisconnectsClient() throws Exception{
-
-        addMQTTConnector();
-        brokerService.start();
-
-        // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
-        // from timing out
-        Transport clientTransport = createManualMQTTClient();
-        clientTransport.start();
-        CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
-        clientTransport.oneway(connectFrame.encode());
-
-        // wait for broker to register the MQTT connection
-        assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return mqttConnector.getConnections().size() > 0;
-            }
-        }));
-
-        // wait for broker to time out the MQTT connection due to inactivity
-        assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return mqttConnector.getConnections().size() == 0;
-            }
-        }));
-
-        assertTrue("Should have seen client transport exception", exceptions.size() > 0);
-
-        clientTransport.stop();
-    }
-
-    private Transport createManualMQTTClient() throws IOException, URISyntaxException {
-        Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
-                new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
-        clientTransport.setTransportListener(new TransportListener() {
-            @Override
-            public void onCommand(Object command) {
-            }
-
-            @Override
-            public void onException(IOException error) {
-                exceptions.add(error);
-            }
-
-            @Override
-            public void transportInterupted() {
-            }
-
-            @Override
-            public void transportResumed() {
-            }
-        });
-        return clientTransport;
-    }
-
-    @Test
-    public void testPingKeepsInactivityMonitorAlive() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive((short)2);
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return connection.isConnected();
-            }
-        }));
-
-        connection.disconnect();
-    }
-
-    @Test
-    public void testTurnOffInactivityMonitor()throws Exception{
-        addMQTTConnector("?transport.useInactivityMonitor=false");
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive((short)2);
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return connection.isConnected();
-            }
-        }));
-
-        connection.disconnect();
-    }
 
-
-    @Test
-    public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
-        // default keep alive in milliseconds
-        addMQTTConnector("?transport.defaultKeepAlive=2000");
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive((short)0);
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        assertTrue("KeepAlive didn't work properly", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return connection.isConnected();
-            }
-        }));
+            assertEquals(payload, new String(message));
+        }
+        provider.disconnect();
+        activeMQConnection.close();
     }
 
-    TransportConnector mqttConnector;
-
     protected String getProtocolScheme() {
         return "mqtt";
     }
@@ -433,12 +337,9 @@ public class MQTTTest {
         mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
     }
 
-    protected MQTT createMQTTConnection() throws Exception {
-        MQTT mqtt = new MQTT();
-        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
-        // shut off connect retry
-        mqtt.setConnectAttemptsMax(0);
-        mqtt.setReconnectAttemptsMax(0);
-        return mqtt;
+    protected void initializeConnection(MQTTClientProvider provider) throws Exception {
+        provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort());
     }
+
+    protected abstract MQTTClientProvider getMQTTClientProvider();
 }
\ No newline at end of file

Added: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java?rev=1430496&view=auto
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java (added)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java Tue Jan  8 20:33:16 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
+
+class FuseMQQTTClientProvider implements MQTTClientProvider {
+    private final MQTT mqtt = new MQTT();
+    private BlockingConnection connection;
+    @Override
+    public void connect(String host) throws Exception {
+        mqtt.setHost(host);
+        // shut off connect retry
+        mqtt.setConnectAttemptsMax(0);
+        mqtt.setReconnectAttemptsMax(0);
+        connection = mqtt.blockingConnection();
+        connection.connect();
+    }
+
+    @Override
+    public void disconnect() throws Exception {
+        if (this.connection != null){
+            this.connection.disconnect();
+        }
+    }
+
+    @Override
+    public void publish(String topic, byte[] payload, int qos) throws Exception {
+        connection.publish(topic,payload, QoS.values()[qos],false);
+    }
+
+    @Override
+    public void subscribe(String topic, int qos) throws Exception {
+        Topic[] topics = {new Topic(utf8(topic), QoS.values()[qos])};
+        connection.subscribe(topics);
+    }
+
+    @Override
+    public byte[] receive(int timeout) throws Exception {
+        byte[] result = null;
+        Message message = connection.receive(timeout, TimeUnit.MILLISECONDS);
+        if (message != null){
+            result = message.getPayload();
+            message.ack();
+        }
+        return result;
+    }
+
+    @Override
+    public void setSslContext(SSLContext sslContext) {
+        mqtt.setSslContext(sslContext);
+    }
+}

Propchange: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java?rev=1430496&view=auto
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java (added)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java Tue Jan  8 20:33:16 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+public interface  MQTTClientProvider { // client-library-neutral operations, so the MQTT tests can run against different client providers
+    void connect(String host) throws Exception; // callers pass a URI string such as "ssl://localhost:<port>"
+    void disconnect() throws Exception;
+    void publish(String topic,byte[] payload,int qos) throws Exception; // qos is an int level; FuseMQQTTClientProvider maps it via QoS.values()[qos]
+    void subscribe(String topic,int qos) throws Exception; // same int qos convention as publish
+    byte[] receive(int timeout) throws Exception; // FuseMQQTTClientProvider treats timeout as milliseconds and returns null when it gets no message
+    void setSslContext(javax.net.ssl.SSLContext sslContext);
+
+}

Propchange: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java Tue Jan  8 20:33:16 2013
@@ -24,9 +24,7 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
-import org.apache.activemq.broker.BrokerService;
 import org.fusesource.mqtt.client.MQTT;
-import org.junit.Ignore;
 
 public class MQTTSSLTest extends MQTTTest {
 
@@ -55,6 +53,15 @@ public class MQTTSSLTest extends MQTTTes
         return mqtt;
     }
 
+    protected void initializeConnection(MQTTClientProvider provider) throws Exception {
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+        provider.setSslContext(ctx);
+        provider.connect("ssl://localhost:"+mqttConnector.getConnectUri().getPort());
+    }
+
+
+
     static class DefaultTrustManager implements X509TrustManager {
 
         public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1430496&r1=1430495&r2=1430496&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java (original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java Tue Jan  8 20:33:16 2013
@@ -16,348 +16,14 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.ProtectionDomain;
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.net.SocketFactory;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.Wait;
-import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
-import org.fusesource.mqtt.client.Message;
-import org.fusesource.mqtt.client.QoS;
-import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNECT;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
 
-public class MQTTTest {
-
-    public File basedir() throws IOException {
-        ProtectionDomain protectionDomain = getClass().getProtectionDomain();
-        return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
-    }
-
-    protected static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
-    protected BrokerService brokerService;
-    protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
-    protected int numberOfMessages;
-    AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
-
-    @Before
-    public void startBroker() throws Exception {
-        autoFailTestSupport.startAutoFailThread();
-        exceptions.clear();
-        brokerService = new BrokerService();
-        brokerService.setPersistent(false);
-        brokerService.setAdvisorySupport(false);
-        brokerService.setUseJmx(false);
-        this.numberOfMessages = 3000;
-    }
-
-    @After
-    public void stopBroker() throws Exception {
-        if (brokerService != null) {
-            brokerService.stop();
-        }
-        autoFailTestSupport.stopAutoFailThread();
-    }
-
-    @Test
-    public void testSendAndReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        final BlockingConnection subscribeConnection = mqtt.blockingConnection();
-        subscribeConnection.connect();
-        Topic topic = new Topic("foo/bah",QoS.AT_MOST_ONCE);
-        Topic[] topics = {topic};
-        subscribeConnection.subscribe(topics);
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages);
-
-        Thread thread = new Thread(new Runnable() {
-            public void run() {
-                for (int i = 0; i < numberOfMessages; i++){
-                    try {
-                        Message message = subscribeConnection.receive(5, TimeUnit.SECONDS);
-                        assertNotNull("Should get a message", message);
-                        message.ack();
-                        latch.countDown();
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        break;
-                    }
-
-                }
-            }
-        });
-        thread.start();
-
-        BlockingConnection publisherConnection = mqtt.blockingConnection();
-        publisherConnection.connect();
-        for (int i = 0; i < numberOfMessages; i++){
-            String payload = "Message " + i;
-            publisherConnection.publish(topic.name().toString(),payload.getBytes(),QoS.AT_LEAST_ONCE,false);
-        }
-
-        latch.await(10, TimeUnit.SECONDS);
-        assertEquals(0, latch.getCount());
-        subscribeConnection.disconnect();
-        publisherConnection.disconnect();
-    }
-
-    @Test
-    public void testSendAndReceiveAtMostOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
-
-        connection.connect();
-
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_MOST_ONCE)};
-        connection.subscribe(topics);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            connection.publish("foo", payload.getBytes(), QoS.AT_MOST_ONCE, false);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
-            assertNotNull("Should get a message", message);
-            assertEquals(payload, new String(message.getPayload()));
-        }
-        connection.disconnect();
-    }
-
-    @Test
-    public void testSendAndReceiveAtLeastOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
-
-        connection.connect();
-
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
-        connection.subscribe(topics);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            connection.publish("foo", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
-            assertNotNull("Should get a message", message);
-            message.ack();
-            assertEquals(payload, new String(message.getPayload()));
-        }
-        connection.disconnect();
-    }
-
-    @Test
-    public void testSendAndReceiveExactlyOnce() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        MQTT publisher = createMQTTConnection();
-        BlockingConnection pubConnection = publisher.blockingConnection();
-
-        pubConnection.connect();
-
-        MQTT subscriber = createMQTTConnection();
-        BlockingConnection subConnection = subscriber.blockingConnection();
-
-        subConnection.connect();
-
-        Topic[] topics = {new Topic(utf8("foo"), QoS.EXACTLY_ONCE)};
-        subConnection.subscribe(topics);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            pubConnection.publish("foo", payload.getBytes(), QoS.EXACTLY_ONCE, false);
-            Message message = subConnection.receive(5, TimeUnit.SECONDS);
-            assertNotNull("Should get a message", message);
-            LOG.debug(payload);
-            message.ack();
-            //System.err.println("Sent " + payload + " GOT " + new String(message.getPayload()));
-            assertEquals(payload, new String(message.getPayload()));
-        }
-        subConnection.disconnect();
-        pubConnection.disconnect();
-    }
-
-    @Test
-    public void testSendAndReceiveLargeMessages() throws Exception {
-        byte[] payload = new byte[1024 * 32];
-        for (int i = 0; i < payload.length; i++){
-            payload[i] = '2';
-        }
-        addMQTTConnector();
-        brokerService.start();
-
-        MQTT publisher = createMQTTConnection();
-        BlockingConnection pubConnection = publisher.blockingConnection();
-
-        pubConnection.connect();
-
-        MQTT subscriber = createMQTTConnection();
-        BlockingConnection subConnection = subscriber.blockingConnection();
-
-        subConnection.connect();
-
-        Topic[] topics = {new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)};
-        subConnection.subscribe(topics);
-        for (int i = 0; i < 10; i++) {
-            pubConnection.publish("foo", payload, QoS.AT_LEAST_ONCE, false);
-            Message message = subConnection.receive(5, TimeUnit.SECONDS);
-            assertNotNull("Should get a message", message);
-            message.ack();
-            assertArrayEquals(payload, message.getPayload());
-        }
-        subConnection.disconnect();
-        pubConnection.disconnect();
-    }
-
-
-    @Test
-    public void testSendMQTTReceiveJMS() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        BlockingConnection connection = mqtt.blockingConnection();
-        final String DESTINATION_NAME = "foo.*";
-        connection.connect();
-
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
-        activeMQConnection.start();
-        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME);
-        MessageConsumer consumer = s.createConsumer(jmsTopic);
-
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "Test Message: " + i;
-            connection.publish("foo/bah", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
-            ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
-            assertNotNull("Should get a message", message);
-            ByteSequence bs = message.getContent();
-            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
-        }
-
-
-        activeMQConnection.close();
-        connection.disconnect();
-    }
-
-    @Test
-    public void testSendJMSReceiveMQTT() throws Exception {
-        addMQTTConnector();
-        TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setKeepAlive(Short.MAX_VALUE);
-        BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-
-        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection();
-        activeMQConnection.start();
-        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        javax.jms.Topic jmsTopic = s.createTopic("foo.far");
-        MessageProducer producer = s.createProducer(jmsTopic);
-
-        Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
-        connection.subscribe(topics);
-        for (int i = 0; i < numberOfMessages; i++) {
-            String payload = "This is Test Message: " + i;
-            TextMessage sendMessage = s.createTextMessage(payload);
-            producer.send(sendMessage);
-            Message message = connection.receive(5, TimeUnit.SECONDS);
-            assertNotNull("Should get a message", message);
-            message.ack();
-            assertEquals(payload, new String(message.getPayload()));
-        }
-        connection.disconnect();
-    }
-
-    public void testInactivityTimeoutDisconnectsClient() throws Exception{
-
-        addMQTTConnector();
-        brokerService.start();
-
-        // manually need to create the client so we don't send keep alive (PINGREQ) frames to keep the conn
-        // from timing out
-        Transport clientTransport = createManualMQTTClient();
-        clientTransport.start();
-        CONNECT connectFrame = new CONNECT().clientId(new UTF8Buffer("testClient")).keepAlive((short)2);
-        clientTransport.oneway(connectFrame.encode());
-
-        // wait for broker to register the MQTT connection
-        assertTrue("MQTT Connection should be registered.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return mqttConnector.getConnections().size() > 0;
-            }
-        }));
-
-        // wait for broker to time out the MQTT connection due to inactivity
-        assertTrue("MQTT Connection should be timed out.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return mqttConnector.getConnections().size() == 0;
-            }
-        }));
-
-        assertTrue("Should have seen client transport exception", exceptions.size() > 0);
-
-        clientTransport.stop();
-    }
-
-    private Transport createManualMQTTClient() throws IOException, URISyntaxException {
-        Transport clientTransport = new TcpTransport(new MQTTWireFormat(), SocketFactory.getDefault(),
-                new URI("tcp://localhost:"+mqttConnector.getConnectUri().getPort()), null);
-        clientTransport.setTransportListener(new TransportListener() {
-            @Override
-            public void onCommand(Object command) {
-            }
-
-            @Override
-            public void onException(IOException error) {
-                exceptions.add(error);
-            }
-
-            @Override
-            public void transportInterupted() {
-            }
-
-            @Override
-            public void transportResumed() {
-            }
-        });
-        return clientTransport;
-    }
+public class MQTTTest extends AbstractMQTTTest {
 
     @Test
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
@@ -419,7 +85,6 @@ public class MQTTTest {
         }));
     }
 
-    TransportConnector mqttConnector;
 
     protected String getProtocolScheme() {
         return "mqtt";
@@ -433,6 +98,11 @@ public class MQTTTest {
         mqttConnector= brokerService.addConnector(getProtocolScheme()+"://localhost:0" + config);
     }
 
+    @Override
+    protected MQTTClientProvider getMQTTClientProvider() {
+        return new FuseMQQTTClientProvider(); // supplies a fresh FuseMQQTTClientProvider on every call
+    }
+
     protected MQTT createMQTTConnection() throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());