You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 17:45:59 UTC
[1/2] beam git commit: This closes #2237
Repository: beam
Updated Branches:
refs/heads/master e362e6b49 -> 30033ccba
This closes #2237
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30033ccb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30033ccb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30033ccb
Branch: refs/heads/master
Commit: 30033ccba4343be1519d3b8afa8ecd77ba9ad887
Parents: e362e6b d56d745
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 10:45:48 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:45:48 2017 -0700
----------------------------------------------------------------------
.../runners/direct/UnboundedReadEvaluatorFactory.java | 3 ---
.../direct/UnboundedReadEvaluatorFactoryTest.java | 13 ++++++++-----
2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Stop Double-finalizing checkpoints in the DirectRunner
Posted by tg...@apache.org.
Stop Double-finalizing checkpoints in the DirectRunner
Checkpoints don't need to be finalized before we restore from them.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d56d7451
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d56d7451
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d56d7451
Branch: refs/heads/master
Commit: d56d7451a5d7c304d3cb1b5551d918773aec1c65
Parents: e362e6b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 13 15:22:24 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:45:48 2017 -0700
----------------------------------------------------------------------
.../runners/direct/UnboundedReadEvaluatorFactory.java | 3 ---
.../direct/UnboundedReadEvaluatorFactoryTest.java | 13 ++++++++-----
2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index ff59390..69e6920 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -170,9 +170,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
UnboundedReader<OutputT> existing = shard.getExistingReader();
if (existing == null) {
CheckpointMarkT checkpoint = shard.getCheckpoint();
- if (checkpoint != null) {
- checkpoint.finalizeCheckpoint();
- }
return shard
.getSource()
.createReader(evaluationContext.getPipelineOptions(), checkpoint);
http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 987f927..7e2d85d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -373,6 +374,9 @@ public class UnboundedReadEvaluatorFactoryTest {
secondEvaluator.finishBundle();
assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
+ assertThat(
+ Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
+ is(true));
}
/**
@@ -415,9 +419,6 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public UnboundedSource.UnboundedReader<T> createReader(
PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
- if (checkpointMark != null) {
- assertThat(checkpointMark.isFinalized(), is(true));
- }
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
}
@@ -517,6 +518,8 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public void finalizeCheckpoint() throws IOException {
+ checkState(
+ !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
finalized = true;
}
@@ -530,14 +533,14 @@ public class UnboundedReadEvaluatorFactoryTest {
TestCheckpointMark value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
+ throws IOException {
VarInt.encode(value.index, outStream);
}
@Override
public TestCheckpointMark decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
+ throws IOException {
return new TestCheckpointMark(VarInt.decodeInt(inStream));
}
}