You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/18 05:02:10 UTC
[1/2] incubator-beam git commit: Close Readers in InProcess Read Evaluators
Repository: incubator-beam
Updated Branches:
refs/heads/master d39346823 -> 5b5c0e28f
Close Readers in InProcess Read Evaluators
The readers were formerly left open, which prevented the resources they
hold from being released.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fad6da89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fad6da89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fad6da89
Branch: refs/heads/master
Commit: fad6da89079791952a937aed257f0d2db1467053
Parents: d393468
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 15 11:50:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 21:01:18 2016 -0700
----------------------------------------------------------------------
.../inprocess/BoundedReadEvaluatorFactory.java | 49 ++++--
.../UnboundedReadEvaluatorFactory.java | 53 +++---
.../BoundedReadEvaluatorFactoryTest.java | 136 ++++++++++++++-
.../UnboundedReadEvaluatorFactoryTest.java | 168 +++++++++++++++++++
4 files changed, 366 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index 2a164c3..eaea3ed 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -15,6 +15,8 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
import com.google.cloud.dataflow.sdk.io.Source.Reader;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
@@ -78,8 +80,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
@SuppressWarnings("unchecked")
private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
- final InProcessEvaluationContext evaluationContext)
- throws IOException {
+ final InProcessEvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -101,21 +102,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
return evaluatorQueue;
}
+ /**
+ * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
+ * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+ * creates the {@link BoundedReader} and consumes all available input.
+ *
+ * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
+ * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
+ * may produce duplicate elements.
+ */
private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
- private final Reader<OutputT> reader;
private boolean contentsRemaining;
public BoundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
- InProcessEvaluationContext evaluationContext)
- throws IOException {
+ InProcessEvaluationContext evaluationContext) {
this.transform = transform;
this.evaluationContext = evaluationContext;
- reader =
- transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
- contentsRemaining = reader.start();
}
@Override
@@ -123,17 +128,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public InProcessTransformResult finishBundle() throws IOException {
- UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
- while (contentsRemaining) {
- output.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
- contentsRemaining = reader.advance();
+ try (final Reader<OutputT> reader =
+ transform
+ .getTransform()
+ .getSource()
+ .createReader(evaluationContext.getPipelineOptions());) {
+ contentsRemaining = reader.start();
+ UncommittedBundle<OutputT> output =
+ evaluationContext.createRootBundle(transform.getOutput());
+ while (contentsRemaining) {
+ output.add(
+ WindowedValue.timestampedValueInGlobalWindow(
+ reader.getCurrent(), reader.getCurrentTimestamp()));
+ contentsRemaining = reader.advance();
+ }
+ reader.close();
+ return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
+ .addOutput(output)
+ .build();
}
- return StepTransformResult
- .withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
- .addOutput(output)
- .build();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index 97f0e25..549afab 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -99,6 +99,16 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
return evaluatorQueue;
}
+ /**
+ * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
+ * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+ * creates the {@link UnboundedReader} and consumes some currently available input.
+ *
+ * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
+ * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
+ * checkpoint, and constructs its reader from the current checkpoint in each call to
+ * {@link #finishBundle()}.
+ */
private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
private static final int ARBITRARY_MAX_ELEMENTS = 10;
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
@@ -122,28 +132,29 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public InProcessTransformResult finishBundle() throws IOException {
UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
- UnboundedReader<OutputT> reader =
- createReader(
- transform.getTransform().getSource(), evaluationContext.getPipelineOptions());
- int numElements = 0;
- if (reader.start()) {
- do {
- output.add(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp()));
- numElements++;
- } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+ try (UnboundedReader<OutputT> reader =
+ createReader(
+ transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) {
+ int numElements = 0;
+ if (reader.start()) {
+ do {
+ output.add(
+ WindowedValue.timestampedValueInGlobalWindow(
+ reader.getCurrent(), reader.getCurrentTimestamp()));
+ numElements++;
+ } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+ }
+ checkpointMark = reader.getCheckpointMark();
+ checkpointMark.finalizeCheckpoint();
+ // TODO: When exercising create initial splits, make this the minimum watermark across all
+ // existing readers
+ StepTransformResult result =
+ StepTransformResult.withHold(transform, reader.getWatermark())
+ .addOutput(output)
+ .build();
+ evaluatorQueue.offer(this);
+ return result;
}
- checkpointMark = reader.getCheckpointMark();
- checkpointMark.finalizeCheckpoint();
- // TODO: When exercising create initial splits, make this the minimum watermark across all
- // existing readers
- StepTransformResult result =
- StepTransformResult.withHold(transform, reader.getWatermark())
- .addOutput(output)
- .build();
- evaluatorQueue.offer(this);
- return result;
}
private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index 4395514..e641dd6 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -18,24 +18,39 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
import com.google.cloud.dataflow.sdk.io.CountingSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Read.Bounded;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
/**
* Tests for {@link BoundedReadEvaluatorFactory}.
@@ -45,7 +60,7 @@ public class BoundedReadEvaluatorFactoryTest {
private BoundedSource<Long> source;
private PCollection<Long> longs;
private TransformEvaluatorFactory factory;
- private InProcessEvaluationContext context;
+ @Mock private InProcessEvaluationContext context;
@Before
public void setup() {
@@ -146,6 +161,125 @@ public class BoundedReadEvaluatorFactoryTest {
gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
}
+ @Test
+ public void boundedSourceEvaluatorClosesReader() throws Exception {
+ TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+ UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+ when(context.createRootBundle(pcollection)).thenReturn(output);
+
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ CommittedBundle<Long> committed = output.commit(Instant.now());
+ assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
+ assertThat(TestSource.readerClosed, is(true));
+ }
+
+ @Test
+ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+ TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+ UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+ when(context.createRootBundle(pcollection)).thenReturn(output);
+
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ CommittedBundle<Long> committed = output.commit(Instant.now());
+ assertThat(committed.getElements(), emptyIterable());
+ assertThat(TestSource.readerClosed, is(true));
+ }
+
+ private static class TestSource<T> extends BoundedSource<T> {
+ private static boolean readerClosed;
+ private final Coder<T> coder;
+ private final T[] elems;
+
+ public TestSource(Coder<T> coder, T... elems) {
+ this.elems = elems;
+ this.coder = coder;
+ readerClosed = false;
+ }
+
+ @Override
+ public List<? extends BoundedSource<T>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return ImmutableList.of(this);
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+ return new TestReader<>(this, elems);
+ }
+
+ @Override
+ public void validate() {
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return coder;
+ }
+ }
+
+ private static class TestReader<T> extends BoundedReader<T> {
+ private final BoundedSource<T> source;
+ private final List<T> elems;
+ private int index;
+
+ public TestReader(BoundedSource<T> source, T... elems) {
+ this.source = source;
+ this.elems = Arrays.asList(elems);
+ this.index = -1;
+ }
+
+ @Override
+ public BoundedSource<T> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (elems.size() > index + 1) {
+ index++;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return elems.get(index);
+ }
+
+ @Override
+ public void close() throws IOException {
+ TestSource.readerClosed = true;
+ }
+ }
+
private static WindowedValue<Long> gw(Long elem) {
return WindowedValue.valueInGlobalWindow(elem);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index a9bbcc8..20a7d60 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -18,20 +18,30 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.io.CountingSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
@@ -41,6 +51,15 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
/**
* Tests for {@link UnboundedReadEvaluatorFactory}.
*/
@@ -111,6 +130,41 @@ public class UnboundedReadEvaluatorFactoryTest {
tgw(15L), tgw(13L), tgw(10L)));
}
+ @Test
+ public void boundedSourceEvaluatorClosesReader() throws Exception {
+ TestUnboundedSource<Long> source =
+ new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+ when(context.createRootBundle(pcollection)).thenReturn(output);
+
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ CommittedBundle<Long> committed = output.commit(Instant.now());
+ assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ }
+
+ @Test
+ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+ TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of());
+
+ TestPipeline p = TestPipeline.create();
+ PCollection<Long> pcollection = p.apply(Read.from(source));
+ AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+ when(context.createRootBundle(pcollection)).thenReturn(output);
+
+ TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+ evaluator.finishBundle();
+ CommittedBundle<Long> committed = output.commit(Instant.now());
+ assertThat(committed.getElements(), emptyIterable());
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ }
+
// TODO: Once the source is split into multiple sources before evaluating, this test will have to
// be updated.
/**
@@ -156,4 +210,118 @@ public class UnboundedReadEvaluatorFactoryTest {
return new Instant(input);
}
}
+
+ private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+ static int readerClosedCount;
+ private final Coder<T> coder;
+ private final List<T> elems;
+
+ public TestUnboundedSource(Coder<T> coder, T... elems) {
+ readerClosedCount = 0;
+ this.coder = coder;
+ this.elems = Arrays.asList(elems);
+ }
+
+ @Override
+ public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return ImmutableList.of(this);
+ }
+
+ @Override
+ public UnboundedSource.UnboundedReader<T> createReader(
+ PipelineOptions options, TestCheckpointMark checkpointMark) {
+ return new TestUnboundedReader(elems);
+ }
+
+ @Override
+ @Nullable
+ public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
+ return new TestCheckpointMark.Coder();
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return coder;
+ }
+
+ private class TestUnboundedReader extends UnboundedReader<T> {
+ private final List<T> elems;
+ private int index;
+
+ public TestUnboundedReader(List<T> elems) {
+ this.elems = elems;
+ this.index = -1;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (index + 1 < elems.size()) {
+ index++;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return new TestCheckpointMark();
+ }
+
+ @Override
+ public UnboundedSource<T, ?> getCurrentSource() {
+ TestUnboundedSource<T> source = TestUnboundedSource.this;
+ return source;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return elems.get(index);
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ readerClosedCount++;
+ }
+ }
+ }
+
+ private static class TestCheckpointMark implements CheckpointMark {
+ @Override
+ public void finalizeCheckpoint() throws IOException {}
+
+ public static class Coder extends AtomicCoder<TestCheckpointMark> {
+ @Override
+ public void encode(
+ TestCheckpointMark value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public TestCheckpointMark decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ return new TestCheckpointMark();
+ }
+ }
+ }
}
[2/2] incubator-beam git commit: This closes #52
Posted by ke...@apache.org.
This closes #52
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b5c0e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b5c0e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b5c0e28
Branch: refs/heads/master
Commit: 5b5c0e28ff2326b28db6e9bec4eae2af44eb92b9
Parents: d393468 fad6da8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 17 21:02:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 21:02:00 2016 -0700
----------------------------------------------------------------------
.../inprocess/BoundedReadEvaluatorFactory.java | 49 ++++--
.../UnboundedReadEvaluatorFactory.java | 53 +++---
.../BoundedReadEvaluatorFactoryTest.java | 136 ++++++++++++++-
.../UnboundedReadEvaluatorFactoryTest.java | 168 +++++++++++++++++++
4 files changed, 366 insertions(+), 40 deletions(-)
----------------------------------------------------------------------