You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/21 23:59:38 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5390

Repository: activemq
Updated Branches:
  refs/heads/trunk d9d9d5b66 -> 74d2c2425


https://issues.apache.org/jira/browse/AMQ-5390

Adds a test case showing that messages published while a durable MQTT subscriber is offline are still delivered to it after a broker restart.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74d2c242
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74d2c242
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74d2c242

Branch: refs/heads/trunk
Commit: 74d2c2425fbcbdf38f910f3c321d89780ba2ab6c
Parents: d9d9d5b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 21 17:59:32 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 21 17:59:32 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTTest.java       | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/74d2c242/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 32f8167..c9f106d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1408,6 +1408,57 @@ public class MQTTTest extends MQTTTestSupport {
         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
     }
 
+    @Test(timeout = 60 * 1000) // JUnit timeout is in milliseconds: 60 s here
+    public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception { // messages published while the subscriber is disconnected must still arrive after a broker restart
+        stopBroker(); // restart the broker with the persistent flag set
+        this.persistent = true; // NOTE(review): presumably makes the broker use a persistent store -- confirm in MQTTTestSupport
+        startBroker();
+
+        final byte[] payload = new byte[1024 * 32]; // 32 KiB message body
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = '2'; // fill with a known byte so the received payload can be compared below
+        }
+
+        int messagesPerRun = 10; // number of messages published and then expected back
+
+        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) }; // single subscription: "TopicA" at QoS EXACTLY_ONCE
+
+        {
+            // Establish a durable subscription, then disconnect so the client is offline while messages are published.
+            MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false); // NOTE(review): second argument is presumably cleanSession; false would let the broker keep the subscription -- confirm
+            BlockingConnection connectionSub = mqttSub.blockingConnection();
+            connectionSub.connect();
+            connectionSub.subscribe(topics);
+            connectionSub.disconnect(); // subscriber is disconnected before any message is published
+        }
+
+        MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true); // separate client id for the publisher
+        BlockingConnection connectionPub = mqttPubLoop.blockingConnection();
+        connectionPub.connect();
+
+        for (int i = 0; i < messagesPerRun; ++i) {
+            connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false); // note: published at AT_LEAST_ONCE, while the subscription above asks for EXACTLY_ONCE
+        }
+
+        connectionPub.disconnect();
+
+        stopBroker(); // bounce the broker between publishing and receiving
+        startBroker();
+
+        MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false); // same client id and flag as the first subscriber connection
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        connectionSub.subscribe(topics); // re-issue the same subscription
+
+        for (int i = 0; i < messagesPerRun; ++i) {
+            Message message = connectionSub.receive(5, TimeUnit.SECONDS); // wait up to 5 seconds for each message
+            assertNotNull(message); // presumably receive returns null on timeout, i.e. the message was not delivered -- confirm
+            assertTrue(Arrays.equals(payload, message.getPayload())); // payload must match byte-for-byte
+            message.ack();
+        }
+        connectionSub.disconnect();
+    }
+
     @Test(timeout = 30 * 1000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
         stopBroker();