You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/17 14:40:45 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5399 - mqtt - out of order acks

Repository: activemq
Updated Branches:
  refs/heads/trunk 1a0bd45a4 -> 28b45341d


https://issues.apache.org/jira/browse/AMQ-5399 - mqtt - out of order acks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/28b45341
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/28b45341
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/28b45341

Branch: refs/heads/trunk
Commit: 28b45341d19e26d098565ec365f5fb0400ab6161
Parents: 1a0bd45
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Oct 17 14:40:11 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Oct 17 14:40:33 2014 +0200

----------------------------------------------------------------------
 .../transport/mqtt/MQTTSubscription.java        |   2 +-
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 135 ++++++++++++++++++-
 2 files changed, 131 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/28b45341/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
index d265335..7b632e3 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSubscription.java
@@ -56,7 +56,7 @@ public class MQTTSubscription {
      * @return a new {@link MessageAck} command to acknowledge the message.
      */
     public MessageAck createMessageAck(MessageDispatch md) {
-        return new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
+        return new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/28b45341/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 498ea6d..e5e5fe5 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -16,10 +16,7 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -31,16 +28,28 @@ import javax.jms.MessageListener;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.apache.activemq.util.Wait;
+import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.*;
+
 public class PahoMQTTTest extends MQTTTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class);
 
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        protocolConfig = "transport.activeMQSubscriptionPrefetch=32766";
+        super.setUp();
+    }
+
     @Test(timeout = 300000)
     public void testLotsOfClients() throws Exception {
 
@@ -130,4 +139,120 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.disconnect();
         client.close();
     }
+
+    @Test(timeout = 300000)
+    public void testCleanSession() throws Exception {
+        String topic = "test";
+        final DefaultListener listener = new DefaultListener();
+
+        // subscriber connects and creates durable sub
+        LOG.info("Connecting durable subscriber...");
+        MqttClient client = createClient(false, "receive", listener);
+        // subscribe and wait for the retain message to arrive
+        LOG.info("Subscribing durable subscriber...");
+        client.subscribe(topic, 1);
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+        disconnect(client);
+        LOG.info("Disconnected durable subscriber.");
+
+        // Publish message with QoS 1
+        MqttClient client2 = createClient(true, "publish", listener);
+
+        LOG.info("Publish message with QoS 1...");
+        String expectedResult = "QOS 1 message";
+        client2.publish(topic, expectedResult.getBytes(), 1, false);
+        waitForDelivery(client2);
+
+        // Publish message with QoS 0
+        LOG.info("Publish message with QoS 0...");
+        expectedResult = "QOS 0 message";
+        client2.publish(topic, expectedResult.getBytes(), 0, false);
+        waitForDelivery(client2);
+
+        // subscriber reconnects
+        LOG.info("Reconnecting durable subscriber...");
+        MqttClient client3 = createClient(false, "receive", listener);
+
+        LOG.info("Subscribing durable subscriber...");
+        client3.subscribe(topic, 1);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return listener.received == 2;
+            }
+        });
+        assertEquals(2, listener.received);
+        disconnect(client3);
+        LOG.info("Disconnected durable subscriber.");
+
+        // make sure we consumed everything
+        listener.received = 0;
+
+        LOG.info("Reconnecting durable subscriber...");
+        MqttClient client4 = createClient(false, "receive", listener);
+
+        LOG.info("Subscribing durable subscriber...");
+        client4.subscribe(topic, 1);
+        Thread.sleep(3 * 1000);
+        assertEquals(0, listener.received);
+    }
+
+    protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(cleanSession);
+        final MqttClient client = new MqttClient("tcp://localhost:" + getPort(), clientId, new MemoryPersistence());
+        client.setCallback(listener);
+        client.connect(options);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return client.isConnected();
+            }
+        });
+        return client;
+    }
+
+    protected void disconnect(final MqttClient client) throws Exception {
+        client.disconnect();
+        client.close();
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !client.isConnected();
+            }
+        });
+    }
+
+    protected void waitForDelivery(final MqttClient client) throws Exception {
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return client.getPendingDeliveryTokens().length == 0;
+            }
+        });
+        assertTrue(client.getPendingDeliveryTokens().length == 0);
+    }
+
+    static class DefaultListener implements MqttCallback {
+
+        int received = 0;
+
+        @Override
+        public void connectionLost(Throwable cause) {
+
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws Exception {
+            LOG.info("Received: " + message);
+            received++;
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+
+        }
+    }
+
 }
\ No newline at end of file