You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/17 14:40:45 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5399 - mqtt - out of order acks
Repository: activemq
Updated Branches:
refs/heads/trunk 1a0bd45a4 -> 28b45341d
https://issues.apache.org/jira/browse/AMQ-5399 - mqtt - out of order acks
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/28b45341
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/28b45341
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/28b45341
Branch: refs/heads/trunk
Commit: 28b45341d19e26d098565ec365f5fb0400ab6161
Parents: 1a0bd45
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Oct 17 14:40:11 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Oct 17 14:40:33 2014 +0200
----------------------------------------------------------------------
.../transport/mqtt/MQTTSubscription.java | 2 +-
.../activemq/transport/mqtt/PahoMQTTTest.java | 135 ++++++++++++++++++-
2 files changed, 131 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/28b45341/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index d265335..7b632e3 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -56,7 +56,7 @@ public class MQTTSubscription {
* @return a new {@link MessageAck} command to acknowledge the message.
*/
public MessageAck createMessageAck(MessageDispatch md) {
- return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+ return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/28b45341/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 498ea6d..e5e5fe5 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -16,10 +16,7 @@
*/
package org.apache.activemq.transport.mqtt;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,16 +28,28 @@ import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
-import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.apache.activemq.util.Wait;
+import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
public class PahoMQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ protocolConfig = "transport.activeMQSubscriptionPrefetch=32766";
+ super.setUp();
+ }
+
@Test(timeout = 300000)
public void testLotsOfClients() throws Exception {
@@ -130,4 +139,120 @@ public class PahoMQTTTest extends MQTTTestSupport {
client.disconnect();
client.close();
}
+
+ @Test(timeout = 300000)
+ public void testCleanSession() throws Exception {
+ String topic = "test";
+ final DefaultListener listener = new DefaultListener();
+
+ // subscriber connects and creates durable sub
+ LOG.info("Connecting durable subscriber...");
+ MqttClient client = createClient(false, "receive", listener);
+ // subscribe and wait for the retain message to arrive
+ LOG.info("Subscribing durable subscriber...");
+ client.subscribe(topic, 1);
+ assertTrue(client.getPendingDeliveryTokens().length == 0);
+ disconnect(client);
+ LOG.info("Disconnected durable subscriber.");
+
+ // Publish message with QoS 1
+ MqttClient client2 = createClient(true, "publish", listener);
+
+ LOG.info("Publish message with QoS 1...");
+ String expectedResult = "QOS 1 message";
+ client2.publish(topic, expectedResult.getBytes(), 1, false);
+ waitForDelivery(client2);
+
+ // Publish message with QoS 0
+ LOG.info("Publish message with QoS 0...");
+ expectedResult = "QOS 0 message";
+ client2.publish(topic, expectedResult.getBytes(), 0, false);
+ waitForDelivery(client2);
+
+ // subscriber reconnects
+ LOG.info("Reconnecting durable subscriber...");
+ MqttClient client3 = createClient(false, "receive", listener);
+
+ LOG.info("Subscribing durable subscriber...");
+ client3.subscribe(topic, 1);
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.received == 2;
+ }
+ });
+ assertEquals(2, listener.received);
+ disconnect(client3);
+ LOG.info("Disconnected durable subscriber.");
+
+ // make sure we consumed everything
+ listener.received = 0;
+
+ LOG.info("Reconnecting durable subscriber...");
+ MqttClient client4 = createClient(false, "receive", listener);
+
+ LOG.info("Subscribing durable subscriber...");
+ client4.subscribe(topic, 1);
+ Thread.sleep(3 * 1000);
+ assertEquals(0, listener.received);
+ }
+
+ protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(cleanSession);
+ final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
+ client.setCallback(listener);
+ client.connect(options);
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return client.isConnected();
+ }
+ });
+ return client;
+ }
+
+ protected void disconnect(final MqttClient client) throws Exception {
+ client.disconnect();
+ client.close();
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !client.isConnected();
+ }
+ });
+ }
+
+ protected void waitForDelivery(final MqttClient client) throws Exception {
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return client.getPendingDeliveryTokens().length == 0;
+ }
+ });
+ assertTrue(client.getPendingDeliveryTokens().length == 0);
+ }
+
+ static class DefaultListener implements MqttCallback {
+
+ int received = 0;
+
+ @Override
+ public void connectionLost(Throwable cause) {
+
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ LOG.info("Received: " + message);
+ received++;
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+
+ }
+ }
+
}
\ No newline at end of file