You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/13 06:14:06 UTC
[beam] branch master updated: [BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e15d82b [BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)
e15d82b is described below
commit e15d82b37932eebf32d7f49032625390e0f471a2
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Fri Jan 12 22:14:01 2018 -0800
[BEAM-3336] Fix thread safety issues of MqttIOTest (#4406)
* [BEAM-3336] Fix thread safety issues of MqttIOTest.
Ran the tests on my desktop 50 times with zero flakes.
* fixup! Adjust test timeouts
---
.../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 44 +++++++++++-----------
1 file changed, 21 insertions(+), 23 deletions(-)
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index d7baf3b..94ab0ab 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertTrue;
import java.net.ServerSocket;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Set;
-
+import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
@@ -53,9 +52,9 @@ public class MqttIOTest {
private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);
- private static transient BrokerService brokerService;
+ private BrokerService brokerService;
- private static int port;
+ private int port;
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@@ -63,9 +62,9 @@ public class MqttIOTest {
@Before
public void startBroker() throws Exception {
LOG.info("Finding free network port");
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
- socket.close();
+ try (ServerSocket socket = new ServerSocket(0)) {
+ port = socket.getLocalPort();
+ }
LOG.info("Starting ActiveMQ brokerService on {}", port);
brokerService = new BrokerService();
@@ -81,11 +80,9 @@ public class MqttIOTest {
public void testReadNoClientId() throws Exception {
final String topicName = "READ_TOPIC_NO_CLIENT_ID";
Read mqttReader = MqttIO.read()
- .withConnectionConfiguration(
- MqttIO.ConnectionConfiguration.create(
- "tcp://localhost:" + port,
- topicName))
- .withMaxNumRecords(10);
+ .withConnectionConfiguration(
+ MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topicName))
+ .withMaxNumRecords(10);
PCollection<byte[]> output = pipeline.apply(mqttReader);
PAssert.that(output).containsInAnyOrder(
"This is test 0".getBytes(),
@@ -122,7 +119,7 @@ public class MqttIOTest {
}
for (int i = 0; i < 10; i++) {
publishConnection.publish(topicName, ("This is test " + i).getBytes(),
- QoS.AT_LEAST_ONCE, false);
+ QoS.EXACTLY_ONCE, false);
}
} catch (Exception e) {
// nothing to do
@@ -136,7 +133,7 @@ public class MqttIOTest {
publisherThread.join();
}
- @Test(timeout = 5 * 1000)
+ @Test(timeout = 30 * 1000)
public void testRead() throws Exception {
PCollection<byte[]> output = pipeline.apply(
MqttIO.read()
@@ -181,7 +178,7 @@ public class MqttIOTest {
}
for (int i = 0; i < 10; i++) {
publishConnection.publish("READ_TOPIC", ("This is test " + i).getBytes(),
- QoS.AT_LEAST_ONCE, false);
+ QoS.EXACTLY_ONCE, false);
}
} catch (Exception e) {
// nothing to do
@@ -191,14 +188,14 @@ public class MqttIOTest {
publisherThread.start();
pipeline.run();
- publishConnection.disconnect();
publisherThread.join();
+ publishConnection.disconnect();
}
/**
* Test for BEAM-3282: this test should not timeout.
*/
- @Test(timeout = 5 * 1000)
+ @Test(timeout = 30 * 1000)
public void testReceiveWithTimeoutAndNoData() throws Exception {
pipeline.apply(MqttIO.read()
.withConnectionConfiguration(
@@ -213,18 +210,19 @@ public class MqttIOTest {
@Test
public void testWrite() throws Exception {
+ final int numberOfTestMessages = 200;
MQTT client = new MQTT();
client.setHost("tcp://localhost:" + port);
final BlockingConnection connection = client.blockingConnection();
connection.connect();
- connection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.AT_LEAST_ONCE)});
+ connection.subscribe(new Topic[]{new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)});
- final Set<String> messages = new HashSet<>();
+ final Set<String> messages = new ConcurrentSkipListSet<>();
Thread subscriber = new Thread() {
public void run() {
try {
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < numberOfTestMessages; i++) {
Message message = connection.receive();
messages.add(new String(message.getPayload()));
message.ack();
@@ -237,7 +235,7 @@ public class MqttIOTest {
subscriber.start();
ArrayList<byte[]> data = new ArrayList<>();
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < numberOfTestMessages; i++) {
data.add(("Test " + i).getBytes());
}
pipeline.apply(Create.of(data))
@@ -251,8 +249,8 @@ public class MqttIOTest {
connection.disconnect();
- assertEquals(200, messages.size());
- for (int i = 0; i < 200; i++) {
+ assertEquals(numberOfTestMessages, messages.size());
+ for (int i = 0; i < numberOfTestMessages; i++) {
assertTrue(messages.contains("Test " + i));
}
}
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].