You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 17:45:59 UTC

[1/2] beam git commit: This closes #2237

Repository: beam
Updated Branches:
  refs/heads/master e362e6b49 -> 30033ccba


This closes #2237


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30033ccb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30033ccb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30033ccb

Branch: refs/heads/master
Commit: 30033ccba4343be1519d3b8afa8ecd77ba9ad887
Parents: e362e6b d56d745
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 10:45:48 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:45:48 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java  |  3 ---
 .../direct/UnboundedReadEvaluatorFactoryTest.java      | 13 ++++++++-----
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Stop Double-finalizing checkpoints in the DirectRunner

Posted by tg...@apache.org.
Stop Double-finalizing checkpoints in the DirectRunner

Checkpoints don't need to be finalized before we restore from them.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d56d7451
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d56d7451
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d56d7451

Branch: refs/heads/master
Commit: d56d7451a5d7c304d3cb1b5551d918773aec1c65
Parents: e362e6b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 13 15:22:24 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:45:48 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java  |  3 ---
 .../direct/UnboundedReadEvaluatorFactoryTest.java      | 13 ++++++++-----
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index ff59390..69e6920 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -170,9 +170,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       UnboundedReader<OutputT> existing = shard.getExistingReader();
       if (existing == null) {
         CheckpointMarkT checkpoint = shard.getCheckpoint();
-        if (checkpoint != null) {
-          checkpoint.finalizeCheckpoint();
-        }
         return shard
             .getSource()
             .createReader(evaluationContext.getPipelineOptions(), checkpoint);

http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 987f927..7e2d85d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -373,6 +374,9 @@ public class UnboundedReadEvaluatorFactoryTest {
     secondEvaluator.finishBundle();
 
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
+    assertThat(
+        Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
+        is(true));
   }
 
   /**
@@ -415,9 +419,6 @@ public class UnboundedReadEvaluatorFactoryTest {
     @Override
     public UnboundedSource.UnboundedReader<T> createReader(
         PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
-      if (checkpointMark != null) {
-        assertThat(checkpointMark.isFinalized(), is(true));
-      }
       return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
     }
 
@@ -517,6 +518,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     @Override
     public void finalizeCheckpoint() throws IOException {
+      checkState(
+          !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
       finalized = true;
     }
 
@@ -530,14 +533,14 @@ public class UnboundedReadEvaluatorFactoryTest {
           TestCheckpointMark value,
           OutputStream outStream,
           org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {
+          throws IOException {
         VarInt.encode(value.index, outStream);
       }
 
       @Override
       public TestCheckpointMark decode(
           InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {
+          throws IOException {
         return new TestCheckpointMark(VarInt.decodeInt(inStream));
       }
     }