You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/19 18:43:42 UTC
[1/2] incubator-beam git commit: [BEAM-744] UnboundedKafkaReader
should return as soon as it can.
Repository: incubator-beam
Updated Branches:
refs/heads/master ea04e618e -> b0cb2e87b
[BEAM-744] UnboundedKafkaReader should return as soon as it can.
Use timeout directly in nextBatch()
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84c6649c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84c6649c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84c6649c
Branch: refs/heads/master
Commit: 84c6649cd63c33ca79ad43e8973dbf765e27a5d0
Parents: ea04e61
Author: Sela <an...@paypal.com>
Authored: Tue Oct 18 22:03:25 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 21:34:07 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 +++++---------
1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84c6649c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2030789..834104e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -756,9 +756,6 @@ public class KafkaIO {
private Iterator<PartitionState> curBatch = Collections.emptyIterator();
private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
- // how long to wait for new records from kafka consumer inside start()
- private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
- // how long to wait for new records from kafka consumer inside advance()
private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
// Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -888,12 +885,13 @@ public class KafkaIO {
LOG.info("{}: Returning from consumer pool loop", this);
}
- private void nextBatch(Duration timeout) {
+ private void nextBatch() {
curBatch = Collections.emptyIterator();
ConsumerRecords<byte[], byte[]> records;
try {
- records = availableRecordsQueue.poll(timeout.getMillis(),
+ // poll available records, wait (if necessary) up to the specified timeout.
+ records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -966,9 +964,7 @@ public class KafkaIO {
}
}, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
- // Wait for longer than normal when fetching a batch to improve chances a record is available
- // when start() returns.
- nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
+ nextBatch();
return advance();
}
@@ -1032,7 +1028,7 @@ public class KafkaIO {
return true;
} else { // -- (b)
- nextBatch(NEW_RECORDS_POLL_TIMEOUT);
+ nextBatch();
if (!curBatch.hasNext()) {
return false;
[2/2] incubator-beam git commit: This closes #1125
Posted by am...@apache.org.
This closes #1125
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0cb2e87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0cb2e87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0cb2e87
Branch: refs/heads/master
Commit: b0cb2e87b14182c9950974204a345a17181ff55c
Parents: ea04e61 84c6649
Author: Sela <an...@paypal.com>
Authored: Wed Oct 19 21:35:20 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 21:35:20 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 +++++---------
1 file changed, 5 insertions(+), 9 deletions(-)
----------------------------------------------------------------------