You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/03/10 16:03:04 UTC
[1/2] beam git commit: [BEAM-1686] Use random MQTT clientID when not
defined to avoid NPE
Repository: beam
Updated Branches:
refs/heads/master fdba784a8 -> 818fc9412
[BEAM-1686] Use random MQTT clientID when not defined to avoid NPE
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/752ad8a0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/752ad8a0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/752ad8a0
Branch: refs/heads/master
Commit: 752ad8a0ec5597ca84c27cb21862e32d05a81420
Parents: fdba784
Author: Borisa Zivkovic <bo...@huawei.com>
Authored: Fri Mar 10 12:20:13 2017 +0000
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Mar 10 17:00:57 2017 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 4 ++
.../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 61 ++++++++++++++++++++
2 files changed, 65 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 26234cf..46f2dcc 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -209,6 +209,10 @@ public class MqttIO {
String clientId = getClientId() + "-" + UUID.randomUUID().toString();
LOG.debug("MQTT client id set to {}", clientId);
client.setClientId(clientId);
+ } else {
+ String clientId = UUID.randomUUID().toString();
+ LOG.debug("MQTT client id set to random value {}", clientId);
+ client.setClientId(clientId);
}
return client;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 8a82f40..28ca5f7 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
+import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -79,6 +80,66 @@ public class MqttIOTest {
@Test(timeout = 60 * 1000)
@Category(RunnableOnService.class)
+ public void testReadNoClientId() throws Exception {
+ final String topicName = "READ_TOPIC_NO_CLIENT_ID";
+ Read mqttReader = MqttIO.read()
+ .withConnectionConfiguration(
+ MqttIO.ConnectionConfiguration.create(
+ "tcp://localhost:" + port,
+ topicName))
+ .withMaxNumRecords(10);
+ PCollection<byte[]> output = pipeline.apply(mqttReader);
+ PAssert.that(output).containsInAnyOrder(
+ "This is test 0".getBytes(),
+ "This is test 1".getBytes(),
+ "This is test 2".getBytes(),
+ "This is test 3".getBytes(),
+ "This is test 4".getBytes(),
+ "This is test 5".getBytes(),
+ "This is test 6".getBytes(),
+ "This is test 7".getBytes(),
+ "This is test 8".getBytes(),
+ "This is test 9".getBytes()
+ );
+
+ // produce messages on the brokerService in another thread
+ // This thread prevents to block the pipeline waiting for new messages
+ MQTT client = new MQTT();
+ client.setHost("tcp://localhost:" + port);
+ final BlockingConnection publishConnection = client.blockingConnection();
+ publishConnection.connect();
+ Thread publisherThread = new Thread() {
+ public void run() {
+ try {
+ LOG.info("Waiting pipeline connected to the MQTT broker before sending "
+ + "messages ...");
+ boolean pipelineConnected = false;
+ while (!pipelineConnected) {
+ Thread.sleep(1000);
+ for (Connection connection : brokerService.getBroker().getClients()) {
+ if (!connection.getConnectionId().isEmpty()) {
+ pipelineConnected = true;
+ }
+ }
+ }
+ for (int i = 0; i < 10; i++) {
+ publishConnection.publish(topicName, ("This is test " + i).getBytes(),
+ QoS.AT_LEAST_ONCE, false);
+ }
+ } catch (Exception e) {
+ // nothing to do
+ }
+ }
+ };
+ publisherThread.start();
+ pipeline.run();
+
+ publishConnection.disconnect();
+ publisherThread.join();
+ }
+
+ @Test(timeout = 60 * 1000)
+ @Category(RunnableOnService.class)
public void testRead() throws Exception {
PCollection<byte[]> output = pipeline.apply(
MqttIO.read()
[2/2] beam git commit: [BEAM-1686] This closes #2219
Posted by jb...@apache.org.
[BEAM-1686] This closes #2219
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/818fc941
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/818fc941
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/818fc941
Branch: refs/heads/master
Commit: 818fc9412f0b5604c6c6ae4ba4ed957bafcb0535
Parents: fdba784 752ad8a
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Mar 10 17:02:54 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Mar 10 17:02:54 2017 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/mqtt/MqttIO.java | 4 ++
.../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 61 ++++++++++++++++++++
2 files changed, 65 insertions(+)
----------------------------------------------------------------------