You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/13 06:14:06 UTC

[beam] branch master updated: [BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e15d82b  [BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)
e15d82b is described below

commit e15d82b37932eebf32d7f49032625390e0f471a2
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Fri Jan 12 22:14:01 2018 -0800

    [BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)
    
    * [BEAM-3336] Fix thread safety issues of MqttIOTest.
    
    Ran the tests on my desktop 50 times with zero flakes.
    
    * fixup! Adjust test timeouts
---
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java    | 44 +++++++++++-----------
 1 file changed, 21 insertions(+), 23 deletions(-)

diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index d7baf3b..94ab0ab 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertTrue;
 
 import java.net.ServerSocket;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Set;
-
+import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
@@ -53,9 +52,9 @@ public class MqttIOTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
 
-  private static transient BrokerService brokerService;
+  private BrokerService brokerService;
 
-  private static int port;
+  private int port;
 
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
@@ -63,9 +62,9 @@ public class MqttIOTest {
   @Before
   public void startBroker() throws Exception {
     LOG.info("Finding free network port");
-    ServerSocket socket = new ServerSocket(0);
-    port = socket.getLocalPort();
-    socket.close();
+    try (ServerSocket socket = new ServerSocket(0)) {
+      port = socket.getLocalPort();
+    }
 
     LOG.info("Starting ActiveMQ brokerService on {}", port);
     brokerService = new BrokerService();
@@ -81,11 +80,9 @@ public class MqttIOTest {
   public void testReadNoClientId() throws Exception {
     final String topicName = "READ_TOPIC_NO_CLIENT_ID";
     Read mqttReader = MqttIO.read()
-    .withConnectionConfiguration(
-        MqttIO.ConnectionConfiguration.create(
-            "tcp://localhost:" + port,
-            topicName))
-  .withMaxNumRecords(10);
+        .withConnectionConfiguration(
+            MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName))
+        .withMaxNumRecords(10);
     PCollection<byte[]> output = pipeline.apply(mqttReader);
     PAssert.that(output).containsInAnyOrder(
         "This is test 0".getBytes(),
@@ -122,7 +119,7 @@ public class MqttIOTest {
           }
           for (int i = 0; i < 10; i++) {
             publishConnection.publish(topicName, ("This is test " + i).getBytes(),
-                QoS.AT_LEAST_ONCE, false);
+                QoS.EXACTLY_ONCE, false);
           }
         } catch (Exception e) {
           // nothing to do
@@ -136,7 +133,7 @@ public class MqttIOTest {
     publisherThread.join();
   }
 
-  @Test(timeout = 5 * 1000)
+  @Test(timeout = 30 * 1000)
   public void testRead() throws Exception {
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()
@@ -181,7 +178,7 @@ public class MqttIOTest {
           }
           for (int i = 0; i < 10; i++) {
             publishConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(),
-                QoS.AT_LEAST_ONCE, false);
+                QoS.EXACTLY_ONCE, false);
           }
         } catch (Exception e) {
           // nothing to do
@@ -191,14 +188,14 @@ public class MqttIOTest {
     publisherThread.start();
     pipeline.run();
 
-    publishConnection.disconnect();
     publisherThread.join();
+    publishConnection.disconnect();
   }
 
   /**
    * Test for BEAM-3282: this test should not timeout.
    */
-  @Test(timeout = 5 * 1000)
+  @Test(timeout = 30 * 1000)
   public void testReceiveWithTimeoutAndNoData() throws Exception {
     pipeline.apply(MqttIO.read()
         .withConnectionConfiguration(
@@ -213,18 +210,19 @@ public class MqttIOTest {
 
   @Test
   public void testWrite() throws Exception {
+    final int numberOfTestMessages = 200;
     MQTT client = new MQTT();
     client.setHost("tcp://localhost:" + port);
     final BlockingConnection connection = client.blockingConnection();
     connection.connect();
-    connection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.AT_LEAST_ONCE)});
+    connection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
 
-    final Set<String> messages = new HashSet<>();
+    final Set<String> messages = new ConcurrentSkipListSet<>();
 
     Thread subscriber = new Thread() {
       public void run() {
         try {
-          for (int i = 0; i < 200; i++) {
+          for (int i = 0; i < numberOfTestMessages; i++) {
             Message message = connection.receive();
             messages.add(new String(message.getPayload()));
             message.ack();
@@ -237,7 +235,7 @@ public class MqttIOTest {
     subscriber.start();
 
     ArrayList<byte[]> data = new ArrayList<>();
-    for (int i = 0; i < 200; i++) {
+    for (int i = 0; i < numberOfTestMessages; i++) {
       data.add(("Test " + i).getBytes());
     }
     pipeline.apply(Create.of(data))
@@ -251,8 +249,8 @@ public class MqttIOTest {
 
     connection.disconnect();
 
-    assertEquals(200, messages.size());
-    for (int i = 0; i < 200; i++) {
+    assertEquals(numberOfTestMessages, messages.size());
+    for (int i = 0; i < numberOfTestMessages; i++) {
       assertTrue(messages.contains("Test " + i));
     }
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.