You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 17:46:26 UTC
[1/2] beam git commit: This closes #2242
Repository: beam
Updated Branches:
refs/heads/master 30033ccba -> 843b663cf
This closes #2242
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/843b663c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/843b663c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/843b663c
Branch: refs/heads/master
Commit: 843b663cfde305bc622f8fe5a587da855417d253
Parents: 30033cc 869002c
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 10:46:16 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:46:16 2017 -0700
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 30 ++++++----
.../UnboundedReadEvaluatorFactoryTest.java | 61 ++++++++++++++++++--
2 files changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Prevent Double-Close in UnboundedReadEvaluatorFactory
Posted by tg...@apache.org.
Prevent Double-Close in UnboundedReadEvaluatorFactory
Move the actual "close-and-resume" logic out of finishRead and into the
overall try block, so that the reader cannot be double-closed if the first
call to close() throws an IOException.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/869002c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/869002c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/869002c3
Branch: refs/heads/master
Commit: 869002c397b3a360ab9a9afe0a342a6ac2fe7f9e
Parents: 30033cc
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 09:18:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 10:46:16 2017 -0700
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 30 ++++++----
.../UnboundedReadEvaluatorFactoryTest.java | 61 ++++++++++++++++++--
2 files changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 69e6920..7c3d50a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -139,7 +139,24 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
numElements++;
} while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
Instant watermark = reader.getWatermark();
- UnboundedSourceShard<OutputT, CheckpointMarkT> residual = finishRead(reader, shard);
+
+ CheckpointMarkT finishedCheckpoint = finishRead(reader, shard);
+ UnboundedSourceShard<OutputT, CheckpointMarkT> residual;
+ // Sometimes resume from a checkpoint even if it's not required
+ if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
+ UnboundedReader<OutputT> toClose = reader;
+ // Prevent double-close. UnboundedReader is AutoCloseable, which does not require
+ // idempotency of close. Nulling out the reader here prevents trying to re-close it
+ // if the call to close throws an IOException.
+ reader = null;
+ toClose.close();
+ residual =
+ UnboundedSourceShard.of(
+ shard.getSource(), shard.getDeduplicator(), null, finishedCheckpoint);
+ } else {
+ residual = shard.withCheckpoint(finishedCheckpoint);
+ }
+
resultBuilder
.addOutput(output)
.addUnprocessedElements(
@@ -192,7 +209,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* Checkpoint the current reader, finalize the previous checkpoint, and return the residual
* {@link UnboundedSourceShard}.
*/
- private UnboundedSourceShard<OutputT, CheckpointMarkT> finishRead(
+ private CheckpointMarkT finishRead(
UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
throws IOException {
final CheckpointMark oldMark = shard.getCheckpoint();
@@ -223,14 +240,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
}
});
}
-
- // Sometimes resume from a checkpoint even if it's not required
- if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
- reader.close();
- return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, mark);
- } else {
- return shard.withCheckpoint(mark);
- }
+ return mark;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 7e2d85d..cdb362f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -76,6 +76,7 @@ import org.joda.time.ReadableInstant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.invocation.InvocationOnMock;
@@ -96,8 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest {
private UnboundedSource<Long, ?> source;
private DirectGraph graph;
- @Rule
- public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@Before
public void setup() {
@@ -379,6 +380,38 @@ public class UnboundedReadEvaluatorFactoryTest {
is(true));
}
+ @Test
+ public void evaluatorThrowsInCloseRethrows() throws Exception {
+ ContiguousSet<Long> elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs());
+ TestUnboundedSource<Long> source =
+ new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]))
+ .throwsOnClose();
+
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform =
+ DirectGraphs.getGraph(p).getProducer(pcollection);
+
+ when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+ UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
+ when(context.createBundle(pcollection)).thenReturn(output);
+
+ WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> shard =
+ WindowedValue.valueInGlobalWindow(
+ UnboundedSourceShard.unstarted(source, NeverDeduplicator.create()));
+ CommittedBundle<UnboundedSourceShard<Long, TestCheckpointMark>> inputBundle =
+ bundleFactory
+ .<UnboundedSourceShard<Long, TestCheckpointMark>>createRootBundle()
+ .add(shard)
+ .commit(Instant.now());
+ UnboundedReadEvaluatorFactory factory =
+ new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */);
+ TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
+ factory.forApplication(sourceTransform, inputBundle);
+ thrown.expect(IOException.class);
+ thrown.expectMessage("throws on close");
+ evaluator.processElement(shard);
+ }
+
/**
* A terse alias for producing timestamped longs in the {@link GlobalWindow}, where
* the timestamp is the epoch offset by the value of the element.
@@ -402,12 +435,18 @@ public class UnboundedReadEvaluatorFactoryTest {
private final Coder<T> coder;
private final List<T> elems;
private boolean dedupes = false;
+ private boolean throwOnClose;
public TestUnboundedSource(Coder<T> coder, T... elems) {
+ this(coder, false, Arrays.asList(elems));
+ }
+
+ private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
readerAdvancedCount = 0;
readerClosedCount = 0;
this.coder = coder;
- this.elems = Arrays.asList(elems);
+ this.elems = elems;
+ this.throwOnClose = throwOnClose;
}
@Override
@@ -441,9 +480,14 @@ public class UnboundedReadEvaluatorFactoryTest {
return coder;
}
+ public TestUnboundedSource<T> throwsOnClose() {
+ return new TestUnboundedSource<>(coder, true, elems);
+ }
+
private class TestUnboundedReader extends UnboundedReader<T> {
private final List<T> elems;
private int index;
+ private boolean closed = false;
public TestUnboundedReader(List<T> elems, int startIndex) {
this.elems = elems;
@@ -503,7 +547,16 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public void close() throws IOException {
- readerClosedCount++;
+ try {
+ readerClosedCount++;
+ // Enforce the AutoCloseable contract. Close is not idempotent.
+ assertThat(closed, is(false));
+ if (throwOnClose) {
+ throw new IOException(String.format("%s throws on close", TestUnboundedSource.this));
+ }
+ } finally {
+ closed = true;
+ }
}
}
}