You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/18 05:02:10 UTC

[1/2] incubator-beam git commit: Close Readers in InProcess Read Evaluators

Repository: incubator-beam
Updated Branches:
  refs/heads/master d39346823 -> 5b5c0e28f


Close Readers in InProcess Read Evaluators

The readers were formerly left open, which prevented any resources they
held from being released.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fad6da89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fad6da89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fad6da89

Branch: refs/heads/master
Commit: fad6da89079791952a937aed257f0d2db1467053
Parents: d393468
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 15 11:50:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 21:01:18 2016 -0700

----------------------------------------------------------------------
 .../inprocess/BoundedReadEvaluatorFactory.java  |  49 ++++--
 .../UnboundedReadEvaluatorFactory.java          |  53 +++---
 .../BoundedReadEvaluatorFactoryTest.java        | 136 ++++++++++++++-
 .../UnboundedReadEvaluatorFactoryTest.java      | 168 +++++++++++++++++++
 4 files changed, 366 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index 2a164c3..eaea3ed 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -15,6 +15,8 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.io.Source.Reader;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
@@ -78,8 +80,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   @SuppressWarnings("unchecked")
   private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext)
-      throws IOException {
+      final InProcessEvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -101,21 +102,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluatorQueue;
   }
 
+  /**
+   * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
+   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+   * creates the {@link BoundedReader} and consumes all available input.
+   *
+   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
+   * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
+   * may produce duplicate elements.
+   */
   private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
     private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform;
     private final InProcessEvaluationContext evaluationContext;
-    private final Reader<OutputT> reader;
     private boolean contentsRemaining;
 
     public BoundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext)
-        throws IOException {
+        InProcessEvaluationContext evaluationContext) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
-      reader =
-          transform.getTransform().getSource().createReader(evaluationContext.getPipelineOptions());
-      contentsRemaining = reader.start();
     }
 
     @Override
@@ -123,17 +128,25 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
 
     @Override
     public InProcessTransformResult finishBundle() throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      while (contentsRemaining) {
-        output.add(
-            WindowedValue.timestampedValueInGlobalWindow(
-                reader.getCurrent(), reader.getCurrentTimestamp()));
-        contentsRemaining = reader.advance();
+      try (final Reader<OutputT> reader =
+              transform
+                  .getTransform()
+                  .getSource()
+                  .createReader(evaluationContext.getPipelineOptions());) {
+        contentsRemaining = reader.start();
+        UncommittedBundle<OutputT> output =
+            evaluationContext.createRootBundle(transform.getOutput());
+        while (contentsRemaining) {
+          output.add(
+              WindowedValue.timestampedValueInGlobalWindow(
+                  reader.getCurrent(), reader.getCurrentTimestamp()));
+          contentsRemaining = reader.advance();
+        }
+        reader.close();
+        return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
+            .addOutput(output)
+            .build();
       }
-      return StepTransformResult
-          .withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE)
-          .addOutput(output)
-          .build();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index 97f0e25..549afab 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -99,6 +99,16 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return evaluatorQueue;
   }
 
+  /**
+   * An {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
+   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
+   * creates the {@link UnboundedReader} and consumes some currently available input.
+   *
+   * <p>An {@link UnboundedReadEvaluator} is not internally thread-safe, and should only be
+   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
+   * checkpoint, and constructs its reader from the current checkpoint in each call to
+   * {@link #finishBundle()}.
+   */
   private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
@@ -122,28 +132,29 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public InProcessTransformResult finishBundle() throws IOException {
       UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      UnboundedReader<OutputT> reader =
-          createReader(
-              transform.getTransform().getSource(), evaluationContext.getPipelineOptions());
-      int numElements = 0;
-      if (reader.start()) {
-        do {
-          output.add(
-              WindowedValue.timestampedValueInGlobalWindow(
-                  reader.getCurrent(), reader.getCurrentTimestamp()));
-          numElements++;
-        } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+      try (UnboundedReader<OutputT> reader =
+              createReader(
+                  transform.getTransform().getSource(), evaluationContext.getPipelineOptions());) {
+        int numElements = 0;
+        if (reader.start()) {
+          do {
+            output.add(
+                WindowedValue.timestampedValueInGlobalWindow(
+                    reader.getCurrent(), reader.getCurrentTimestamp()));
+            numElements++;
+          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+        }
+        checkpointMark = reader.getCheckpointMark();
+        checkpointMark.finalizeCheckpoint();
+        // TODO: When exercising create initial splits, make this the minimum watermark across all
+        // existing readers
+        StepTransformResult result =
+            StepTransformResult.withHold(transform, reader.getWatermark())
+                .addOutput(output)
+                .build();
+        evaluatorQueue.offer(this);
+        return result;
       }
-      checkpointMark = reader.getCheckpointMark();
-      checkpointMark.finalizeCheckpoint();
-      // TODO: When exercising create initial splits, make this the minimum watermark across all
-      // existing readers
-      StepTransformResult result =
-          StepTransformResult.withHold(transform, reader.getWatermark())
-              .addOutput(output)
-              .build();
-      evaluatorQueue.offer(this);
-      return result;
     }
 
     private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index 4395514..e641dd6 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -18,24 +18,39 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader;
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
 
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * Tests for {@link BoundedReadEvaluatorFactory}.
@@ -45,7 +60,7 @@ public class BoundedReadEvaluatorFactoryTest {
   private BoundedSource<Long> source;
   private PCollection<Long> longs;
   private TransformEvaluatorFactory factory;
-  private InProcessEvaluationContext context;
+  @Mock private InProcessEvaluationContext context;
 
   @Before
   public void setup() {
@@ -146,6 +161,125 @@ public class BoundedReadEvaluatorFactoryTest {
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
   }
 
+  @Test
+  public void boundedSourceEvaluatorClosesReader() throws Exception {
+    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator.finishBundle();
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
+    assertThat(TestSource.readerClosed, is(true));
+  }
+
+  @Test
+  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+    TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of());
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator.finishBundle();
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(committed.getElements(), emptyIterable());
+    assertThat(TestSource.readerClosed, is(true));
+  }
+
+  private static class TestSource<T> extends BoundedSource<T> {
+    private static boolean readerClosed;
+    private final Coder<T> coder;
+    private final T[] elems;
+
+    public TestSource(Coder<T> coder, T... elems) {
+      this.elems = elems;
+      this.coder = coder;
+      readerClosed = false;
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return false;
+    }
+
+    @Override
+    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+      return new TestReader<>(this, elems);
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+  }
+
+  private static class TestReader<T> extends BoundedReader<T> {
+    private final BoundedSource<T> source;
+    private final List<T> elems;
+    private int index;
+
+    public TestReader(BoundedSource<T> source, T... elems) {
+      this.source = source;
+      this.elems = Arrays.asList(elems);
+      this.index = -1;
+    }
+
+    @Override
+    public BoundedSource<T> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (elems.size() > index + 1) {
+        index++;
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return elems.get(index);
+    }
+
+    @Override
+    public void close() throws IOException {
+      TestSource.readerClosed = true;
+    }
+  }
+
   private static WindowedValue<Long> gw(Long elem) {
     return WindowedValue.valueInGlobalWindow(elem);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fad6da89/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index a9bbcc8..20a7d60 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -18,20 +18,30 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableList;
 
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -41,6 +51,15 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
 /**
  * Tests for {@link UnboundedReadEvaluatorFactory}.
  */
@@ -111,6 +130,41 @@ public class UnboundedReadEvaluatorFactoryTest {
             tgw(15L), tgw(13L), tgw(10L)));
   }
 
+  @Test
+  public void boundedSourceEvaluatorClosesReader() throws Exception {
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator.finishBundle();
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+  }
+
+  @Test
+  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+    TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of());
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator.finishBundle();
+    CommittedBundle<Long> committed = output.commit(Instant.now());
+    assertThat(committed.getElements(), emptyIterable());
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+  }
+
   // TODO: Once the source is split into multiple sources before evaluating, this test will have to
   // be updated.
   /**
@@ -156,4 +210,118 @@ public class UnboundedReadEvaluatorFactoryTest {
       return new Instant(input);
     }
   }
+
+  private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+    static int readerClosedCount;
+    private final Coder<T> coder;
+    private final List<T> elems;
+
+    public TestUnboundedSource(Coder<T> coder, T... elems) {
+      readerClosedCount = 0;
+      this.coder = coder;
+      this.elems = Arrays.asList(elems);
+    }
+
+    @Override
+    public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      return ImmutableList.of(this);
+    }
+
+    @Override
+    public UnboundedSource.UnboundedReader<T> createReader(
+        PipelineOptions options, TestCheckpointMark checkpointMark) {
+      return new TestUnboundedReader(elems);
+    }
+
+    @Override
+    @Nullable
+    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
+      return new TestCheckpointMark.Coder();
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return coder;
+    }
+
+    private class TestUnboundedReader extends UnboundedReader<T> {
+      private final List<T> elems;
+      private int index;
+
+      public TestUnboundedReader(List<T> elems) {
+        this.elems = elems;
+        this.index = -1;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (index + 1 < elems.size()) {
+          index++;
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return Instant.now();
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return new TestCheckpointMark();
+      }
+
+      @Override
+      public UnboundedSource<T, ?> getCurrentSource() {
+        TestUnboundedSource<T> source = TestUnboundedSource.this;
+        return source;
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        return elems.get(index);
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return Instant.now();
+      }
+
+      @Override
+      public void close() throws IOException {
+        readerClosedCount++;
+      }
+    }
+  }
+
+  private static class TestCheckpointMark implements CheckpointMark {
+    @Override
+    public void finalizeCheckpoint() throws IOException {}
+
+    public static class Coder extends AtomicCoder<TestCheckpointMark> {
+      @Override
+      public void encode(
+          TestCheckpointMark value,
+          OutputStream outStream,
+          com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {}
+
+      @Override
+      public TestCheckpointMark decode(
+          InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+          throws CoderException, IOException {
+        return new TestCheckpointMark();
+      }
+    }
+  }
 }


[2/2] incubator-beam git commit: This closes #52

Posted by ke...@apache.org.
This closes #52


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b5c0e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b5c0e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b5c0e28

Branch: refs/heads/master
Commit: 5b5c0e28ff2326b28db6e9bec4eae2af44eb92b9
Parents: d393468 fad6da8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 17 21:02:00 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 21:02:00 2016 -0700

----------------------------------------------------------------------
 .../inprocess/BoundedReadEvaluatorFactory.java  |  49 ++++--
 .../UnboundedReadEvaluatorFactory.java          |  53 +++---
 .../BoundedReadEvaluatorFactoryTest.java        | 136 ++++++++++++++-
 .../UnboundedReadEvaluatorFactoryTest.java      | 168 +++++++++++++++++++
 4 files changed, 366 insertions(+), 40 deletions(-)
----------------------------------------------------------------------