You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/15 20:58:52 UTC
[1/2] incubator-beam git commit: Reuse UnboundedReaders in the InProcessRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master 462453d23 -> 2f46bc004
Reuse UnboundedReaders in the InProcessRunner
Reuse each reader across a bounded number of bundles (see MAX_READER_REUSE_COUNT), and then
close and discard it so that resuming from a checkpoint is still exercised.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0edf462a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0edf462a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0edf462a
Branch: refs/heads/master
Commit: 0edf462a9d85810331621b0ae67fdd9902fc3f95
Parents: 462453d
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 13 18:34:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 13:58:45 2016 -0700
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 145 +++++++++++++++----
.../UnboundedReadEvaluatorFactoryTest.java | 54 +++++--
2 files changed, 158 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0edf462a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index bdb8f37..fceb20c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -23,12 +23,17 @@ import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Instant;
+
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +47,10 @@ import javax.annotation.Nullable;
* for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
*/
class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+ // Resume from a checkpoint every nth invocation, to ensure close-and-resume is exercised
+ @VisibleForTesting
+ static final int MAX_READER_REUSE_COUNT = 20;
+
/*
* An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
* Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
@@ -53,7 +62,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* an arbitrary Queue implementation does not, so the concrete type is used explicitly.
*/
private final ConcurrentMap<
- EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>>
+ EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
sourceEvaluators = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -78,29 +87,33 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* already done so.
*/
@SuppressWarnings("unchecked")
- private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+ private <OutputT, CheckpointMarkT extends CheckpointMark>
+ Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
final InProcessEvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@SuppressWarnings("unchecked")
- ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
- (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
+ (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
+ sourceEvaluators.get(key);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
- UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
- UnboundedReadEvaluator<OutputT> evaluator =
- new UnboundedReadEvaluator<OutputT>(
+ UnboundedSource<OutputT, CheckpointMarkT> source =
+ (UnboundedSource<OutputT, CheckpointMarkT>) transform.getTransform().getSource();
+ UnboundedReadEvaluator<OutputT, CheckpointMarkT> evaluator =
+ new UnboundedReadEvaluator<>(
transform, evaluationContext, source, evaluatorQueue);
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
evaluatorQueue =
- (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
+ sourceEvaluators.get(key);
}
}
return evaluatorQueue;
@@ -116,27 +129,38 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* checkpoint, and constructs its reader from the current checkpoint in each call to
* {@link #finishBundle()}.
*/
- private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
+ private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends CheckpointMark>
+ implements TransformEvaluator<Object> {
private static final int ARBITRARY_MAX_ELEMENTS = 10;
+
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
- private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+ private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
+ evaluatorQueue;
/**
* The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
* source as derived from {@link #transform} due to splitting.
*/
- private final UnboundedSource<OutputT, ?> source;
- private CheckpointMark checkpointMark;
+ private final UnboundedSource<OutputT, CheckpointMarkT> source;
+ private UnboundedReader<OutputT> currentReader;
+ private CheckpointMarkT checkpointMark;
+
+ /**
+ * The count of bundles output from this {@link UnboundedReadEvaluator}. Used to exercise
+ * {@link UnboundedReader#close()}.
+ */
+ private int outputBundles = 0;
public UnboundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext,
- UnboundedSource<OutputT, ?> source,
- ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+ UnboundedSource<OutputT, CheckpointMarkT> source,
+ ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) {
this.transform = transform;
this.evaluationContext = evaluationContext;
this.evaluatorQueue = evaluatorQueue;
this.source = source;
+ this.currentReader = null;
this.checkpointMark = null;
}
@@ -146,35 +170,92 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
- try (UnboundedReader<OutputT> reader =
- createReader(source, evaluationContext.getPipelineOptions());) {
- int numElements = 0;
- if (reader.start()) {
+ try {
+ boolean elementAvailable = startReader();
+
+ Instant watermark = currentReader.getWatermark();
+ if (elementAvailable) {
+ int numElements = 0;
do {
- output.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
+ output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(),
+ currentReader.getCurrentTimestamp()));
numElements++;
- } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+ } while (numElements < ARBITRARY_MAX_ELEMENTS && currentReader.advance());
+ watermark = currentReader.getWatermark();
+ // Only take a checkpoint if we did any work
+ finishRead();
}
- checkpointMark = reader.getCheckpointMark();
- checkpointMark.finalizeCheckpoint();
// TODO: When exercising create initial splits, make this the minimum watermark across all
// existing readers
StepTransformResult result =
- StepTransformResult.withHold(transform, reader.getWatermark())
- .addOutput(output)
- .build();
+ StepTransformResult.withHold(transform, watermark).addOutput(output).build();
evaluatorQueue.offer(this);
return result;
+ } catch (IOException e) {
+ closeReader();
+ throw e;
+ }
+ }
+
+ private boolean startReader() throws IOException {
+ if (currentReader == null) {
+ if (checkpointMark != null) {
+ checkpointMark.finalizeCheckpoint();
+ }
+ currentReader = source.createReader(evaluationContext.getPipelineOptions(), checkpointMark);
+ checkpointMark = null;
+ return currentReader.start();
+ } else {
+ return currentReader.advance();
}
}
- private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
- UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
+ /**
+ * Checkpoint the current reader, finalize the previous checkpoint, and update the state of this
+ * evaluator.
+ */
+ private void finishRead() throws IOException {
+ final CheckpointMark oldMark = checkpointMark;
@SuppressWarnings("unchecked")
- CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
- return source.createReader(options, mark);
+ final CheckpointMarkT mark = (CheckpointMarkT) currentReader.getCheckpointMark();
+ checkpointMark = mark;
+ if (oldMark != null) {
+ oldMark.finalizeCheckpoint();
+ }
+
+ // If the watermark is the max value, this source may not be invoked again. Finalize after
+ // committing the output.
+ if (!currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ evaluationContext.scheduleAfterOutputWouldBeProduced(transform.getOutput(),
+ GlobalWindow.INSTANCE,
+ transform.getOutput().getWindowingStrategy(),
+ new Runnable() {
+ public void run() {
+ try {
+ mark.finalizeCheckpoint();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Couldn't finalize checkpoint after the end of the Global Window",
+ e);
+ }
+ }
+ });
+ }
+ // Sometimes resume from a checkpoint even if it's not required
+
+ if (outputBundles >= MAX_READER_REUSE_COUNT) {
+ closeReader();
+ outputBundles = 0;
+ } else {
+ outputBundles++;
+ }
+ }
+
+ private void closeReader() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0edf462a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 768f5a9..05656eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;
@@ -44,7 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
@@ -136,7 +138,29 @@ public class UnboundedReadEvaluatorFactoryTest {
}
@Test
- public void evaluatorClosesReader() throws Exception {
+ public void evaluatorClosesReaderAfterOutputCount() throws Exception {
+ ContiguousSet<Long> elems = ContiguousSet.create(
+ Range.closed(0L, 20L * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT),
+ DiscreteDomain.longs());
+ TestUnboundedSource<Long> source =
+ new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+ UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
+ when(context.createRootBundle(pcollection)).thenReturn(output);
+
+ for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) {
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ }
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ }
+
+ @Test
+ public void evaluatorReusesReaderBeforeCount() throws Exception {
TestUnboundedSource<Long> source =
new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
@@ -151,11 +175,18 @@ public class UnboundedReadEvaluatorFactoryTest {
evaluator.finishBundle();
CommittedBundle<Long> committed = output.commit(Instant.now());
assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
- assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+ assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4));
+
+ evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+ // Tried to advance again, even with no elements
+ assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(5));
}
@Test
- public void evaluatorNoElementsClosesReader() throws Exception {
+ public void evaluatorNoElementsReusesReaderAlways() throws Exception {
TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of());
TestPipeline p = TestPipeline.create();
@@ -165,11 +196,13 @@ public class UnboundedReadEvaluatorFactoryTest {
UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
when(context.createRootBundle(pcollection)).thenReturn(output);
- TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
- evaluator.finishBundle();
- CommittedBundle<Long> committed = output.commit(Instant.now());
- assertThat(committed.getElements(), emptyIterable());
- assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) {
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ }
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+ assertThat(TestUnboundedSource.readerAdvancedCount,
+ equalTo(2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT));
}
// TODO: Once the source is split into multiple sources before evaluating, this test will have to
@@ -215,10 +248,12 @@ public class UnboundedReadEvaluatorFactoryTest {
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
static int readerClosedCount;
+ static int readerAdvancedCount;
private final Coder<T> coder;
private final List<T> elems;
public TestUnboundedSource(Coder<T> coder, T... elems) {
+ readerAdvancedCount = 0;
readerClosedCount = 0;
this.coder = coder;
this.elems = Arrays.asList(elems);
@@ -266,6 +301,7 @@ public class UnboundedReadEvaluatorFactoryTest {
@Override
public boolean advance() throws IOException {
+ readerAdvancedCount++;
if (index + 1 < elems.size()) {
index++;
return true;
[2/2] incubator-beam git commit: Closes #460
Posted by dh...@apache.org.
Closes #460
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f46bc00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f46bc00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f46bc00
Branch: refs/heads/master
Commit: 2f46bc00430dee96b92087f17819003620039b92
Parents: 462453d 0edf462
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 15 13:58:46 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 13:58:46 2016 -0700
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 145 +++++++++++++++----
.../UnboundedReadEvaluatorFactoryTest.java | 54 +++++--
2 files changed, 158 insertions(+), 41 deletions(-)
----------------------------------------------------------------------