You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/03/23 23:08:26 UTC

[1/2] incubator-beam git commit: Schedule roots less aggressively

Repository: incubator-beam
Updated Branches:
  refs/heads/master 619db54db -> 9247ad78d


Schedule roots less aggressively

Repeatedly scheduling bundles that are known to be empty can consume
excessive resources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35df1668
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35df1668
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35df1668

Branch: refs/heads/master
Commit: 35df16686940b7dae7bcfb2a42a104bc8ea79f11
Parents: 619db54
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 21 17:00:35 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 23 14:56:09 2016 -0700

----------------------------------------------------------------------
 .../ExecutorServiceParallelExecutor.java        |  76 +++++++++----
 .../inprocess/InMemoryWatermarkManager.java     |  16 ---
 .../inprocess/InProcessEvaluationContext.java   |  46 ++++++--
 .../InProcessEvaluationContextTest.java         | 110 ++++++++++++++++++-
 4 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index c72a115..68a1b8c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -304,8 +304,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }
         }
-        fireTimers();
-        mightNeedMoreWork();
+        boolean timersFired = fireTimers();
+        addWorkIfNecessary(timersFired);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOG.error("Monitor died due to being interrupted");
@@ -326,8 +326,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       }
     }
 
-    private void fireTimers() throws Exception {
+    /**
+     * Fires any available timers. Returns true if at least one timer was fired.
+     */
+    private boolean fireTimers() throws Exception {
       try {
+        boolean firedTimers = false;
         for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
             evaluationContext.extractFiredTimers().entrySet()) {
           AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
@@ -346,9 +350,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
                       .add(WindowedValue.valueInEmptyWindows(work))
                       .commit(Instant.now());
               scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
+              firedTimers = true;
             }
           }
         }
+        return firedTimers;
       } catch (Exception e) {
         LOG.error("Internal Error while delivering timers", e);
         throw e;
@@ -367,26 +373,58 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       return false;
     }
 
-    private void mightNeedMoreWork() {
-      synchronized (scheduledExecutors) {
-        for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
-          Thread thread = executor.getThread();
-          if (thread != null) {
-            switch (thread.getState()) {
-              case BLOCKED:
-              case WAITING:
-              case TERMINATED:
-              case TIMED_WAITING:
-                break;
-              default:
-                return;
-            }
-          }
+    /**
+     * If all active {@link TransformExecutor TransformExecutors} are in a blocked state,
+     * add more work from root nodes that may have additional work. This ensures that if a pipeline
+     * has elements available from the root nodes it will add those elements when necessary.
+     */
+    private void addWorkIfNecessary(boolean firedTimers) {
+      // If any timers have fired, they will add more work; we don't need to add more.
+      if (firedTimers) {
+        return;
+      }
+      for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
+        if (!isExecutorBlocked(executor)) {
+          // We have at least one executor that can proceed without adding additional work
+          return;
         }
       }
       // All current TransformExecutors are blocked; add more work from the roots.
       for (AppliedPTransform<?, ?, ?> root : rootNodes) {
-        scheduleConsumption(root, null, defaultCompletionCallback);
+        if (!evaluationContext.isDone(root)) {
+          scheduleConsumption(root, null, defaultCompletionCallback);
+        }
+      }
+    }
+
+    /**
+     * Return true if the provided executor is waiting and may not progress if no action is taken.
+     *
+     * <p>May return false even if all executor threads are currently blocked or cleaning up, as
+     * these can cause more work to be scheduled. If this does not occur, after these calls
+     * terminate, future calls will return true if all executors are waiting.
+     */
+    private boolean isExecutorBlocked(TransformExecutor<?> executor) {
+      Thread thread = executor.getThread();
+      if (thread == null) {
+        return false;
+      }
+      switch (thread.getState()) {
+        case TERMINATED:
+          throw new IllegalStateException(String.format(
+              "Unexpectedly encountered a Terminated TransformExecutor %s", executor));
+        case WAITING:
+        case TIMED_WAITING:
+          // The thread is waiting for some external input. Adding more work may cause the thread
+          // to stop waiting (e.g. the thread is waiting on an unbounded side input)
+          return true;
+        case BLOCKED:
+          // The executor is blocked on acquisition of a java monitor. This usually means it is
+          // making a call to the EvaluationContext, but not a model-blocking call - and will
+          // eventually complete, at which point we may reevaluate.
+        default:
+          // NEW and RUNNABLE threads can make progress
+          return false;
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index 094526d..a9a62a6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -862,22 +862,6 @@ public class InMemoryWatermarkManager {
   }
 
   /**
-   * Returns true if, for any {@link TransformWatermarks} returned by
-   * {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   */
-  public boolean allWatermarksAtPositiveInfinity() {
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
-        transformToWatermarks.entrySet()) {
-      Instant endOfTime = THE_END_OF_TIME.get();
-      if (watermarksEntry.getValue().getOutputWatermark().isBefore(endOfTime)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
    * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
    * and as such the watermark manager must track holds and the release of holds on a per-key basis.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index 2908fba..4aeb0d3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -357,27 +357,49 @@ class InProcessEvaluationContext {
   }
 
   /**
-   * Returns true if all steps are done.
+   * Returns true if the step will not produce additional output.
+   *
+   * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
+   * {@link PCollection PCollections}, returns true if the watermark is at
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
+   *
+   * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
+   * {@link PCollection PCollections}, returns true only if the watermark is at positive infinity
+   * and {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} is true.
    */
-  public boolean isDone() {
-    if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) {
+  public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+    // if the PTransform's watermark isn't at the max value, it isn't done
+    if (watermarkManager
+        .getWatermarks(transform)
+        .getOutputWatermark()
+        .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
       return false;
     }
-    if (!watermarkManager.allWatermarksAtPositiveInfinity()) {
-      return false;
+    // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
+    // the PTransform may produce additional output. It is not done.
+    for (PValue output : transform.getOutput().expand()) {
+      if (output instanceof PCollection) {
+        IsBounded bounded = ((PCollection<?>) output).isBounded();
+        if (bounded.equals(IsBounded.UNBOUNDED)
+            && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
+          return false;
+        }
+      }
     }
+    // The PTransform's watermark was at positive infinity and all of its outputs are known to be
+    // done. It is done.
     return true;
   }
 
-  private boolean containsUnboundedPCollection() {
+  /**
+   * Returns true if all steps are done.
+   */
+  public boolean isDone() {
     for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
-      for (PValue value : transform.getInput().expand()) {
-        if (value instanceof PCollection
-            && ((PCollection<?>) value).isBounded().equals(IsBounded.UNBOUNDED)) {
-          return true;
-        }
+      if (!isDone(transform)) {
+        return false;
       }
     }
-    return false;
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 1490960..1a0f505 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -24,12 +24,14 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.io.CountingInput;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -58,6 +60,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
@@ -80,25 +83,33 @@ import java.util.concurrent.TimeUnit;
 public class InProcessEvaluationContextTest {
   private TestPipeline p;
   private InProcessEvaluationContext context;
+
   private PCollection<Integer> created;
   private PCollection<KV<String, Integer>> downstream;
   private PCollectionView<Iterable<Integer>> view;
+  private PCollection<Long> unbounded;
+
 
   @Before
   public void setup() {
     InProcessPipelineRunner runner =
         InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+
     p = TestPipeline.create();
+
     created = p.apply(Create.of(1, 2, 3));
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
     view = created.apply(View.<Integer>asIterable());
+    unbounded = p.apply(CountingInput.unbounded());
     Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(created.getProducingTransformInternal());
+        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
+            created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
     Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
     valueToConsumers.put(
         created,
         ImmutableList.<AppliedPTransform<?, ?, ?>>of(
             downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
+    valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
     valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
     valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
 
@@ -106,6 +117,7 @@ public class InProcessEvaluationContextTest {
     stepNames.put(created.getProducingTransformInternal(), "s1");
     stepNames.put(downstream.getProducingTransformInternal(), "s2");
     stepNames.put(view.getProducingTransformInternal(), "s3");
+    stepNames.put(unbounded.getProducingTransformInternal(), "s4");
 
     Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
     context = InProcessEvaluationContext.create(
@@ -421,6 +433,102 @@ public class InProcessEvaluationContextTest {
     assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
   }
 
+  @Test
+  public void isDoneWithUnboundedPCollectionAndShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
+  }
+
+  @Test
+  public void isDoneWithUnboundedPCollectionAndNotShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+  }
+
+  @Test
+  public void isDoneWithOnlyBoundedPCollections() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    assertThat(context.isDone(created.getProducingTransformInternal()), is(false));
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+    assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
+  }
+
+  @Test
+  public void isDoneWithPartiallyDone() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+    assertThat(context.isDone(), is(false));
+
+    UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
+    rootBundle.add(WindowedValue.valueInGlobalWindow(1));
+    Iterable<? extends CommittedBundle<?>> handleResult =
+        context.handleResult(
+            null,
+            ImmutableList.<TimerData>of(),
+            StepTransformResult.withoutHold(created.getProducingTransformInternal())
+                .addOutput(rootBundle)
+                .build());
+    @SuppressWarnings("unchecked")
+    CommittedBundle<Integer> committedBundle =
+        (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult);
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    context.handleResult(
+        committedBundle,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+
+    context.handleResult(
+        committedBundle,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(true));
+  }
+
+  @Test
+  public void isDoneWithUnboundedAndNotShutdown() {
+    context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+    assertThat(context.isDone(), is(false));
+
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+    context.handleResult(
+        null,
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+    context.handleResult(
+        context.createRootBundle(created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+
+    context.handleResult(
+        context.createRootBundle(created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    assertThat(context.isDone(), is(false));
+  }
+
   private static class TestBoundedWindow extends BoundedWindow {
     private final Instant ts;
 


[2/2] incubator-beam git commit: This closes #64

Posted by bc...@apache.org.
This closes #64


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9247ad78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9247ad78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9247ad78

Branch: refs/heads/master
Commit: 9247ad78dc1b63605b52c34f67cf292ac8dedb98
Parents: 619db54 35df166
Author: bchambers <bc...@google.com>
Authored: Wed Mar 23 14:56:32 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 23 14:56:32 2016 -0700

----------------------------------------------------------------------
 .../ExecutorServiceParallelExecutor.java        |  76 +++++++++----
 .../inprocess/InMemoryWatermarkManager.java     |  16 ---
 .../inprocess/InProcessEvaluationContext.java   |  46 ++++++--
 .../InProcessEvaluationContextTest.java         | 110 ++++++++++++++++++-
 4 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------