You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:31 UTC

[04/13] git commit: Fixed AMQ-5160, polished MQTT tests

Fixed AMQ-5160, polished MQTT tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a39782b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a39782b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a39782b

Branch: refs/heads/trunk
Commit: 0a39782bf5d95fc0ae6d54a7fa1469b230621358
Parents: 88c6ee9
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:30:11 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTTest.java       | 31 ++++++++++++++------
 1 file changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0a39782b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index e11f6e9..3c0701e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
             connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
 
             connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
-            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            Message msg = connection.receive(5, TimeUnit.SECONDS);
             assertNotNull("No message for " + topic, msg);
             assertEquals(RETAINED + topic, new String(msg.getPayload()));
             msg.ack();
@@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
 
             // test retained messages
-            Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            Message msg = connection.receive(5, TimeUnit.SECONDS);
             do {
                 assertNotNull("RETAINED null " + wildcard, msg);
                 assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
@@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest {
             final BlockingConnection connection = mqtt.blockingConnection();
             connection.connect();
             connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) });
+            connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});
 
             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
@@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertEquals(i, actualQoS[0]);
             msg.ack();
 
-            connection.unsubscribe(new String[] { topic });
+            connection.unsubscribe(new String[]{topic});
             connection.disconnect();
         }
 
@@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection connectionSub = mqttSub.blockingConnection();
         connectionSub.connect();
         connectionSub.subscribe(topics);
-        connectionSub.unsubscribe(new String[] { "TopicA" });
         connectionSub.disconnect();
 
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 5; i++) {
             String payload = "Message " + i;
             connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
         }
@@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest {
         connectionSub.connect();
 
         int received = 0;
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 5; ++i) {
             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
+            assertNotNull("Missing message " + i, message);
             LOG.info("Message is " + new String(message.getPayload()));
             received++;
             message.ack();
         }
-        assertEquals(10, received);
+        assertEquals(5, received);
+
+        // unsubscribe from topic
+        connectionSub.unsubscribe(new String[]{"TopicA"});
+
+        // send more messages
+        for (int i = 0; i < 5; i++) {
+            String payload = "Message " + i;
+            connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+        }
+
+        // these should not be received
+        connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
 
         connectionSub.disconnect();
         connectionPub.disconnect();