You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/15 20:58:52 UTC

[1/2] incubator-beam git commit: Reuse UnboundedReaders in the InProcessRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 462453d23 -> 2f46bc004


Reuse UnboundedReaders in the InProcessRunner

Reuse each reader for a bounded number of bundles, then close and discard it
so that resuming from a checkpoint is still exercised.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0edf462a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0edf462a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0edf462a

Branch: refs/heads/master
Commit: 0edf462a9d85810331621b0ae67fdd9902fc3f95
Parents: 462453d
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 13 18:34:49 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 13:58:45 2016 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   | 145 +++++++++++++++----
 .../UnboundedReadEvaluatorFactoryTest.java      |  54 +++++--
 2 files changed, 158 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0edf462a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index bdb8f37..fceb20c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -23,12 +23,17 @@ import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Instant;
+
 import java.io.IOException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +47,10 @@ import javax.annotation.Nullable;
  * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
  */
 class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+  // Discard the reader after this many output-producing reuses, to exercise close-and-resume
+  @VisibleForTesting
+  static final int MAX_READER_REUSE_COUNT = 20;
+
   /*
    * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
    * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
@@ -53,7 +62,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
    */
   private final ConcurrentMap<
-      EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>>
+      EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
       sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -78,29 +87,33 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * already done so.
    */
   @SuppressWarnings("unchecked")
-  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+  private <OutputT, CheckpointMarkT extends CheckpointMark>
+  Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
       final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
       final InProcessEvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
     @SuppressWarnings("unchecked")
-    ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
-        (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
+        (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
+            sourceEvaluators.get(key);
     if (evaluatorQueue == null) {
       evaluatorQueue = new ConcurrentLinkedQueue<>();
       if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
-        UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
-        UnboundedReadEvaluator<OutputT> evaluator =
-            new UnboundedReadEvaluator<OutputT>(
+        UnboundedSource<OutputT, CheckpointMarkT> source =
+            (UnboundedSource<OutputT, CheckpointMarkT>) transform.getTransform().getSource();
+        UnboundedReadEvaluator<OutputT, CheckpointMarkT> evaluator =
+            new UnboundedReadEvaluator<>(
                 transform, evaluationContext, source, evaluatorQueue);
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
         evaluatorQueue =
-            (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+            (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
+                sourceEvaluators.get(key);
       }
     }
     return evaluatorQueue;
@@ -116,27 +129,38 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * checkpoint, and constructs its reader from the current checkpoint in each call to
    * {@link #finishBundle()}.
    */
-  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
+  private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends CheckpointMark>
+      implements TransformEvaluator<Object> {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
+
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
     private final InProcessEvaluationContext evaluationContext;
-    private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+    private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
+        evaluatorQueue;
     /**
      * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
      * source as derived from {@link #transform} due to splitting.
      */
-    private final UnboundedSource<OutputT, ?> source;
-    private CheckpointMark checkpointMark;
+    private final UnboundedSource<OutputT, CheckpointMarkT> source;
+    private UnboundedReader<OutputT> currentReader;
+    private CheckpointMarkT checkpointMark;
+
+    /**
+     * The number of non-empty bundles output since the reader was last deliberately closed.
+     * Used to exercise {@link UnboundedReader#close()}.
+     */
+    private int outputBundles = 0;
 
     public UnboundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
         InProcessEvaluationContext evaluationContext,
-        UnboundedSource<OutputT, ?> source,
-        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+        UnboundedSource<OutputT, CheckpointMarkT> source,
+        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
       this.evaluatorQueue = evaluatorQueue;
       this.source = source;
+      this.currentReader = null;
       this.checkpointMark = null;
     }
 
@@ -146,35 +170,92 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public InProcessTransformResult finishBundle() throws IOException {
       UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      try (UnboundedReader<OutputT> reader =
-              createReader(source, evaluationContext.getPipelineOptions());) {
-        int numElements = 0;
-        if (reader.start()) {
+      try {
+        boolean elementAvailable = startReader();
+
+        Instant watermark = currentReader.getWatermark();
+        if (elementAvailable) {
+          int numElements = 0;
           do {
-            output.add(
-                WindowedValue.timestampedValueInGlobalWindow(
-                    reader.getCurrent(), reader.getCurrentTimestamp()));
+            output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(),
+                currentReader.getCurrentTimestamp()));
             numElements++;
-          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
+          } while (numElements < ARBITRARY_MAX_ELEMENTS && currentReader.advance());
+          watermark = currentReader.getWatermark();
+          // Only take a checkpoint if we did any work
+          finishRead();
         }
-        checkpointMark = reader.getCheckpointMark();
-        checkpointMark.finalizeCheckpoint();
         // TODO: When exercising create initial splits, make this the minimum watermark across all
         // existing readers
         StepTransformResult result =
-            StepTransformResult.withHold(transform, reader.getWatermark())
-                .addOutput(output)
-                .build();
+            StepTransformResult.withHold(transform, watermark).addOutput(output).build();
         evaluatorQueue.offer(this);
         return result;
+      } catch (IOException e) {
+        closeReader();
+        throw e;
+      }
+    }
+
+    private boolean startReader() throws IOException {
+      if (currentReader == null) {
+        if (checkpointMark != null) {
+          checkpointMark.finalizeCheckpoint();
+        }
+        currentReader = source.createReader(evaluationContext.getPipelineOptions(), checkpointMark);
+        checkpointMark = null;
+        return currentReader.start();
+      } else {
+        return currentReader.advance();
       }
     }
 
-    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
-        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
+    /**
+     * Checkpoint the current reader, finalize the previous checkpoint, and update the state of this
+     * evaluator.
+     */
+    private void finishRead() throws IOException {
+      final CheckpointMark oldMark = checkpointMark;
       @SuppressWarnings("unchecked")
-      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
-      return source.createReader(options, mark);
+      final CheckpointMarkT mark = (CheckpointMarkT) currentReader.getCheckpointMark();
+      checkpointMark = mark;
+      if (oldMark != null) {
+        oldMark.finalizeCheckpoint();
+      }
+
+      // If the watermark is the max value, this source may not be invoked again. Finalize after
+      // committing the output.
+      if (!currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        evaluationContext.scheduleAfterOutputWouldBeProduced(transform.getOutput(),
+            GlobalWindow.INSTANCE,
+            transform.getOutput().getWindowingStrategy(),
+            new Runnable() {
+              public void run() {
+                try {
+                  mark.finalizeCheckpoint();
+                } catch (IOException e) {
+                  throw new RuntimeException(
+                      "Couldn't finalize checkpoint after the end of the Global Window",
+                      e);
+                }
+              }
+            });
+      }
+      // Sometimes resume from a checkpoint even if it's not required
+
+      if (outputBundles >= MAX_READER_REUSE_COUNT) {
+        closeReader();
+        outputBundles = 0;
+      } else {
+        outputBundles++;
+      }
+    }
+
+    private void closeReader() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+        currentReader = null;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0edf462a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 768f5a9..05656eb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.nullValue;
@@ -44,7 +43,10 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
 
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -136,7 +138,29 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   @Test
-  public void evaluatorClosesReader() throws Exception {
+  public void evaluatorClosesReaderAfterOutputCount() throws Exception {
+    ContiguousSet<Long> elems = ContiguousSet.create(
+        Range.closed(0L, 20L * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT),
+        DiscreteDomain.longs());
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0]));
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+
+    for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) {
+      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+      evaluator.finishBundle();
+    }
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+  }
+
+  @Test
+  public void evaluatorReusesReaderBeforeCount() throws Exception {
     TestUnboundedSource<Long> source =
         new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
 
@@ -151,11 +175,18 @@ public class UnboundedReadEvaluatorFactoryTest {
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+    assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4));
+
+    evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator.finishBundle();
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+    // Tried to advance again, even with no elements
+    assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(5));
   }
 
   @Test
-  public void evaluatorNoElementsClosesReader() throws Exception {
+  public void evaluatorNoElementsReusesReaderAlways() throws Exception {
     TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of());
 
     TestPipeline p = TestPipeline.create();
@@ -165,11 +196,13 @@ public class UnboundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(committed.getElements(), emptyIterable());
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+    for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) {
+      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+      evaluator.finishBundle();
+    }
+    assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
+    assertThat(TestUnboundedSource.readerAdvancedCount,
+        equalTo(2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT));
   }
 
   // TODO: Once the source is split into multiple sources before evaluating, this test will have to
@@ -215,10 +248,12 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
     static int readerClosedCount;
+    static int readerAdvancedCount;
     private final Coder<T> coder;
     private final List<T> elems;
 
     public TestUnboundedSource(Coder<T> coder, T... elems) {
+      readerAdvancedCount = 0;
       readerClosedCount = 0;
       this.coder = coder;
       this.elems = Arrays.asList(elems);
@@ -266,6 +301,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public boolean advance() throws IOException {
+        readerAdvancedCount++;
         if (index + 1 < elems.size()) {
           index++;
           return true;


[2/2] incubator-beam git commit: Closes #460

Posted by dh...@apache.org.
Closes #460


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f46bc00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f46bc00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f46bc00

Branch: refs/heads/master
Commit: 2f46bc00430dee96b92087f17819003620039b92
Parents: 462453d 0edf462
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 15 13:58:46 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 13:58:46 2016 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java   | 145 +++++++++++++++----
 .../UnboundedReadEvaluatorFactoryTest.java      |  54 +++++--
 2 files changed, 158 insertions(+), 41 deletions(-)
----------------------------------------------------------------------