You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/26 01:16:28 UTC

[2/6] incubator-beam git commit: Respect checkpointing contract

Respect checkpointing contract


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0236bc18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0236bc18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0236bc18

Branch: refs/heads/master
Commit: 0236bc187fb5b633eee14728fc41b6a0b81a3b65
Parents: e953cb0
Author: Mark Shields <ma...@google.com>
Authored: Fri Apr 22 15:53:01 2016 -0700
Committer: Mark Shields <ma...@google.com>
Committed: Mon Apr 25 14:10:50 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/TestCountingSource.java    | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0236bc18/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index a985a31..1ea3521 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -164,21 +164,20 @@ public class TestCountingSource
 
     @Override
     public boolean start() {
-      return true;
+      return advance();
     }
 
     @Override
     public boolean advance() {
-      if (current < numMessagesPerShard - 1) {
+      if (current >= numMessagesPerShard) {
+        return false;
+      }
         // If testing dedup, occasionally insert a duplicate value;
         if (dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
           return true;
         }
         current++;
-        return true;
-      } else {
-        return false;
-      }
+      return current < numMessagesPerShard;
     }
 
     @Override
@@ -222,6 +221,8 @@ public class TestCountingSource
         LOG.error("Throwing exception while checkpointing counter");
         throw new RuntimeException("failed during checkpoint");
       }
+      // The checkpoint can assume all records read, including the current, have
+      // been committed.
       return new CounterMark(current);
     }
 
@@ -234,7 +235,12 @@ public class TestCountingSource
   @Override
   public CountingSourceReader createReader(
       PipelineOptions options, @Nullable CounterMark checkpointMark) {
-    return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : 0);
+    if (checkpointMark == null) {
+      LOG.debug("creating reader");
+    } else {
+      LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
+    }
+    return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
   }
 
   @Override