You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/09 10:21:00 UTC

[jira] [Commented] (BEAM-3282) MqttIO reader should use receive with timeout

    [ https://issues.apache.org/jira/browse/BEAM-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16318206#comment-16318206 ] 

ASF GitHub Bot commented on BEAM-3282:
--------------------------------------

jbonofre closed pull request #4206: [BEAM-3282] MqttIO reader now use receive for timeout in order to ret…
URL: https://github.com/apache/beam/pull/4206
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index f9083bb5e7e..ef9c9d293d9 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -27,7 +28,9 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -291,7 +294,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
    * Checkpoint for an unbounded MQTT source. Consists of the MQTT messages waiting to be
    * acknowledged and oldest pending message timestamp.
    */
-  private static class MqttCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+  @VisibleForTesting
+  static class MqttCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
 
     private String clientId;
     private Instant oldestMessageTimestamp = Instant.now();
@@ -329,8 +333,8 @@ private void readObject(java.io.ObjectInputStream stream)
 
   }
 
-  private static class UnboundedMqttSource
-      extends UnboundedSource<byte[], MqttCheckpointMark> {
+  @VisibleForTesting
+  static class UnboundedMqttSource extends UnboundedSource<byte[], MqttCheckpointMark> {
 
     private final Read spec;
 
@@ -370,7 +374,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  private static class UnboundedMqttReader extends UnboundedSource.UnboundedReader<byte[]> {
+  @VisibleForTesting
+  static class UnboundedMqttReader extends UnboundedSource.UnboundedReader<byte[]> {
 
     private final UnboundedMqttSource source;
 
@@ -411,8 +416,11 @@ public boolean start() throws IOException {
     @Override
     public boolean advance() throws IOException {
       try {
-        LOG.debug("MQTT reader (client ID {}) waiting message ...", client.getClientId());
-        Message message = connection.receive();
+        LOG.trace("MQTT reader (client ID {}) waiting message ...", client.getClientId());
+        Message message = connection.receive(1, TimeUnit.SECONDS);
+        if (message == null) {
+          return false;
+        }
         current = message.getPayload();
         currentTimestamp = Instant.now();
         checkpointMark.add(message, currentTimestamp);
diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 1b3d2da3542..d7baf3bf989 100644
--- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
@@ -37,6 +38,7 @@
 import org.fusesource.mqtt.client.Message;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
+import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -134,7 +136,7 @@ public void run() {
     publisherThread.join();
   }
 
-  @Test(timeout = 60 * 1000)
+  @Test(timeout = 5 * 1000)
   public void testRead() throws Exception {
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()
@@ -143,7 +145,7 @@ public void testRead() throws Exception {
                     "tcp://localhost:" + port,
                     "READ_TOPIC",
                     "READ_PIPELINE"))
-          .withMaxNumRecords(10));
+            .withMaxReadTime(Duration.standardSeconds(3)));
     PAssert.that(output).containsInAnyOrder(
         "This is test 0".getBytes(),
         "This is test 1".getBytes(),
@@ -193,6 +195,22 @@ public void run() {
     publisherThread.join();
   }
 
+  /**
+   * Test for BEAM-3282: this test should not timeout.
+   */
+  @Test(timeout = 5 * 1000)
+  public void testReceiveWithTimeoutAndNoData() throws Exception {
+    pipeline.apply(MqttIO.read()
+        .withConnectionConfiguration(
+            MqttIO.ConnectionConfiguration.create(
+                "tcp://localhost:" + port,
+                "READ_TOPIC",
+                "READ_PIPELINE")).withMaxReadTime(Duration.standardSeconds(2)));
+
+    // should stop before the test timeout
+    pipeline.run();
+  }
+
   @Test
   public void testWrite() throws Exception {
     MQTT client = new MQTT();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> MqttIO reader should use receive with timeout
> ---------------------------------------------
>
>                 Key: BEAM-3282
>                 URL: https://issues.apache.org/jira/browse/BEAM-3282
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>             Fix For: 2.3.0
>
>
> Now, {{MqttIO}} reader uses {{connection.receive()}} which is a blocking method, waiting for messages. This prevents the reader to return {{false}} when there's no message and so to advance the processing.
> Instead, the reader should use {{connection.receive(timeout, timeunit)}} and return {{false}} in {{advance()}} when message is null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)