You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/15 19:23:57 UTC

[2/3] incubator-beam git commit: Implement InProcessEvaluationContext

Implement InProcessEvaluationContext

This is the primary "global state" object for the evaluation of a
Pipeline using the InProcessPipelineRunner, and is responsible for
properly routing information about the state of the pipeline to
transform evaluators.

Remove the InProcessEvaluationContext from the InProcessPipelineRunner
class, and implement as a class directly. Fix associated imports.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53db1597
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53db1597
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53db1597

Branch: refs/heads/master
Commit: 53db1597a8f13252a8b59042201dafe061ce53ab
Parents: 7f9270e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Feb 26 17:28:37 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Mar 15 11:22:38 2016 -0700

----------------------------------------------------------------------
 .../inprocess/BoundedReadEvaluatorFactory.java  |   1 -
 .../sdk/runners/inprocess/EvaluatorKey.java     |   1 -
 .../inprocess/FlattenEvaluatorFactory.java      |   1 -
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   1 -
 .../inprocess/InMemoryWatermarkManager.java     |  14 +-
 .../sdk/runners/inprocess/InProcessBundle.java  |  20 +-
 .../inprocess/InProcessEvaluationContext.java   | 364 ++++++++++++++++
 .../inprocess/InProcessPipelineOptions.java     |   7 +-
 .../inprocess/InProcessPipelineRunner.java      | 106 +----
 .../inprocess/InProcessSideInputContainer.java  |  71 ++-
 .../inprocess/ParDoMultiEvaluatorFactory.java   |   1 -
 .../inprocess/ParDoSingleEvaluatorFactory.java  |   1 -
 .../sdk/runners/inprocess/StepAndKey.java       |  68 +++
 .../inprocess/TransformEvaluatorFactory.java    |   1 -
 .../inprocess/TransformEvaluatorRegistry.java   |  72 +++
 .../UnboundedReadEvaluatorFactory.java          |   1 -
 .../runners/inprocess/ViewEvaluatorFactory.java |   1 -
 .../inprocess/WatermarkCallbackExecutor.java    | 143 ++++++
 .../BoundedReadEvaluatorFactoryTest.java        |   2 +-
 .../inprocess/FlattenEvaluatorFactoryTest.java  |   1 -
 .../GroupByKeyEvaluatorFactoryTest.java         |   1 -
 .../inprocess/InMemoryWatermarkManagerTest.java |  12 +
 .../InProcessEvaluationContextTest.java         | 436 +++++++++++++++++++
 .../InProcessSideInputContainerTest.java        |  92 ++--
 .../ParDoMultiEvaluatorFactoryTest.java         |   1 -
 .../ParDoSingleEvaluatorFactoryTest.java        |   1 -
 .../UnboundedReadEvaluatorFactoryTest.java      |   1 -
 .../inprocess/ViewEvaluatorFactoryTest.java     |   1 -
 .../WatermarkCallbackExecutorTest.java          | 126 ++++++
 29 files changed, 1372 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index 1c02798..2a164c3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.io.Source.Reader;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
index 745f8f2..307bc5c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java
@@ -15,7 +15,6 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 
 import java.util.Objects;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
index 1442888..bde1df4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
@@ -16,7 +16,6 @@
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Flatten;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 0347281..ec63be8 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index e280e22..7cf53aa 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -1209,8 +1209,11 @@ public class InMemoryWatermarkManager {
        * and deletedTimers.
        */
       public TimerUpdate build() {
-        return new TimerUpdate(key, ImmutableSet.copyOf(completedTimers),
-            ImmutableSet.copyOf(setTimers), ImmutableSet.copyOf(deletedTimers));
+        return new TimerUpdate(
+            key,
+            ImmutableSet.copyOf(completedTimers),
+            ImmutableSet.copyOf(setTimers),
+            ImmutableSet.copyOf(deletedTimers));
       }
     }
 
@@ -1245,6 +1248,13 @@ public class InMemoryWatermarkManager {
       return deletedTimers;
     }
 
+    /**
+     * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
+     */
+    public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
+      return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
+    }
+
     @Override
     public int hashCode() {
       return Objects.hash(key, completedTimers, setTimers, deletedTimers);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
index cc20161..112ba17 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015 Google Inc.
+ * Copyright (C) 2016 Google Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
@@ -65,6 +64,11 @@ public final class InProcessBundle<T> implements UncommittedBundle<T> {
   }
 
   @Override
+  public PCollection<T> getPCollection() {
+    return pcollection;
+  }
+
+  @Override
   public InProcessBundle<T> add(WindowedValue<T> element) {
     checkState(!committed, "Can't add element %s to committed bundle %s", element, this);
     elements.add(element);
@@ -105,12 +109,12 @@ public final class InProcessBundle<T> implements UncommittedBundle<T> {
 
       @Override
       public String toString() {
-        ToStringHelper toStringHelper =
-            MoreObjects.toStringHelper(this).add("pcollection", pcollection);
-        if (keyed) {
-          toStringHelper = toStringHelper.add("key", key);
-        }
-        return toStringHelper.add("elements", elements).toString();
+        return MoreObjects.toStringHelper(this)
+            .omitNullValues()
+            .add("pcollection", pcollection)
+            .add("key", key)
+            .add("elements", committedElements)
+            .toString();
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
new file mode 100644
index 0000000..757e9e1
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
+import com.google.cloud.dataflow.sdk.util.ExecutionContext;
+import com.google.cloud.dataflow.sdk.util.SideInputReader;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * The evaluation context for a specific pipeline being executed by the
+ * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
+ * transforms.
+ *
+ * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
+ * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
+ * consists of views into underlying state and watermark implementations, access to read and write
+ * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
+ * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
+ * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
+ * known to be empty).
+ *
+ * <p>{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based
+ * on the current global state and updating the global state appropriately. This includes updating
+ * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
+ * can be executed.
+ */
+class InProcessEvaluationContext {
+  /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
+  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+  /** The options that were used to create this {@link Pipeline}. */
+  private final InProcessPipelineOptions options;
+
+  /** The current processing time and event time watermarks and timers. */
+  private final InMemoryWatermarkManager watermarkManager;
+
+  /** Executes callbacks based on the progression of the watermark. */
+  private final WatermarkCallbackExecutor callbackExecutor;
+
+  /** The stateInternals of the world, by applied PTransform and key. */
+  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+      applicationStateInternals;
+
+  private final InProcessSideInputContainer sideInputContainer;
+
+  private final CounterSet mergedCounters;
+
+  public static InProcessEvaluationContext create(
+      InProcessPipelineOptions options,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    return new InProcessEvaluationContext(
+        options, rootTransforms, valueToConsumers, stepNames, views);
+  }
+
+  private InProcessEvaluationContext(
+      InProcessPipelineOptions options,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    this.options = checkNotNull(options);
+    checkNotNull(rootTransforms);
+    checkNotNull(valueToConsumers);
+    checkNotNull(stepNames);
+    checkNotNull(views);
+    this.stepNames = stepNames;
+
+    this.watermarkManager =
+        InMemoryWatermarkManager.create(
+            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+    this.sideInputContainer = InProcessSideInputContainer.create(this, views);
+
+    this.applicationStateInternals = new ConcurrentHashMap<>();
+    this.mergedCounters = new CounterSet();
+
+    this.callbackExecutor = WatermarkCallbackExecutor.create();
+  }
+
+  /**
+   * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided
+   * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
+   *
+   * <p>The result is the output of running the transform contained in the
+   * {@link InProcessTransformResult} on the contents of the provided bundle.
+   *
+   * @param completedBundle the bundle that was processed to produce the result. Potentially
+   *                        {@code null} if the transform that produced the result is a root
+   *                        transform
+   * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
+   *                        or an empty iterable if no timers were delivered
+   * @param result the result of evaluating the input bundle
+   * @return the committed bundles contained within the handled {@code result}
+   */
+  public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
+      @Nullable CommittedBundle<?> completedBundle,
+      Iterable<TimerData> completedTimers,
+      InProcessTransformResult result) {
+    Iterable<? extends CommittedBundle<?>> committedBundles =
+        commitBundles(result.getOutputBundles());
+    // Update watermarks and timers
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTransform(),
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedBundles,
+        result.getWatermarkHold());
+    fireAllAvailableCallbacks();
+    // Update counters
+    if (result.getCounters() != null) {
+      mergedCounters.merge(result.getCounters());
+    }
+    // Update state internals
+    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+    if (theirState != null) {
+      CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
+      StepAndKey stepAndKey =
+          StepAndKey.of(
+              result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
+      if (!committedState.isEmpty()) {
+        applicationStateInternals.put(stepAndKey, committedState);
+      } else {
+        applicationStateInternals.remove(stepAndKey);
+      }
+    }
+    return committedBundles;
+  }
+
+  private Iterable<? extends CommittedBundle<?>> commitBundles(
+      Iterable<? extends UncommittedBundle<?>> bundles) {
+    ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
+    for (UncommittedBundle<?> inProgress : bundles) {
+      AppliedPTransform<?, ?, ?> producing =
+          inProgress.getPCollection().getProducingTransformInternal();
+      TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
+      CommittedBundle<?> committed =
+          inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
+      // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
+      // filter them out
+      if (!Iterables.isEmpty(committed.getElements())) {
+        completed.add(committed);
+      }
+    }
+    return completed.build();
+  }
+
+  private void fireAllAvailableCallbacks() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      fireAvailableCallbacks(transform);
+    }
+  }
+
+  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
+    TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
+    callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} for use by a source.
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return InProcessBundle.unkeyed(output);
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
+   * PCollection}.
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    return input.isKeyed()
+        ? InProcessBundle.keyed(output, input.getKey())
+        : InProcessBundle.unkeyed(output);
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
+   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return InProcessBundle.keyed(output, key);
+  }
+
+  /**
+   * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
+   * {@link PCollectionView}.
+   */
+  public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
+      PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) {
+    return new PCollectionViewWriter<ElemT, ViewT>() {
+      @Override
+      public void add(Iterable<WindowedValue<ElemT>> values) {
+        sideInputContainer.write(output, values);
+      }
+    };
+  }
+
+  /**
+   * Schedule a callback to be executed after output would be produced for the given window
+   * if there had been input.
+   *
+   * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
+   * which the trigger for the specified window (with the specified windowing strategy) must have
+   * fired from the perspective of that {@link PValue}, as specified by the value of
+   * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
+   * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
+   * for a key in that window, the window is empty, or all elements in the window are late. The
+   * callback will be executed regardless of whether values have been produced.
+   */
+  public void scheduleAfterOutputWouldBeProduced(
+      PValue value,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    AppliedPTransform<?, ?, ?> producing = getProducing(value);
+    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+    fireAvailableCallbacks(lookupProducing(value));
+  }
+
+  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
+    if (value.getProducingTransformInternal() != null) {
+      return value.getProducingTransformInternal();
+    }
+    return lookupProducing(value);
+  }
+
+  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
+        return transform;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the options used by this {@link Pipeline}.
+   */
+  public InProcessPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+   */
+  public InProcessExecutionContext getExecutionContext(
+      AppliedPTransform<?, ?, ?> application, Object key) {
+    StepAndKey stepAndKey = StepAndKey.of(application, key);
+    return new InProcessExecutionContext(
+        options.getClock(),
+        key,
+        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
+        watermarkManager.getWatermarks(application));
+  }
+
+  /**
+   * Get all of the steps used in this {@link Pipeline}.
+   */
+  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+    return stepNames.keySet();
+  }
+
+  /**
+   * Get the Step Name for the provided application.
+   */
+  public String getStepName(AppliedPTransform<?, ?, ?> application) {
+    return stepNames.get(application);
+  }
+
+  /**
+   * Returns a {@link SideInputReader} capable of reading the provided
+   * {@link PCollectionView PCollectionViews}.
+   * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
+   *                   read
+   * @return a {@link SideInputReader} that can read all of the provided
+   *         {@link PCollectionView PCollectionViews}
+   */
+  public SideInputReader createSideInputReader(final List<PCollectionView<?>> sideInputs) {
+    return sideInputContainer.createReaderForViews(sideInputs);
+  }
+
+  /**
+   * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
+   * of all other {@link CounterSet CounterSets} created by this call.
+   *
+   * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
+   * all created {@link CounterSet CounterSets} when the transforms that call this method
+   * complete.
+   */
+  public CounterSet createCounterSet() {
+    return new CounterSet();
+  }
+
+  /**
+   * Returns all of the counters that have been merged into this context via calls to
+   * {@link CounterSet#merge(CounterSet)}.
+   */
+  public CounterSet getCounters() {
+    return mergedCounters;
+  }
+
+  /**
+   * Extracts all timers that have been fired and have not already been extracted.
+   *
+   * <p>This is a destructive operation. Timers will only appear in the result of this method once
+   * for each time they are set.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+    return watermarkManager.extractFiredTimers();
+  }
+
+  /**
+   * Returns true if all steps are done.
+   */
+  public boolean isDone() {
+    return watermarkManager.isDone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index d659d96..60c8543 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -15,10 +15,15 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import com.google.cloud.dataflow.sdk.options.Default;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 
 /**
  * Options that can be used to configure the {@link InProcessPipelineRunner}.
  */
-public interface InProcessPipelineOptions extends PipelineOptions {}
+public interface InProcessPipelineOptions extends PipelineOptions {
+  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+  Clock getClock();
 
+  void setClock(Clock clock);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 124de46..7a268ee 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -17,31 +17,22 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
 import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext;
-import com.google.cloud.dataflow.sdk.util.SideInputReader;
 import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.collect.ImmutableMap;
 
 import org.joda.time.Instant;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -83,6 +74,11 @@ public class InProcessPipelineRunner {
    */
   public static interface UncommittedBundle<T> {
     /**
+     * Returns the PCollection that the elements of this bundle belong to.
+     */
+    PCollection<T> getPCollection();
+
+    /**
      * Outputs an element to this bundle.
      *
      * @param element the element to add to this bundle
@@ -110,7 +106,7 @@ public class InProcessPipelineRunner {
   public static interface CommittedBundle<T> {
 
     /**
-     * @return the PCollection that the elements of this bundle belong to
+     * Returns the PCollection that the elements of this bundle belong to.
      */
     PCollection<T> getPCollection();
 
@@ -154,84 +150,22 @@ public class InProcessPipelineRunner {
     void add(Iterable<WindowedValue<ElemT>> values);
   }
 
-  /**
-   * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within
-   * the current evaluation.
-   */
-  public static interface InProcessEvaluationContext {
-    /**
-     * Create a {@link UncommittedBundle} for use by a source.
-     */
-    <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
-
-    /**
-     * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
-     * PCollection}.
-     */
-    <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output);
-
-    /**
-     * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
-     * {@link GroupByKeyOnly} {@link PTransform PTransforms}.
-     */
-    <T> UncommittedBundle<T> createKeyedBundle(
-        CommittedBundle<?> input, Object key, PCollection<T> output);
-
-    /**
-     * Create a bundle whose elements will be used in a PCollectionView.
-     */
-    <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
-        PCollection<Iterable<ElemT>> input, PCollectionView<ViewT> output);
-
-    /**
-     * Get the options used by this {@link Pipeline}.
-     */
-    InProcessPipelineOptions getPipelineOptions();
-
-    /**
-     * Get an {@link ExecutionContext} for the provided application.
-     */
-    InProcessExecutionContext getExecutionContext(
-        AppliedPTransform<?, ?, ?> application, @Nullable Object key);
-
-    /**
-     * Get the Step Name for the provided application.
-     */
-    String getStepName(AppliedPTransform<?, ?, ?> application);
-
-    /**
-     * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
-     *                   read
-     * @return a {@link SideInputReader} that can read all of the provided
-     *         {@link PCollectionView PCollectionViews}
-     */
-    SideInputReader createSideInputReader(List<PCollectionView<?>> sideInputs);
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+  private final InProcessPipelineOptions options;
 
-    /**
-     * Schedules a callback after the watermark for a {@link PValue} after the trigger for the
-     * specified window (with the specified windowing strategy) must have fired from the perspective
-     * of that {@link PValue}, as specified by the value of
-     * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
-     * {@link WindowingStrategy}.
-     */
-    void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window,
-        WindowingStrategy<?, ?> windowingStrategy, Runnable runnable);
+  public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
+    return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
+  }
 
-    /**
-     * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
-     * of all other {@link CounterSet CounterSets} created by this call.
-     *
-     * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
-     * all created {@link CounterSet CounterSets} when the transforms that call this method
-     * complete.
-     */
-    CounterSet createCounterSet();
+  private InProcessPipelineRunner(InProcessPipelineOptions options) {
+    this.options = options;
+  }
 
-    /**
-     * Returns all of the counters that have been merged into this context via calls to
-     * {@link CounterSet#merge(CounterSet)}.
-     */
-    CounterSet getCounters();
+  /**
+   * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
+   */
+  public InProcessPipelineOptions getPipelineOptions() {
+    return options;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
index bf9a2e1..37c9fcf 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
@@ -17,7 +17,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
@@ -26,6 +25,7 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -89,7 +89,7 @@ class InProcessSideInputContainer {
    * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
    * casting, but will change as this {@link InProcessSideInputContainer} is modified.
    */
-  public SideInputReader withViews(Collection<PCollectionView<?>> newContainedViews) {
+  public SideInputReader createReaderForViews(Collection<PCollectionView<?>> newContainedViews) {
     if (!containedViews.containsAll(newContainedViews)) {
       Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
       Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
@@ -108,8 +108,20 @@ class InProcessSideInputContainer {
    *
    * <p>The provided iterable is expected to contain only a single window and pane.
    */
-  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values)
-      throws ExecutionException {
+  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
+    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
+        indexValuesByWindow(values);
+    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
+        valuesPerWindow.entrySet()) {
+      updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
+    }
+  }
+
+  /**
+   * Index the provided values by all {@link BoundedWindow windows} in which they appear.
+   */
+  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
+      Iterable<? extends WindowedValue<?>> values) {
     Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
     for (WindowedValue<?> value : values) {
       for (BoundedWindow window : value.getWindows()) {
@@ -121,29 +133,40 @@ class InProcessSideInputContainer {
         windowValues.add(value);
       }
     }
-    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
-        valuesPerWindow.entrySet()) {
-      PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, windowValues.getKey());
-      SettableFuture<Iterable<? extends WindowedValue<?>>> future = viewByWindows.get(windowedView);
+    return valuesPerWindow;
+  }
+
+  /**
+   * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+   * specified values, if the values are part of a later pane than currently exist within the
+   * {@link PCollectionViewWindow}.
+   */
+  private void updatePCollectionViewWindowValues(
+      PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
+    PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
+    SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
+    try {
+      future = viewByWindows.get(windowedView);
       if (future.isDone()) {
-        try {
-          Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
-          PaneInfo newPane = windowValues.getValue().iterator().next().getPane();
-          // The current value may have no elements, if no elements were produced for the window,
-          // but we are recieving late data.
-          if (!existingValues.hasNext()
-              || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
-            viewByWindows.invalidate(windowedView);
-            viewByWindows.get(windowedView).set(windowValues.getValue());
-          }
-        } catch (InterruptedException e) {
-          // TODO: Handle meaningfully. This should never really happen when the result remains
-          // useful, but the result could be available and the thread can still be interrupted.
-          Thread.currentThread().interrupt();
+        Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
+        PaneInfo newPane = windowValues.iterator().next().getPane();
+        // The current value may have no elements, if no elements were produced for the window,
+        // but we are recieving late data.
+        if (!existingValues.hasNext()
+            || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
+          viewByWindows.invalidate(windowedView);
+          viewByWindows.get(windowedView).set(windowValues);
         }
       } else {
-        future.set(windowValues.getValue());
+        future.set(windowValues);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      if (future != null && !future.isDone()) {
+        future.set(Collections.<WindowedValue<?>>emptyList());
       }
+    } catch (ExecutionException e) {
+      Throwables.propagate(e.getCause());
     }
   }
 
@@ -165,7 +188,7 @@ class InProcessSideInputContainer {
             viewByWindows.get(windowedView);
 
         WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
-        evaluationContext.callAfterOutputMustHaveBeenProduced(
+        evaluationContext.scheduleAfterOutputWouldBeProduced(
             view, window, windowingStrategy, new Runnable() {
               @Override
               public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
index e3ae1a0..24142c2 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
@@ -17,7 +17,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
index cd79c21..af5914b 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
@@ -17,7 +17,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
new file mode 100644
index 0000000..1595572
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
+ * per-step in a keyed manner (e.g. State).
+ */
+final class StepAndKey {
+  private final AppliedPTransform<?, ?, ?> step;
+  private final Object key;
+
+  /**
+   * Create a new {@link StepAndKey} with the provided step and key.
+   */
+  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
+    return new StepAndKey(step, key);
+  }
+
+  private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
+    this.step = step;
+    this.key = key;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(StepAndKey.class)
+        .add("step", step.getFullName())
+        .add("key", key)
+        .toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(step, key);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    } else if (!(other instanceof StepAndKey)) {
+      return false;
+    } else {
+      StepAndKey that = (StepAndKey) other;
+      return Objects.equals(this.step, that.step)
+          && Objects.equals(this.key, that.key);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
index 3b672e0..860ddfe 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
@@ -16,7 +16,6 @@
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
new file mode 100644
index 0000000..0c8cb7e
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
+ * implementations based on the type of {@link PTransform} of the application.
+ */
+class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
+  public static TransformEvaluatorRegistry defaultRegistry() {
+    @SuppressWarnings("rawtypes")
+    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
+        ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
+            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
+            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
+            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
+            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
+            .put(
+                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
+                new GroupByKeyEvaluatorFactory())
+            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
+            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
+            .build();
+    return new TransformEvaluatorRegistry(primitives);
+  }
+
+  // the TransformEvaluatorFactories can construct instances of all generic types of transform,
+  // so all instances of a primitive can be handled with the same evaluator factory.
+  @SuppressWarnings("rawtypes")
+  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
+
+  private TransformEvaluatorRegistry(
+      @SuppressWarnings("rawtypes")
+      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
+    this.factories = factories;
+  }
+
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext)
+      throws Exception {
+    TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
+    return factory.forApplication(application, inputBundle, evaluationContext);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index 4beac33..97f0e25 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
index f47cd1d..314d81f 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
@@ -17,7 +17,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
new file mode 100644
index 0000000..27d59b9
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Ordering;
+
+import org.joda.time.Instant;
+
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Executes callbacks that occur based on the progression of the watermark per-step.
+ *
+ * <p>Callbacks are registered by calls to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)},
+ * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the
+ * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the
+ * windowing strategy would have been produced.
+ *
+ * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any
+ * {@link AppliedPTransform} - any call to
+ * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
+ * that could have potentially already fired should be followed by a call to
+ * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current
+ * value of the watermark.
+ */
+class WatermarkCallbackExecutor {
+  /**
+   * Create a new {@link WatermarkCallbackExecutor}.
+   */
+  public static WatermarkCallbackExecutor create() {
+    return new WatermarkCallbackExecutor();
+  }
+
+  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
+      callbacks;
+  private final ExecutorService executor;
+
+  private WatermarkCallbackExecutor() {
+    this.callbacks = new ConcurrentHashMap<>();
+    this.executor = Executors.newSingleThreadExecutor();
+  }
+
+  /**
+   * Execute the provided {@link Runnable} after the next call to
+   * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have
+   * produced output.
+   */
+  public void callOnGuaranteedFiring(
+      AppliedPTransform<?, ?, ?> step,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    WatermarkCallback callback =
+        WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
+
+    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
+    if (callbackQueue == null) {
+      callbackQueue = new PriorityQueue<>(11, new CallbackOrdering());
+      if (callbacks.putIfAbsent(step, callbackQueue) != null) {
+        callbackQueue = callbacks.get(step);
+      }
+    }
+
+    synchronized (callbackQueue) {
+      callbackQueue.offer(callback);
+    }
+  }
+
+  /**
+   * Schedule all pending callbacks that must have produced output by the time of the provided
+   * watermark.
+   */
+  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
+    PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step);
+    if (callbackQueue == null) {
+      return;
+    }
+    synchronized (callbackQueue) {
+      while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) {
+        executor.submit(callbackQueue.poll().getCallback());
+      }
+    }
+  }
+
+  private static class WatermarkCallback {
+    public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
+        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
+      @SuppressWarnings("unchecked")
+      Instant firingAfter =
+          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
+      return new WatermarkCallback(firingAfter, callback);
+    }
+
+    private final Instant fireAfter;
+    private final Runnable callback;
+
+    private WatermarkCallback(Instant fireAfter, Runnable callback) {
+      this.fireAfter = fireAfter;
+      this.callback = callback;
+    }
+
+    public boolean shouldFire(Instant currentWatermark) {
+      return currentWatermark.isAfter(fireAfter)
+          || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    }
+
+    public Runnable getCallback() {
+      return callback;
+    }
+  }
+
+  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
+    @Override
+    public int compare(WatermarkCallback left, WatermarkCallback right) {
+      return ComparisonChain.start()
+          .compare(left.fireAfter, right.fireAfter)
+          .compare(left.callback, right.callback, Ordering.arbitrary())
+          .result();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index 9f22fbb..4395514 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
+import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
index bf25970..0120b98 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 5c9e824..4ced82f 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
index 2425152..52398cf 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java
@@ -1047,6 +1047,18 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     assertThat(built.getCompletedTimers(), emptyIterable());
   }
 
+  @Test
+  public void timerUpdateWithCompletedTimersNotAddedToExisting() {
+    TimerUpdateBuilder builder = TimerUpdate.builder(null);
+    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+
+    TimerUpdate built = builder.build();
+    assertThat(built.getCompletedTimers(), emptyIterable());
+    assertThat(
+        built.withCompletedTimers(ImmutableList.of(timer)).getCompletedTimers(), contains(timer));
+    assertThat(built.getCompletedTimers(), emptyIterable());
+  }
+
   private static Matcher<Instant> earlierThan(final Instant laterInstant) {
     return new BaseMatcher<Instant>() {
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
new file mode 100644
index 0000000..1490960
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -0,0 +1,436 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.WithKeys;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
+import com.google.cloud.dataflow.sdk.util.SideInputReader;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.common.Counter;
+import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind;
+import com.google.cloud.dataflow.sdk.util.common.CounterSet;
+import com.google.cloud.dataflow.sdk.util.state.BagState;
+import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import com.google.cloud.dataflow.sdk.util.state.StateTag;
+import com.google.cloud.dataflow.sdk.util.state.StateTags;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link InProcessEvaluationContext}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessEvaluationContextTest {
+  private TestPipeline p;
+  private InProcessEvaluationContext context;
+  private PCollection<Integer> created;
+  private PCollection<KV<String, Integer>> downstream;
+  private PCollectionView<Iterable<Integer>> view;
+
+  @Before
+  public void setup() {
+    InProcessPipelineRunner runner =
+        InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+    p = TestPipeline.create();
+    created = p.apply(Create.of(1, 2, 3));
+    downstream = created.apply(WithKeys.<String, Integer>of("foo"));
+    view = created.apply(View.<Integer>asIterable());
+    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
+        ImmutableList.<AppliedPTransform<?, ?, ?>>of(created.getProducingTransformInternal());
+    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
+    valueToConsumers.put(
+        created,
+        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
+            downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
+    valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
+    valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
+
+    Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
+    stepNames.put(created.getProducingTransformInternal(), "s1");
+    stepNames.put(downstream.getProducingTransformInternal(), "s2");
+    stepNames.put(view.getProducingTransformInternal(), "s3");
+
+    Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
+    context = InProcessEvaluationContext.create(
+            runner.getPipelineOptions(),
+            rootTransforms,
+            valueToConsumers,
+            stepNames,
+            views);
+  }
+
+  @Test
+  public void writeToViewWriterThenReadReads() {
+    PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter =
+        context.createPCollectionViewWriter(
+            PCollection.<Iterable<Integer>>createPrimitiveOutputInternal(
+                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED),
+            view);
+    BoundedWindow window = new TestBoundedWindow(new Instant(1024L));
+    BoundedWindow second = new TestBoundedWindow(new Instant(899999L));
+    WindowedValue<Integer> firstValue =
+        WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.of(
+            2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0));
+    Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue);
+    viewWriter.add(values);
+
+    SideInputReader reader =
+        context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view));
+    assertThat(reader.get(view, window), containsInAnyOrder(1));
+    assertThat(reader.get(view, second), containsInAnyOrder(2));
+
+    WindowedValue<Integer> overrittenSecondValue =
+        WindowedValue.of(
+            4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1));
+    viewWriter.add(Collections.singleton(overrittenSecondValue));
+    assertThat(reader.get(view, second), containsInAnyOrder(4444));
+  }
+
+  @Test
+  public void getExecutionContextSameStepSameKeyState() {
+    InProcessExecutionContext fooContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+
+    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+    InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null);
+    stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
+
+    context.handleResult(
+        InProcessBundle.keyed(created, "foo").commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        StepTransformResult.withoutHold(created.getProducingTransformInternal())
+            .withState(stepContext.commitState())
+            .build());
+
+    InProcessExecutionContext secondFooContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+    assertThat(
+        secondFooContext
+            .getOrCreateStepContext("s1", "s1", null)
+            .stateInternals()
+            .state(StateNamespaces.global(), intBag)
+            .read(),
+        contains(1));
+  }
+
+
+  @Test
+  public void getExecutionContextDifferentKeysIndependentState() {
+    InProcessExecutionContext fooContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+
+    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+    fooContext
+        .getOrCreateStepContext("s1", "s1", null)
+        .stateInternals()
+        .state(StateNamespaces.global(), intBag)
+        .add(1);
+
+    InProcessExecutionContext barContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), "bar");
+    assertThat(barContext, not(equalTo(fooContext)));
+    assertThat(
+        barContext
+            .getOrCreateStepContext("s1", "s1", null)
+            .stateInternals()
+            .state(StateNamespaces.global(), intBag)
+            .read(),
+        emptyIterable());
+  }
+
+  @Test
+  public void getExecutionContextDifferentStepsIndependentState() {
+    String myKey = "foo";
+    InProcessExecutionContext fooContext =
+        context.getExecutionContext(created.getProducingTransformInternal(), myKey);
+
+    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+    fooContext
+        .getOrCreateStepContext("s1", "s1", null)
+        .stateInternals()
+        .state(StateNamespaces.global(), intBag)
+        .add(1);
+
+    InProcessExecutionContext barContext =
+        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+    assertThat(
+        barContext
+            .getOrCreateStepContext("s1", "s1", null)
+            .stateInternals()
+            .state(StateNamespaces.global(), intBag)
+            .read(),
+        emptyIterable());
+  }
+
+  @Test
+  public void handleResultMergesCounters() {
+    CounterSet counters = context.createCounterSet();
+    Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM);
+    counters.addCounter(myCounter);
+
+    myCounter.addValue(4L);
+    InProcessTransformResult result =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal())
+            .withCounters(counters)
+            .build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), result);
+    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L));
+
+    CounterSet againCounters = context.createCounterSet();
+    Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM);
+    againCounters.add(myLongCounterAgain);
+    myLongCounterAgain.addValue(8L);
+
+    InProcessTransformResult secondResult =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+            .withCounters(againCounters)
+            .build();
+    context.handleResult(
+        InProcessBundle.unkeyed(created).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        secondResult);
+    assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L));
+  }
+
+  @Test
+  public void handleResultStoresState() {
+    String myKey = "foo";
+    InProcessExecutionContext fooContext =
+        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+
+    StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
+
+    CopyOnAccessInMemoryStateInternals<Object> state =
+        fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals();
+    BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
+    bag.add(1);
+    bag.add(2);
+    bag.add(4);
+
+    InProcessTransformResult stateResult =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+            .withState(state)
+            .build();
+
+    context.handleResult(
+        InProcessBundle.keyed(created, myKey).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        stateResult);
+
+    InProcessExecutionContext afterResultContext =
+        context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+
+    CopyOnAccessInMemoryStateInternals<Object> afterResultState =
+        afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals();
+    assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4));
+  }
+
+  @Test
+  public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception {
+    final CountDownLatch callLatch = new CountDownLatch(1);
+    Runnable callback =
+        new Runnable() {
+          @Override
+          public void run() {
+            callLatch.countDown();
+          }
+        };
+
+    // Should call back after the end of the global window
+    context.scheduleAfterOutputWouldBeProduced(
+        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
+
+    InProcessTransformResult result =
+        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+            .build();
+
+    context.handleResult(null, ImmutableList.<TimerData>of(), result);
+
+    // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit
+    // will likely be flaky if this logic is broken
+    assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
+
+    InProcessTransformResult finishedResult =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
+    // Obtain the value via blocking call
+    assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
+  }
+
+  @Test
+  public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception {
+    InProcessTransformResult finishedResult =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
+
+    final CountDownLatch callLatch = new CountDownLatch(1);
+    Runnable callback =
+        new Runnable() {
+          @Override
+          public void run() {
+            callLatch.countDown();
+          }
+        };
+    context.scheduleAfterOutputWouldBeProduced(
+        downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback);
+    assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true));
+  }
+
+  @Test
+  public void extractFiredTimersExtractsTimers() {
+    InProcessTransformResult holdResult =
+        StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0))
+            .build();
+    context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
+
+    String key = "foo";
+    TimerData toFire =
+        TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
+    InProcessTransformResult timerResult =
+        StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+            .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
+            .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
+            .build();
+
+    // haven't added any timers, must be empty
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+    context.handleResult(
+        InProcessBundle.keyed(created, key).commit(Instant.now()),
+        ImmutableList.<TimerData>of(),
+        timerResult);
+
+    // timer hasn't fired
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+
+    InProcessTransformResult advanceResult =
+        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+    // Should cause the downstream timer to fire
+    context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
+
+    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers();
+    assertThat(
+        fired,
+        Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
+    Map<Object, FiredTimers> downstreamFired =
+        fired.get(downstream.getProducingTransformInternal());
+    assertThat(downstreamFired, Matchers.<Object>hasKey(key));
+
+    FiredTimers firedForKey = downstreamFired.get(key);
+    assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable());
+    assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable());
+    assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire));
+
+    // Don't reextract timers
+    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+  }
+
+  @Test
+  public void createBundleUnkeyedResultUnkeyed() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        context
+            .createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(false));
+  }
+
+  @Test
+  public void createBundleKeyedResultPropagatesKey() {
+    CommittedBundle<KV<String, Integer>> newBundle =
+        context
+            .createBundle(InProcessBundle.keyed(created, "foo").commit(Instant.now()), downstream)
+            .commit(Instant.now());
+    assertThat(newBundle.isKeyed(), is(true));
+    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  @Test
+  public void createRootBundleUnkeyed() {
+    assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false));
+  }
+
+  @Test
+  public void createKeyedBundleKeyed() {
+    CommittedBundle<KV<String, Integer>> keyedBundle =
+        context
+            .createKeyedBundle(
+                InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", downstream)
+            .commit(Instant.now());
+    assertThat(keyedBundle.isKeyed(), is(true));
+    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+  }
+
+  private static class TestBoundedWindow extends BoundedWindow {
+    private final Instant ts;
+
+    public TestBoundedWindow(Instant ts) {
+      this.ts = ts;
+    }
+
+    @Override
+    public Instant maxTimestamp() {
+      return ts;
+    }
+  }
+}