You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/19 18:43:42 UTC

[1/2] incubator-beam git commit: [BEAM-744] UnboundedKafkaReader should return as soon as it can.

Repository: incubator-beam
Updated Branches:
  refs/heads/master ea04e618e -> b0cb2e87b


[BEAM-744] UnboundedKafkaReader should return as soon as it can.

Use timeout directly in nextBatch()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c

Branch: refs/heads/master
Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0
Parents: ea04e61
Author: Sela <an...@paypal.com>
Authored: Tue Oct 18 22:03:25 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 21:34:07 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java    | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2030789..834104e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -756,9 +756,6 @@ public class KafkaIO {
     private Iterator<PartitionState> curBatch = Collections.emptyIterator();
 
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-    // how long to wait for new records from kafka consumer inside start()
-    private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
-    // how long to wait for new records from kafka consumer inside advance()
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
     // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -888,12 +885,13 @@ public class KafkaIO {
       LOG.info("{}: Returning from consumer pool loop", this);
     }
 
-    private void nextBatch(Duration timeout) {
+    private void nextBatch() {
       curBatch = Collections.emptyIterator();
 
       ConsumerRecords<byte[], byte[]> records;
       try {
-        records = availableRecordsQueue.poll(timeout.getMillis(),
+        // poll available records, wait (if necessary) up to the specified timeout.
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
                                              TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -966,9 +964,7 @@ public class KafkaIO {
             }
           }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
-      // Wait for longer than normal when fetching a batch to improve chances a record is available
-      // when start() returns.
-      nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
+      nextBatch();
       return advance();
     }
 
@@ -1032,7 +1028,7 @@ public class KafkaIO {
           return true;
 
         } else { // -- (b)
-          nextBatch(NEW_RECORDS_POLL_TIMEOUT);
+          nextBatch();
 
           if (!curBatch.hasNext()) {
             return false;


[2/2] incubator-beam git commit: This closes #1125

Posted by am...@apache.org.
This closes #1125


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0cb2e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0cb2e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0cb2e87

Branch: refs/heads/master
Commit: b0cb2e87b14182c9950974204a345a17181ff55c
Parents: ea04e61 84c6649
Author: Sela <an...@paypal.com>
Authored: Wed Oct 19 21:35:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 21:35:20 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java    | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------