You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/23 21:05:15 UTC

[1/2] beam git commit: Finalize Checkpoints before resuming from them

Repository: beam
Updated Branches:
  refs/heads/master daed01a69 -> a1a022d6b


Finalize Checkpoints before resuming from them

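This moves checkpoint finalization in the DirectRunner so that it happens
before the call to createReader, rather than between that call and the
call to reader.start(). A reader that resumes from a checkpoint is thus
always created from a checkpoint that has already been finalized.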


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ee34d94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ee34d94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ee34d94

Branch: refs/heads/master
Commit: 0ee34d9436574adecd276759cdfec62e60ba7b66
Parents: daed01a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 10:25:04 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 13:05:00 2017 -0800

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java  |  9 +++++----
 .../direct/UnboundedReadEvaluatorFactoryTest.java      | 13 ++++++++++++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index a4aebc9..013e929 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -164,9 +164,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         throws IOException {
       UnboundedReader<OutputT> existing = shard.getExistingReader();
       if (existing == null) {
+        CheckpointMarkT checkpoint = shard.getCheckpoint();
+        if (checkpoint != null) {
+          checkpoint.finalizeCheckpoint();
+        }
         return shard
             .getSource()
-            .createReader(evaluationContext.getPipelineOptions(), shard.getCheckpoint());
+            .createReader(evaluationContext.getPipelineOptions(), checkpoint);
       } else {
         return existing;
       }
@@ -176,9 +180,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
         throws IOException {
       if (shard.getExistingReader() == null) {
-        if (shard.getCheckpoint() != null) {
-          shard.getCheckpoint().finalizeCheckpoint();
-        }
         return reader.start();
       } else {
         return shard.getExistingReader().advance();

http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 92d668e..987f927 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -414,6 +415,9 @@ public class UnboundedReadEvaluatorFactoryTest {
     @Override
     public UnboundedSource.UnboundedReader<T> createReader(
         PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
+      if (checkpointMark != null) {
+        assertThat(checkpointMark.isFinalized(), is(true));
+      }
       return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
     }
 
@@ -505,13 +509,20 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   private static class TestCheckpointMark implements CheckpointMark {
     final int index;
+    private boolean finalized = false;
 
     private TestCheckpointMark(int index) {
       this.index = index;
     }
 
     @Override
-    public void finalizeCheckpoint() throws IOException {}
+    public void finalizeCheckpoint() throws IOException {
+      finalized = true;
+    }
+
+    boolean isFinalized() {
+      return finalized;
+    }
 
     public static class Coder extends AtomicCoder<TestCheckpointMark> {
       @Override


[2/2] beam git commit: This closes #1819

Posted by dh...@apache.org.
This closes #1819


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1a022d6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1a022d6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1a022d6

Branch: refs/heads/master
Commit: a1a022d6bc08f50c1815b1c129858840d0deae5d
Parents: daed01a 0ee34d9
Author: Dan Halperin <dh...@google.com>
Authored: Mon Jan 23 13:05:03 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Jan 23 13:05:03 2017 -0800

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java  |  9 +++++----
 .../direct/UnboundedReadEvaluatorFactoryTest.java      | 13 ++++++++++++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------