You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/26 01:16:28 UTC
[2/6] incubator-beam git commit: Respect checkpointing contract
Respect checkpointing contract
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0236bc18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0236bc18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0236bc18
Branch: refs/heads/master
Commit: 0236bc187fb5b633eee14728fc41b6a0b81a3b65
Parents: e953cb0
Author: Mark Shields <ma...@google.com>
Authored: Fri Apr 22 15:53:01 2016 -0700
Committer: Mark Shields <ma...@google.com>
Committed: Mon Apr 25 14:10:50 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/TestCountingSource.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0236bc18/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index a985a31..1ea3521 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -164,21 +164,20 @@ public class TestCountingSource
@Override
public boolean start() {
- return true;
+ return advance();
}
@Override
public boolean advance() {
- if (current < numMessagesPerShard - 1) {
+ if (current >= numMessagesPerShard) {
+ return false;
+ }
// If testing dedup, occasionally insert a duplicate value;
if (dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
return true;
}
current++;
- return true;
- } else {
- return false;
- }
+ return current < numMessagesPerShard;
}
@Override
@@ -222,6 +221,8 @@ public class TestCountingSource
LOG.error("Throwing exception while checkpointing counter");
throw new RuntimeException("failed during checkpoint");
}
+ // The checkpoint can assume all records read, including the current, have
+ // been committed.
return new CounterMark(current);
}
@@ -234,7 +235,12 @@ public class TestCountingSource
@Override
public CountingSourceReader createReader(
PipelineOptions options, @Nullable CounterMark checkpointMark) {
- return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : 0);
+ if (checkpointMark == null) {
+ LOG.debug("creating reader");
+ } else {
+ LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
+ }
+ return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
}
@Override