You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/21 23:59:38 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5390
Repository: activemq
Updated Branches:
refs/heads/trunk d9d9d5b66 -> 74d2c2425
https://issues.apache.org/jira/browse/AMQ-5390
Adds a test case to show that things work as expected.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74d2c242
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74d2c242
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74d2c242
Branch: refs/heads/trunk
Commit: 74d2c2425fbcbdf38f910f3c321d89780ba2ab6c
Parents: d9d9d5b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 21 17:59:32 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 21 17:59:32 2014 -0400
----------------------------------------------------------------------
.../activemq/transport/mqtt/MQTTTest.java | 51 ++++++++++++++++++++
1 file changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/74d2c242/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 32f8167..c9f106d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1408,6 +1408,57 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
}
+ @Test(timeout = 60 * 1000)
+ public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
+ stopBroker();
+ this.persistent = true;
+ startBroker();
+
+ final byte[] payload = new byte[1024 * 32];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = '2';
+ }
+
+ int messagesPerRun = 10;
+
+ Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
+
+ {
+ // Establish a durable subscription.
+ MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
+ connectionSub.disconnect();
+ }
+
+ MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true);
+ BlockingConnection connectionPub = mqttPubLoop.blockingConnection();
+ connectionPub.connect();
+
+ for (int i = 0; i < messagesPerRun; ++i) {
+ connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+ }
+
+ connectionPub.disconnect();
+
+ stopBroker();
+ startBroker();
+
+ MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+ BlockingConnection connectionSub = mqttSub.blockingConnection();
+ connectionSub.connect();
+ connectionSub.subscribe(topics);
+
+ for (int i = 0; i < messagesPerRun; ++i) {
+ Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertTrue(Arrays.equals(payload, message.getPayload()));
+ message.ack();
+ }
+ connectionSub.disconnect();
+ }
+
@Test(timeout = 30 * 1000)
public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
stopBroker();