You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/19 07:20:04 UTC
[1/2] incubator-beam git commit: [BEAM-777] KafkaIO Test should
handle reader.start() better.
Repository: incubator-beam
Updated Branches:
refs/heads/master dde8e35ca -> ea04e618e
[BEAM-777] KafkaIO Test should handle reader.start() better.
KafkaIOTest : start() can return false
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8ade83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8ade83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8ade83
Branch: refs/heads/master
Commit: 2c8ade83b2104ecd7f8098b18dd45a0fd8b6cc9f
Parents: dde8e35
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Oct 18 21:46:18 2016 -0700
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 10:06:45 2016 +0300
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 ++++++++------------
1 file changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8ade83/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 67aa675..2f3c524 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -389,7 +389,10 @@ public class KafkaIOTest {
// Kafka records are read in a separate thread inside the reader. As a result advance() might not
// read any records even from the mock consumer, especially for the first record.
// This is a helper method to loop until we read a record.
- private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
+ private static void advanceOnce(UnboundedReader<?> reader, boolean isStarted) throws IOException {
+ if (!isStarted && reader.start()) {
+ return;
+ }
while (!reader.advance()) {
// very rarely will there be more than one attempts.
// In case of a bug we might end up looping forever, and test will fail with a timeout.
@@ -418,9 +421,8 @@ public class KafkaIOTest {
final int numToSkip = 20; // one from each partition.
// advance numToSkip elements
- reader.start();
- for (int l = 1; l < numToSkip; ++l) {
- advanceOnce(reader);
+ for (int i = 0; i < numToSkip; ++i) {
+ advanceOnce(reader, i > 0);
}
// Confirm that we get the expected element in sequence before checkpointing.
@@ -435,13 +437,10 @@ public class KafkaIOTest {
// Confirm that we get the next elements in sequence.
// This also confirms that Reader interleaves records from each partitions by the reader.
- reader.start();
for (int i = numToSkip; i < numElements; i++) {
+ advanceOnce(reader, i > numToSkip);
assertEquals(i, (long) reader.getCurrent().getKV().getValue());
assertEquals(i, reader.getCurrentTimestamp().getMillis());
- if ((i + 1) < numElements) {
- advanceOnce(reader);
- }
}
}
@@ -460,9 +459,8 @@ public class KafkaIOTest {
UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
- reader.start();
- for (int l = 1; l < initialNumElements; ++l) {
- advanceOnce(reader);
+ for (int l = 0; l < initialNumElements; ++l) {
+ advanceOnce(reader, l > 0);
}
// Checkpoint and restart, and confirm that the source continues correctly.
@@ -490,19 +488,15 @@ public class KafkaIOTest {
reader = source.createReader(null, mark);
- reader.start();
-
// Verify in any order. As the partitions are unevenly read, the returned records are not in a
// simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin oder.
List<Long> expected = new ArrayList<>();
List<Long> actual = new ArrayList<>();
for (long i = initialNumElements; i < numElements; i++) {
+ advanceOnce(reader, i > initialNumElements);
expected.add(i);
actual.add(reader.getCurrent().getKV().getValue());
- if ((i + 1) < numElements) {
- advanceOnce(reader);
- }
}
assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
}
[2/2] incubator-beam git commit: This closes #1133
Posted by am...@apache.org.
This closes #1133
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea04e618
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea04e618
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea04e618
Branch: refs/heads/master
Commit: ea04e618eae8e20e21a1db5b8367cf972446d9b6
Parents: dde8e35 2c8ade8
Author: Sela <an...@paypal.com>
Authored: Wed Oct 19 10:08:05 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 19 10:08:05 2016 +0300
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 ++++++++------------
1 file changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------