You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/05/26 11:07:31 UTC
[04/13] git commit: Fixed AMQ-5160, polished MQTT tests
Fixed AMQ-5160, polished MQTT tests
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a39782b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a39782b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a39782b
Branch: refs/heads/trunk
Commit: 0a39782bf5d95fc0ae6d54a7fa1469b230621358
Parents: 88c6ee9
Author: Dhiraj Bokde <dh...@yahoo.com>
Authored: Tue May 13 00:30:11 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200
----------------------------------------------------------------------
.../activemq/transport/mqtt/MQTTTest.java | 31 ++++++++++++++------
1 file changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/0a39782b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index e11f6e9..3c0701e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
- Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+ Message msg = connection.receive(5, TimeUnit.SECONDS);
assertNotNull("No message for " + topic, msg);
assertEquals(RETAINED + topic, new String(msg.getPayload()));
msg.ack();
@@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);
// test retained messages
- Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+ Message msg = connection.receive(5, TimeUnit.SECONDS);
do {
assertNotNull("RETAINED null " + wildcard, msg);
assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
@@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest {
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
- connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) });
+ connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});
final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
@@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest {
assertEquals(i, actualQoS[0]);
msg.ack();
- connection.unsubscribe(new String[] { topic });
+ connection.unsubscribe(new String[]{topic});
connection.disconnect();
}
@@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest {
BlockingConnection connectionSub = mqttSub.blockingConnection();
connectionSub.connect();
connectionSub.subscribe(topics);
- connectionSub.unsubscribe(new String[] { "TopicA" });
connectionSub.disconnect();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 5; i++) {
String payload = "Message " + i;
connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
}
@@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest {
connectionSub.connect();
int received = 0;
- for (int i = 0; i < 10; ++i) {
+ for (int i = 0; i < 5; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
+ assertNotNull("Missing message " + i, message);
LOG.info("Message is " + new String(message.getPayload()));
received++;
message.ack();
}
- assertEquals(10, received);
+ assertEquals(5, received);
+
+ // unsubscribe from topic
+ connectionSub.unsubscribe(new String[]{"TopicA"});
+
+ // send more messages
+ for (int i = 0; i < 5; i++) {
+ String payload = "Message " + i;
+ connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+ }
+
+ // these should not be received
+ connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
connectionSub.disconnect();
connectionPub.disconnect();