You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/05/13 22:31:00 UTC

activemq git commit: Refactor test that makes flawed assumptions about which incoming messages its subscriptions will deliver. Also improves thread safety.

Repository: activemq
Updated Branches:
  refs/heads/master 15405af2e -> ab434ee77


Refactor test that makes flawed assumptions about which incoming
messages its subscriptions will deliver.  Also improves thread
safety.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ab434ee7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ab434ee7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ab434ee7

Branch: refs/heads/master
Commit: ab434ee776bda7afeb6c1ae26f66559940afbf6d
Parents: 15405af
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri May 13 18:30:44 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri May 13 18:30:44 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 223 ++++++++-----------
 1 file changed, 99 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ab434ee7/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index c105b5f..343c0c8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,13 +16,14 @@
  */
 package org.apache.activemq.transport.mqtt;
 
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -41,6 +42,7 @@ import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +55,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         return s.createConsumer(s.createTopic(topic));
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 90000)
     public void testLotsOfClients() throws Exception {
 
         final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
@@ -88,7 +90,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
                         sendBarrier.await();
                         for (int i = 0; i < 10; i++) {
                             Thread.sleep(1000);
-                            client.publish("test", "hello".getBytes(), 1, false);
+                            client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false);
                         }
                         client.disconnect();
                         client.close();
@@ -122,7 +124,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertNull("Async error: " + asyncError.get(), asyncError.get());
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 90000)
     public void testSendAndReceiveMQTT() throws Exception {
 
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
@@ -132,15 +134,16 @@ public class PahoMQTTTest extends MQTTTestSupport {
 
         MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence());
         client.connect();
-        client.publish("test", "hello".getBytes(), 1, false);
+        client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false);
 
         Message msg = consumer.receive(100 * 5);
         assertNotNull(msg);
 
         client.disconnect();
+        client.close();
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 90000)
     public void testSubs() throws Exception {
 
         final DefaultListener listener = new DefaultListener();
@@ -155,29 +158,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         String expectedResult = "should get everything";
-        client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false);
-
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
-
+        client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
 
+        // One delivery for topic  ACCOUNT_PREFIX + "#"
+        String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
-        assertEquals(expectedResult, listener.result);
+        assertEquals(expectedResult, result);
 
         expectedResult = "should get everything";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false);
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+
+        // One delivery for topic  ACCOUNT_PREFIX + "a/1/2"
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
+        // One delivery for topic  ACCOUNT_PREFIX + "#"
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         client.unsubscribe(ACCOUNT_PREFIX + "a/+/#");
@@ -185,19 +181,27 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "should still get 1/2/3";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+
+        // One delivery for topic  ACCOUNT_PREFIX + "1/2/3"
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+        client.disconnect();
+        client.close();
     }
 
+    @Ignore
     @Test(timeout = 300000)
+    public void testOverlappingTopicsLooped() throws Exception {
+        for (int i = 0; i < 100; ++i) {
+            LOG.info("Running test iteration: {}", i);
+            testOverlappingTopics();
+        }
+    }
+
+    @Test(timeout = 90000)
     public void testOverlappingTopics() throws Exception {
 
         final DefaultListener listener = new DefaultListener();
@@ -212,26 +216,16 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client.subscribe(ACCOUNT_PREFIX + "#");
         assertTrue(client.getPendingDeliveryTokens().length == 0);
         String expectedResult = "hello mqtt broker on hash";
-        client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false);
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+
+        String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "hello mqtt broker on a different topic";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(), 0, false);
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         // *****************************************
@@ -242,27 +236,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "hello mqtt broker on explicit topic";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false);
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+
+        // One message from topic subscription on ACCOUNT_PREFIX + "#"
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
+
+        // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3"
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
+
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "hello mqtt broker on some other topic";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(), 0, false);
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(expectedResult, listener.result);
+        client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+        result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(expectedResult, result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         // *****************************************
@@ -272,31 +261,22 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "this should not come back...";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false);
-        assertFalse(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(5)));
-        assertNull(listener.result);
+        client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+        result = listener.messageQ.poll(3, TimeUnit.SECONDS);
+        assertNull(result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
         expectedResult = "this should not come back either...";
-        listener.result = null;
-        client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false);
-        assertFalse(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return listener.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(5)));
-        assertNull(listener.result);
+        client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
+        result = listener.messageQ.poll(3, TimeUnit.SECONDS);
+        assertNull(result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
+
+        client.disconnect();
+        client.close();
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 90000)
     public void testCleanSession() throws Exception {
         String topic = "test";
         final DefaultListener listener = new DefaultListener();
@@ -316,13 +296,13 @@ public class PahoMQTTTest extends MQTTTestSupport {
 
         LOG.info("Publish message with QoS 1...");
         String expectedResult = "QOS 1 message";
-        client2.publish(topic, expectedResult.getBytes(), 1, false);
+        client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 1, false);
         waitForDelivery(client2);
 
         // Publish message with QoS 0
         LOG.info("Publish message with QoS 0...");
         expectedResult = "QOS 0 message";
-        client2.publish(topic, expectedResult.getBytes(), 0, false);
+        client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 0, false);
         waitForDelivery(client2);
 
         // subscriber reconnects
@@ -335,26 +315,31 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertTrue(Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                return listener.received == 2;
+                return listener.received.get() == 2;
             }
         }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100)));
-        assertEquals(2, listener.received);
+        assertEquals(2, listener.received.get());
         disconnect(client3);
         LOG.info("Disconnected durable subscriber.");
 
         // make sure we consumed everything
-        listener.received = 0;
+        assertTrue(listener.received.compareAndSet(2, 0));
 
         LOG.info("Reconnecting durable subscriber...");
         MqttClient client4 = createClient(false, "receive", listener);
 
         LOG.info("Subscribing durable subscriber...");
         client4.subscribe(topic, 1);
-        Thread.sleep(3 * 1000);
-        assertEquals(0, listener.received);
+        TimeUnit.SECONDS.sleep(3);
+        assertEquals(0, listener.received.get());
+
+        client2.disconnect();
+        client2.close();
+        client4.disconnect();
+        client4.close();
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 90000)
     public void testClientIdSpecialChars() throws Exception {
         testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1);
         testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1);
@@ -374,28 +359,18 @@ public class PahoMQTTTest extends MQTTTestSupport {
         client1.subscribe(topic, 1);
 
         String message = "Message from client: " + clientId;
-        client1.publish(topic, message.getBytes(), 1, false);
+        client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false);
 
+        String result = client1MqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(message, result);
+        assertEquals(1, client1MqttCallback.received.get());
 
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return client1MqttCallback.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(message, client1MqttCallback.result);
-        assertEquals(1, client1MqttCallback.received);
-
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return clientAdminMqttCallback.result != null;
-            }
-        }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)));
-        assertEquals(message, clientAdminMqttCallback.result);
+        result = clientAdminMqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS);
+        assertEquals(message, result);
 
         assertTrue(client1.isConnected());
         client1.disconnect();
+        client1.close();
     }
 
     protected void testClientIdSpecialChars(int mqttVersion) throws Exception {
@@ -414,8 +389,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
         testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
         testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback);
         testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback);
-    }
 
+        clientAdmin.disconnect();
+        clientAdmin.close();
+    }
 
     protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception {
         MqttConnectOptions options = new MqttConnectOptions();
@@ -453,30 +430,28 @@ public class PahoMQTTTest extends MQTTTestSupport {
             public boolean isSatisified() throws Exception {
                 return client.getPendingDeliveryTokens().length == 0;
             }
-        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200));
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100));
         assertTrue(client.getPendingDeliveryTokens().length == 0);
     }
 
     static class DefaultListener implements MqttCallback {
 
-        int received = 0;
-        volatile String result;
+        final AtomicInteger received = new AtomicInteger();
+        final BlockingQueue<String> messageQ = new ArrayBlockingQueue<String>(10);
 
         @Override
         public void connectionLost(Throwable cause) {
-
         }
 
         @Override
         public void messageArrived(String topic, MqttMessage message) throws Exception {
-            LOG.debug("Received: " + message);
-            received++;
-            result = new String(message.getPayload());
+            LOG.info("Received: {}", message);
+            received.incrementAndGet();
+            messageQ.put(new String(message.getPayload(), StandardCharsets.UTF_8));
         }
 
         @Override
         public void deliveryComplete(IMqttDeliveryToken token) {
-
         }
     }
 }
\ No newline at end of file