You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/18 04:38:11 UTC

[1/2] incubator-beam git commit: Add ExecutorServiceParallelExecutor as an InProcessExecutor

Repository: incubator-beam
Updated Branches:
  refs/heads/master c30326007 -> d39346823


Add ExecutorServiceParallelExecutor as an InProcessExecutor

This executor is responsible for scheduling transform evaluations and
communicating results back to the evaluation context. The executor
handles PTransforms that block arbitrarily while waiting for additional input.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/636d53a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/636d53a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/636d53a4

Branch: refs/heads/master
Commit: 636d53a433cc30df427199a7929f00b96d55fd1c
Parents: c303260
Author: Thomas Groh <tg...@google.com>
Authored: Fri Feb 26 17:29:43 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 20:37:29 2016 -0700

----------------------------------------------------------------------
 .../runners/inprocess/CompletionCallback.java   |  33 ++
 .../ExecutorServiceParallelExecutor.java        | 394 +++++++++++++++++++
 .../inprocess/InMemoryWatermarkManager.java     |   2 +-
 .../inprocess/InProcessEvaluationContext.java   |  21 +-
 .../runners/inprocess/InProcessExecutor.java    |  46 +++
 .../inprocess/InProcessPipelineOptions.java     |   7 +-
 .../inprocess/InProcessPipelineRunner.java      |  25 --
 .../runners/inprocess/TransformExecutor.java    | 114 ++++++
 .../inprocess/TransformExecutorService.java     |  34 ++
 .../inprocess/TransformExecutorServices.java    | 153 +++++++
 .../TransformExecutorServicesTest.java          | 134 +++++++
 .../inprocess/TransformExecutorTest.java        | 312 +++++++++++++++
 12 files changed, 1247 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
new file mode 100644
index 0000000..2792631
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/CompletionCallback.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+
+/**
+ * A callback for completing a bundle of input.
+ */
+interface CompletionCallback {
+  /**
+   * Handle a successful result.
+   */
+  void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result);
+
+  /**
+   * Handle a result that terminated abnormally due to the provided {@link Throwable}.
+   */
+  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
new file mode 100644
index 0000000..ae686f2
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and
+ * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
+ */
+final class ExecutorServiceParallelExecutor implements InProcessExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
+
+  private final ExecutorService executorService;
+
+  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
+  private final Set<PValue> keyedPValues;
+  private final TransformEvaluatorRegistry registry;
+  private final InProcessEvaluationContext evaluationContext;
+
+  private final ConcurrentMap<StepAndKey, TransformExecutorService> currentEvaluations;
+  private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors;
+
+  private final Queue<ExecutorUpdate> allUpdates;
+  private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
+
+  private final TransformExecutorService parallelExecutorService;
+  private final CompletionCallback defaultCompletionCallback;
+
+  private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
+
+  public static ExecutorServiceParallelExecutor create(
+      ExecutorService executorService,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Set<PValue> keyedPValues,
+      TransformEvaluatorRegistry registry,
+      InProcessEvaluationContext context) {
+    return new ExecutorServiceParallelExecutor(
+        executorService, valueToConsumers, keyedPValues, registry, context);
+  }
+
+  private ExecutorServiceParallelExecutor(
+      ExecutorService executorService,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Set<PValue> keyedPValues,
+      TransformEvaluatorRegistry registry,
+      InProcessEvaluationContext context) {
+    this.executorService = executorService;
+    this.valueToConsumers = valueToConsumers;
+    this.keyedPValues = keyedPValues;
+    this.registry = registry;
+    this.evaluationContext = context;
+
+    currentEvaluations = new ConcurrentHashMap<>();
+    scheduledExecutors = new ConcurrentHashMap<>();
+
+    this.allUpdates = new ConcurrentLinkedQueue<>();
+    this.visibleUpdates = new ArrayBlockingQueue<>(20);
+
+    parallelExecutorService =
+        TransformExecutorServices.parallel(executorService, scheduledExecutors);
+    defaultCompletionCallback = new DefaultCompletionCallback();
+  }
+
+  @Override
+  public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
+    rootNodes = ImmutableList.copyOf(roots);
+    Runnable monitorRunnable = new MonitorRunnable();
+    executorService.submit(monitorRunnable);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void scheduleConsumption(
+      AppliedPTransform<?, ?, ?> consumer,
+      @Nullable CommittedBundle<?> bundle,
+      CompletionCallback onComplete) {
+    evaluateBundle(consumer, bundle, onComplete);
+  }
+
+  private <T> void evaluateBundle(
+      final AppliedPTransform<?, ?, ?> transform,
+      @Nullable final CommittedBundle<T> bundle,
+      final CompletionCallback onComplete) {
+    TransformExecutorService transformExecutor;
+    if (isKeyed(bundle.getPCollection())) {
+      final StepAndKey stepAndKey =
+          StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
+      transformExecutor = getSerialExecutorService(stepAndKey);
+    } else {
+      transformExecutor = parallelExecutorService;
+    }
+    TransformExecutor<T> callable =
+        TransformExecutor.create(
+            registry, evaluationContext, bundle, transform, onComplete, transformExecutor);
+    transformExecutor.schedule(callable);
+  }
+
+  private boolean isKeyed(PValue pvalue) {
+    return keyedPValues.contains(pvalue);
+  }
+
+  private void scheduleConsumers(CommittedBundle<?> bundle) {
+    for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+      scheduleConsumption(consumer, bundle, defaultCompletionCallback);
+    }
+  }
+
+  private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) {
+    if (!currentEvaluations.containsKey(stepAndKey)) {
+      currentEvaluations.putIfAbsent(
+          stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors));
+    }
+    return currentEvaluations.get(stepAndKey);
+  }
+
+  @Override
+  public void awaitCompletion() throws Throwable {
+    VisibleExecutorUpdate update;
+    do {
+      update = visibleUpdates.take();
+      if (update.throwable.isPresent()) {
+        throw update.throwable.get();
+      }
+    } while (!update.isDone());
+    executorService.shutdown();
+  }
+
+  /**
+   * The default {@link CompletionCallback}. The default completion callback is used to complete
+   * transform evaluations that are triggered due to the arrival of elements from an upstream
+   * transform, or for a source transform.
+   */
+  private class DefaultCompletionCallback implements CompletionCallback {
+    @Override
+    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      Iterable<? extends CommittedBundle<?>> resultBundles =
+          evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
+      for (CommittedBundle<?> outputBundle : resultBundles) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+      }
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+    }
+  }
+
+  /**
+   * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection
+   * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the
+   * timers used to create the input to the {@link InProcessEvaluationContext evaluation context}
+   * as part of the result.
+   */
+  private class TimerCompletionCallback implements CompletionCallback {
+    private final Iterable<TimerData> timers;
+
+    private TimerCompletionCallback(Iterable<TimerData> timers) {
+      this.timers = timers;
+    }
+
+    @Override
+    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      Iterable<? extends CommittedBundle<?>> resultBundles =
+          evaluationContext.handleResult(inputBundle, timers, result);
+      for (CommittedBundle<?> outputBundle : resultBundles) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+      }
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+    }
+  }
+
+  /**
+   * An internal status update on the state of the executor.
+   *
+   * Used to signal when the executor should be shut down (due to an exception).
+   */
+  private static class ExecutorUpdate {
+    private final Optional<? extends CommittedBundle<?>> bundle;
+    private final Optional<? extends Throwable> throwable;
+
+    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
+      return new ExecutorUpdate(bundle, null);
+    }
+
+    public static ExecutorUpdate fromThrowable(Throwable t) {
+      return new ExecutorUpdate(null, t);
+    }
+
+    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
+      this.bundle = Optional.fromNullable(producedBundle);
+      this.throwable = Optional.fromNullable(throwable);
+    }
+
+    public Optional<? extends CommittedBundle<?>> getBundle() {
+      return bundle;
+    }
+
+    public Optional<? extends Throwable> getException() {
+      return throwable;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(ExecutorUpdate.class)
+          .add("bundle", bundle)
+          .add("exception", throwable)
+          .toString();
+    }
+  }
+
+  /**
+   * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to
+   * return normally or throw an exception.
+   */
+  private static class VisibleExecutorUpdate {
+    private final Optional<? extends Throwable> throwable;
+    private final boolean done;
+
+    public static VisibleExecutorUpdate fromThrowable(Throwable e) {
+      return new VisibleExecutorUpdate(false, e);
+    }
+
+    public static VisibleExecutorUpdate finished() {
+      return new VisibleExecutorUpdate(true, null);
+    }
+
+    private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) {
+      this.throwable = Optional.fromNullable(exception);
+      this.done = done;
+    }
+
+    public boolean isDone() {
+      return done;
+    }
+  }
+
+  private class MonitorRunnable implements Runnable {
+    private final String runnableName =
+        String.format(
+            "%s$%s-monitor",
+            evaluationContext.getPipelineOptions().getAppName(),
+            ExecutorServiceParallelExecutor.class.getSimpleName());
+
+    @Override
+    public void run() {
+      String oldName = Thread.currentThread().getName();
+      Thread.currentThread().setName(runnableName);
+      try {
+        ExecutorUpdate update = allUpdates.poll();
+        if (update != null) {
+          LOG.debug("Executor Update: {}", update);
+          if (update.getBundle().isPresent()) {
+            scheduleConsumers(update.getBundle().get());
+          } else if (update.getException().isPresent()) {
+            visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+          }
+        }
+        fireTimers();
+        mightNeedMoreWork();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.error("Monitor died due to being interrupted");
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
+          visibleUpdates.poll();
+        }
+      } catch (Throwable t) {
+        LOG.error("Monitor thread died due to throwable", t);
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
+          visibleUpdates.poll();
+        }
+      } finally {
+        if (!shouldShutdown()) {
+          // The monitor thread should always be scheduled; but we only need to be scheduled once
+          executorService.submit(this);
+        }
+        Thread.currentThread().setName(oldName);
+      }
+    }
+
+    private void fireTimers() throws Exception {
+      try {
+        for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
+            evaluationContext.extractFiredTimers().entrySet()) {
+          AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
+          for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
+            for (TimeDomain domain : TimeDomain.values()) {
+              Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
+              if (delivery.isEmpty()) {
+                continue;
+              }
+              KeyedWorkItem<Object, Object> work =
+                  KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
+              @SuppressWarnings({"unchecked", "rawtypes"})
+              CommittedBundle<?> bundle =
+                  InProcessBundle.<KeyedWorkItem<Object, Object>>keyed(
+                          (PCollection) transform.getInput(), keyTimers.getKey())
+                      .add(WindowedValue.valueInEmptyWindows(work))
+                      .commit(Instant.now());
+              scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Internal Error while delivering timers", e);
+        throw e;
+      }
+    }
+
+    private boolean shouldShutdown() {
+      if (evaluationContext.isDone()) {
+        LOG.debug("Pipeline is finished. Shutting down. {}");
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
+          visibleUpdates.poll();
+        }
+        executorService.shutdown();
+        return true;
+      }
+      return false;
+    }
+
+    private void mightNeedMoreWork() {
+      synchronized (scheduledExecutors) {
+        for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
+          Thread thread = executor.getThread();
+          if (thread != null) {
+            switch (thread.getState()) {
+              case BLOCKED:
+              case WAITING:
+              case TERMINATED:
+              case TIMED_WAITING:
+                break;
+              default:
+                return;
+            }
+          }
+        }
+      }
+      // All current TransformExecutors are blocked; add more work from the roots.
+      for (AppliedPTransform<?, ?, ?> root : rootNodes) {
+        scheduleConsumption(root, null, defaultCompletionCallback);
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index 7cf53aa..094526d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -866,7 +866,7 @@ public class InMemoryWatermarkManager {
    * {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to
    * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
    */
-  public boolean isDone() {
+  public boolean allWatermarksAtPositiveInfinity() {
     for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
         transformToWatermarks.entrySet()) {
       Instant endOfTime = THE_END_OF_TIME.get();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index 757e9e1..2908fba 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -36,6 +36,7 @@ import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.collect.ImmutableList;
@@ -359,6 +360,24 @@ class InProcessEvaluationContext {
    * Returns true if all steps are done.
    */
   public boolean isDone() {
-    return watermarkManager.isDone();
+    if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) {
+      return false;
+    }
+    if (!watermarkManager.allWatermarksAtPositiveInfinity()) {
+      return false;
+    }
+    return true;
+  }
+
+  private boolean containsUnboundedPCollection() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      for (PValue value : transform.getInput().expand()) {
+        if (value instanceof PCollection
+            && ((PCollection<?>) value).isBounded().equals(IsBounded.UNBOUNDED)) {
+          return true;
+        }
+      }
+    }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
new file mode 100644
index 0000000..7b60bca
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface InProcessExecutor {
+  /**
+   * Starts this executor. The provided collection is the collection of root transforms to
+   * initially schedule.
+   *
+   * @param rootTransforms
+   */
+  void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
+
+  /**
+   * Blocks until the job being executed enters a terminal state. A job is completed after all
+   * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+   * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+   *
+   * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+   *                   waiting thread and rethrows it
+   */
+  void awaitCompletion() throws Throwable;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index 60c8543..27e9a4b 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -15,15 +15,20 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
 import com.google.cloud.dataflow.sdk.options.Default;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 
 /**
  * Options that can be used to configure the {@link InProcessPipelineRunner}.
  */
-public interface InProcessPipelineOptions extends PipelineOptions {
+public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
   @Default.InstanceFactory(NanosOffsetClock.Factory.class)
   Clock getClock();
 
   void setClock(Clock clock);
+
+  boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 7a268ee..32859da 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
 import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
@@ -167,28 +166,4 @@ public class InProcessPipelineRunner {
   public InProcessPipelineOptions getPipelineOptions() {
     return options;
   }
-
-  /**
-   * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
-   * source and intermediate {@link PTransform PTransforms}.
-   */
-  public static interface InProcessExecutor {
-    /**
-     * @param root the root {@link AppliedPTransform} to schedule
-     */
-    void scheduleRoot(AppliedPTransform<?, ?, ?> root);
-
-    /**
-     * @param consumer the {@link AppliedPTransform} to schedule
-     * @param bundle the input bundle to the consumer
-     */
-    void scheduleConsumption(AppliedPTransform<?, ?, ?> consumer, CommittedBundle<?> bundle);
-
-    /**
-     * Blocks until the job being executed enters a terminal state. A job is completed after all
-     * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
-     * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
-     */
-    void awaitCompletion();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
new file mode 100644
index 0000000..d630749
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.common.base.Throwables;
+
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
+ * {@link TransformEvaluatorFactory}, evaluating it on some bundle of input, and reporting the
+ * outcome to a registered {@link CompletionCallback}.
+ *
+ * <p>A {@link TransformExecutor} that is currently executing also provides access to the thread
+ * that it is being executed on.
+ */
+class TransformExecutor<T> implements Callable<InProcessTransformResult> {
+  /**
+   * Creates a new {@link TransformExecutor}.
+   *
+   * @param factory produces the {@link TransformEvaluator} when {@link #call()} runs
+   * @param evaluationContext the context handed to {@code factory}
+   * @param inputBundle the elements to process; if null, no elements are processed and only
+   *     {@link TransformEvaluator#finishBundle()} is invoked
+   * @param transform the transform to evaluate
+   * @param completionCallback receives either the result or the failure of the evaluation
+   * @param transformEvaluationState notified through
+   *     {@link TransformExecutorService#complete(TransformExecutor)} when {@link #call()} ends
+   */
+  public static <T> TransformExecutor<T> create(
+      TransformEvaluatorFactory factory,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    return new TransformExecutor<>(
+        factory,
+        evaluationContext,
+        inputBundle,
+        transform,
+        completionCallback,
+        transformEvaluationState);
+  }
+
+  private final TransformEvaluatorFactory evaluatorFactory;
+  private final InProcessEvaluationContext evaluationContext;
+
+  /** The transform that will be evaluated. */
+  private final AppliedPTransform<?, ?, ?> transform;
+  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
+  private final CommittedBundle<T> inputBundle;
+
+  private final CompletionCallback onComplete;
+  private final TransformExecutorService transformEvaluationState;
+
+  /**
+   * The thread currently running {@link #call()}, or null when this executor is not running.
+   *
+   * <p>Volatile because it is written by the executing thread and read from other threads via
+   * {@link #getThread()}; without it a reader has no guarantee of observing the latest write.
+   */
+  private volatile Thread thread;
+
+  private TransformExecutor(
+      TransformEvaluatorFactory factory,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    this.evaluatorFactory = factory;
+    this.evaluationContext = evaluationContext;
+
+    this.inputBundle = inputBundle;
+    this.transform = transform;
+
+    this.onComplete = completionCallback;
+
+    this.transformEvaluationState = transformEvaluationState;
+  }
+
+  /**
+   * Evaluates the transform on the input bundle.
+   *
+   * <p>Every element of the input bundle (if there is one) is passed to the evaluator, the bundle
+   * is finished, and the result is handed to the {@link CompletionCallback}. Any
+   * {@link Throwable} raised inside the try block, including one raised by the result callback
+   * itself, is reported through {@link CompletionCallback#handleThrowable} and then rethrown.
+   * In all cases the owning {@link TransformExecutorService} is told that this executor has
+   * completed.
+   *
+   * @return the result produced by {@link TransformEvaluator#finishBundle()}
+   */
+  @Override
+  public InProcessTransformResult call() {
+    this.thread = Thread.currentThread();
+    try {
+      TransformEvaluator<T> evaluator =
+          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
+      // A null bundle carries no elements; only finishBundle() is invoked in that case.
+      if (inputBundle != null) {
+        for (WindowedValue<T> value : inputBundle.getElements()) {
+          evaluator.processElement(value);
+        }
+      }
+      InProcessTransformResult result = evaluator.finishBundle();
+      onComplete.handleResult(inputBundle, result);
+      return result;
+    } catch (Throwable t) {
+      onComplete.handleThrowable(inputBundle, t);
+      throw Throwables.propagate(t);
+    } finally {
+      // Clear the thread before signalling completion so that getThread() never reports a
+      // thread for an executor that has already been marked complete.
+      this.thread = null;
+      transformEvaluationState.complete(this);
+    }
+  }
+
+  /**
+   * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
+   * Otherwise, return null.
+   */
+  @Nullable
+  public Thread getThread() {
+    return this.thread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
new file mode 100644
index 0000000..3f00da6
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+/**
+ * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
+ * appropriate for the {@link StepAndKey} the executor exists for.
+ *
+ * <p>Each {@link TransformExecutor} reports back to its service by calling
+ * {@link #complete(TransformExecutor)} when its {@code call()} method finishes.
+ */
+interface TransformExecutorService {
+  /**
+   * Schedule the provided work to be eventually executed.
+   *
+   * @param work the executor to run; whether it starts immediately or is queued depends on the
+   *     implementation
+   */
+  void schedule(TransformExecutor<?> work);
+
+  /**
+   * Finish executing the provided work. This may cause additional
+   * {@link TransformExecutor TransformExecutors} to be evaluated.
+   *
+   * @param completed the executor that has finished running
+   */
+  void complete(TransformExecutor<?> completed);
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
new file mode 100644
index 0000000..34efdf6
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ */
+final class TransformExecutorServices {
+  private TransformExecutorServices() {
+    // Do not instantiate
+  }
+
+  /**
+   * Returns a {@link TransformExecutorService} that evaluates
+   * {@link TransformExecutor TransformExecutors} in parallel.
+   *
+   * @param executor the {@link ExecutorService} all work is submitted to
+   * @param scheduled the map in which every submitted, not yet completed executor is recorded
+   */
+  public static TransformExecutorService parallel(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new ParallelEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * Returns a {@link TransformExecutorService} that evaluates
+   * {@link TransformExecutor TransformExecutors} in serial.
+   *
+   * @param executor the {@link ExecutorService} work is submitted to, one item at a time
+   * @param scheduled the map in which the executor currently submitted is recorded
+   */
+  public static TransformExecutorService serial(
+      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    return new SerialEvaluationState(executor, scheduled);
+  }
+
+  /**
+   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
+   * scheduled will be immediately submitted to the {@link ExecutorService}.
+   *
+   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
+   * processed in parallel.
+   */
+  private static class ParallelEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    private ParallelEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.executor = executor;
+      this.scheduled = scheduled;
+    }
+
+    /**
+     * Records the work as scheduled and submits it to the executor right away.
+     */
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      // Register before submitting: the work may run to completion (and call complete) before
+      // submit returns, and registering afterwards would then leave a stale entry behind.
+      scheduled.put(work, true);
+      try {
+        executor.submit(work);
+      } catch (RuntimeException e) {
+        // The executor did not accept the work, so it must not be reported as scheduled.
+        scheduled.remove(work);
+        throw e;
+      }
+    }
+
+    /**
+     * Removes the completed work from the scheduled map.
+     */
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      scheduled.remove(completed);
+    }
+  }
+
+  /**
+   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
+   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
+   * {@link ExecutorService} at any time.
+   *
+   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
+   * Keyed computations are processed serially per step.
+   */
+  private static class SerialEvaluationState implements TransformExecutorService {
+    private final ExecutorService executor;
+    private final Map<TransformExecutor<?>, Boolean> scheduled;
+
+    /** The work currently submitted to the executor, or null if nothing is running. */
+    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
+    /** Work waiting for the currently evaluating item to complete, in arrival order. */
+    private final Queue<TransformExecutor<?>> workQueue;
+
+    private SerialEvaluationState(
+        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+      this.scheduled = scheduled;
+      this.executor = executor;
+      this.currentlyEvaluating = new AtomicReference<>();
+      this.workQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    /**
+     * Schedules the work, adding it to the work queue if there is a bundle currently being
+     * evaluated and scheduling it immediately otherwise.
+     */
+    @Override
+    public void schedule(TransformExecutor<?> work) {
+      workQueue.offer(work);
+      updateCurrentlyEvaluating();
+    }
+
+    /**
+     * Marks the currently evaluating work as finished and starts the next queued item, if any.
+     *
+     * @throws IllegalStateException if {@code completed} is not the work currently evaluating
+     */
+    @Override
+    public void complete(TransformExecutor<?> completed) {
+      if (!currentlyEvaluating.compareAndSet(completed, null)) {
+        throw new IllegalStateException(
+            "Finished work "
+                + completed
+                + " but could not complete due to unexpected currently executing "
+                + currentlyEvaluating.get());
+      }
+      scheduled.remove(completed);
+      updateCurrentlyEvaluating();
+    }
+
+    /**
+     * Submits the head of the work queue if nothing is currently evaluating.
+     */
+    private void updateCurrentlyEvaluating() {
+      if (currentlyEvaluating.get() != null) {
+        // Something is running; its completion will pull the next item off the queue.
+        return;
+      }
+      // Only synchronize if we need to update what's currently evaluating
+      synchronized (this) {
+        // Re-check under the lock before polling. Polling first and re-offering on a lost race
+        // would move the polled work to the tail of the queue and reorder pending work.
+        if (currentlyEvaluating.get() != null) {
+          return;
+        }
+        TransformExecutor<?> newWork = workQueue.poll();
+        if (newWork != null) {
+          // The reference only becomes non-null while this lock is held, so a plain set is safe.
+          currentlyEvaluating.set(newWork);
+          scheduled.put(newWork, true);
+          executor.submit(newWork);
+        }
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(SerialEvaluationState.class)
+          .add("currentlyEvaluating", currentlyEvaluating)
+          .add("workQueue", workQueue)
+          .toString();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java
new file mode 100644
index 0000000..2c66dc2
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServicesTest.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Tests for {@link TransformExecutorServices}, run against a direct (same-thread) executor service
+ * with mocked {@link TransformExecutor TransformExecutors}.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorServicesTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private ExecutorService executorService;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  @Before
+  public void setup() {
+    scheduled = new ConcurrentHashMap<>();
+    executorService = MoreExecutors.newDirectExecutorService();
+  }
+
+  @Test
+  public void parallelScheduleMultipleSchedulesBothImmediately() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+
+    TransformExecutorService parallel =
+        TransformExecutorServices.parallel(executorService, scheduled);
+    parallel.schedule(first);
+    parallel.schedule(second);
+
+    // Both executors run without waiting for each other and are both tracked.
+    verify(first).call();
+    verify(second).call();
+    assertScheduled(first);
+    assertScheduled(second);
+
+    // Completing one leaves the other untouched.
+    parallel.complete(first);
+    assertScheduled(second);
+    assertNotScheduled(first);
+
+    parallel.complete(second);
+    assertThat(scheduled.isEmpty(), is(true));
+  }
+
+  @Test
+  public void serialScheduleTwoWaitsForFirstToComplete() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    serial.schedule(first);
+    verify(first).call();
+
+    // The second executor is queued behind the first and must not have started.
+    serial.schedule(second);
+    verify(second, never()).call();
+    assertScheduled(first);
+    assertNotScheduled(second);
+
+    // Completing the first releases the second.
+    serial.complete(first);
+    verify(second).call();
+    assertScheduled(second);
+    assertNotScheduled(first);
+
+    serial.complete(second);
+  }
+
+  @Test
+  public void serialCompleteNotExecutingTaskThrows() {
+    TransformExecutor<Object> first = mockExecutor();
+    TransformExecutor<Object> second = mockExecutor();
+
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    serial.schedule(first);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("unexpected currently executing");
+
+    serial.complete(second);
+  }
+
+  /** Creates a Mockito mock of a {@link TransformExecutor}. */
+  @SuppressWarnings("unchecked")
+  private static TransformExecutor<Object> mockExecutor() {
+    return mock(TransformExecutor.class);
+  }
+
+  /** Asserts that the scheduled map contains the executor, mapped to true. */
+  private void assertScheduled(TransformExecutor<?> executor) {
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(executor, true));
+  }
+
+  /** Asserts that the scheduled map has no entry at all for the executor. */
+  private void assertNotScheduled(TransformExecutor<?> executor) {
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(executor), any(Boolean.class))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/636d53a4/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
new file mode 100644
index 0000000..bd63252
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for {@link TransformExecutor}.
+ *
+ * <p>Each test stubs the mocked {@link TransformEvaluatorRegistry} to hand back a hand-written
+ * {@link TransformEvaluator}, runs a {@link TransformExecutor} over it, and inspects what the
+ * {@link RegisteringCompletionCallback} recorded.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorTest {
+  // A root PCollection and a PCollection derived from it; the tests use their producing
+  // transforms as the transforms to execute.
+  private PCollection<String> created;
+  private PCollection<KV<Integer, String>> downstream;
+
+  // Counted down by the completion callback once it has received a result or a throwable.
+  private CountDownLatch evaluatorCompleted;
+
+  private RegisteringCompletionCallback completionCallback;
+  private TransformExecutorService transformEvaluationState;
+  @Mock private InProcessEvaluationContext evaluationContext;
+  @Mock private TransformEvaluatorRegistry registry;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    scheduled = new HashMap<>();
+    transformEvaluationState =
+        TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
+
+    evaluatorCompleted = new CountDownLatch(1);
+    completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
+
+    TestPipeline p = TestPipeline.create();
+    created = p.apply(Create.of("foo", "spam", "third"));
+    downstream = created.apply(WithKeys.<Integer, String>of(3));
+  }
+
+  /**
+   * With a null input bundle, no element is processed, the bundle is still finished, and the
+   * result reaches the completion callback. The executor is invoked directly on the test thread.
+   */
+  @Test
+  public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    final AtomicBoolean finishCalled = new AtomicBoolean(false);
+    TransformEvaluator<Object> evaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            finishCalled.set(true);
+            return result;
+          }
+        };
+
+    when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<Object> executor =
+        TransformExecutor.create(
+            registry,
+            evaluationContext,
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    executor.call();
+
+    assertThat(finishCalled.get(), is(true));
+    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    // NOTE(review): the executor is never passed to schedule() in this test, so it is never added
+    // to the map and this assertion holds whether or not complete() ran.
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  /**
+   * With a three-element input bundle, every element is handed to the evaluator and the result
+   * reaches the completion callback. The executor runs on a separate single-thread executor.
+   */
+  @Test
+  public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            elementsProcessed.add(element);
+            return;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
+    WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
+    CommittedBundle<String> inputBundle =
+        InProcessBundle.unkeyed(created).add(foo).add(spam).add(third).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    // NOTE(review): the single-thread executor created here is never shut down.
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    // Wait until the completion callback has been invoked on the worker thread.
+    evaluatorCompleted.await();
+
+    assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
+    assertThat(completionCallback.handledResult, equalTo(result));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  /**
+   * An exception thrown from processElement is delivered to the completion callback as the
+   * throwable, and no result is recorded.
+   */
+  @Test
+  public void processElementThrowsExceptionCallsback() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {
+            throw exception;
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            return result;
+          }
+        };
+
+    WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+    CommittedBundle<String> inputBundle =
+        InProcessBundle.unkeyed(created).add(foo).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  /**
+   * An exception thrown from finishBundle is delivered to the completion callback as the
+   * throwable, and no result is recorded. The input bundle here contains no elements.
+   */
+  @Test
+  public void finishBundleThrowsExceptionCallsback() throws Exception {
+    final Exception exception = new Exception();
+    TransformEvaluator<String> evaluator =
+        new TransformEvaluator<String>() {
+          @Override
+          public void processElement(WindowedValue<String> element) throws Exception {}
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            throw exception;
+          }
+        };
+
+    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(created).commit(Instant.now());
+    when(
+            registry.<String>forApplication(
+                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            evaluationContext,
+            inputBundle,
+            downstream.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+    Executors.newSingleThreadExecutor().submit(executor);
+
+    evaluatorCompleted.await();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+  }
+
+  /**
+   * While call() is blocked inside finishBundle, getThread() returns a non-null thread. Two
+   * latches hold the evaluator inside finishBundle until the test has made its assertion.
+   */
+  @Test
+  public void duringCallGetThreadIsNonNull() throws Exception {
+    final InProcessTransformResult result =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+    // Released by the evaluator once it is inside finishBundle.
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    // Released by the test once the assertion has been made, letting finishBundle return.
+    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+    TransformEvaluator<Object> evaluator =
+        new TransformEvaluator<Object>() {
+          @Override
+          public void processElement(WindowedValue<Object> element) throws Exception {
+            throw new IllegalArgumentException("Shouldn't be called");
+          }
+
+          @Override
+          public InProcessTransformResult finishBundle() throws Exception {
+            testLatch.countDown();
+            evaluatorLatch.await();
+            return result;
+          }
+        };
+
+    when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+        .thenReturn(evaluator);
+
+    TransformExecutor<String> executor =
+        TransformExecutor.create(
+            registry,
+            evaluationContext,
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
+
+    Executors.newSingleThreadExecutor().submit(executor);
+    testLatch.await();
+    assertThat(executor.getThread(), not(nullValue()));
+
+    // Finish the execution so everything can get closed down cleanly.
+    evaluatorLatch.countDown();
+  }
+
+  /**
+   * A {@link CompletionCallback} that stores the result or throwable it was given and then
+   * counts down the latch supplied at construction.
+   */
+  private static class RegisteringCompletionCallback implements CompletionCallback {
+    private InProcessTransformResult handledResult = null;
+    private Throwable handledThrowable = null;
+    private final CountDownLatch onMethod;
+
+    private RegisteringCompletionCallback(CountDownLatch onMethod) {
+      this.onMethod = onMethod;
+    }
+
+    @Override
+    public void handleResult(CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      // Store before counting down so a thread released by the latch sees the stored value.
+      handledResult = result;
+      onMethod.countDown();
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      handledThrowable = t;
+      onMethod.countDown();
+    }
+  }
+}


[2/2] incubator-beam git commit: This closes #53

Posted by ke...@apache.org.
This closes #53


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3934682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3934682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3934682

Branch: refs/heads/master
Commit: d3934682313b41688536599da7c19831335f3960
Parents: c303260 636d53a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 17 20:37:59 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Mar 17 20:37:59 2016 -0700

----------------------------------------------------------------------
 .../runners/inprocess/CompletionCallback.java   |  33 ++
 .../ExecutorServiceParallelExecutor.java        | 394 +++++++++++++++++++
 .../inprocess/InMemoryWatermarkManager.java     |   2 +-
 .../inprocess/InProcessEvaluationContext.java   |  21 +-
 .../runners/inprocess/InProcessExecutor.java    |  46 +++
 .../inprocess/InProcessPipelineOptions.java     |   7 +-
 .../inprocess/InProcessPipelineRunner.java      |  25 --
 .../runners/inprocess/TransformExecutor.java    | 114 ++++++
 .../inprocess/TransformExecutorService.java     |  34 ++
 .../inprocess/TransformExecutorServices.java    | 153 +++++++
 .../TransformExecutorServicesTest.java          | 134 +++++++
 .../inprocess/TransformExecutorTest.java        | 312 +++++++++++++++
 12 files changed, 1247 insertions(+), 28 deletions(-)
----------------------------------------------------------------------