You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 02:30:43 UTC

[08/12] incubator-beam git commit: Remove InProcess Prefixes

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 4dd1475..ef31ba7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -42,7 +42,7 @@ class TransformExecutor<T> implements Runnable {
   public static <T> TransformExecutor<T> create(
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      InProcessEvaluationContext evaluationContext,
+      EvaluationContext evaluationContext,
       CommittedBundle<T> inputBundle,
       AppliedPTransform<?, ?, ?> transform,
       CompletionCallback completionCallback,
@@ -60,7 +60,7 @@ class TransformExecutor<T> implements Runnable {
   private final TransformEvaluatorFactory evaluatorFactory;
   private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
 
-  private final InProcessEvaluationContext evaluationContext;
+  private final EvaluationContext evaluationContext;
 
   /** The transform that will be evaluated. */
   private final AppliedPTransform<?, ?, ?> transform;
@@ -75,7 +75,7 @@ class TransformExecutor<T> implements Runnable {
   private TransformExecutor(
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      InProcessEvaluationContext evaluationContext,
+      EvaluationContext evaluationContext,
       CommittedBundle<T> inputBundle,
       AppliedPTransform<?, ?, ?> transform,
       CompletionCallback completionCallback,
@@ -117,7 +117,7 @@ class TransformExecutor<T> implements Runnable {
 
       processElements(evaluator, enforcements);
 
-      InProcessTransformResult result = finishBundle(evaluator, enforcements);
+      TransformResult result = finishBundle(evaluator, enforcements);
     } catch (Throwable t) {
       onComplete.handleThrowable(inputBundle, t);
       if (t instanceof RuntimeException) {
@@ -155,13 +155,13 @@ class TransformExecutor<T> implements Runnable {
    * Finishes processing the input bundle and commit the result using the
    * {@link CompletionCallback}, applying any {@link ModelEnforcement} if necessary.
    *
-   * @return the {@link InProcessTransformResult} produced by
+   * @return the {@link TransformResult} produced by
    *         {@link TransformEvaluator#finishBundle()}
    */
-  private InProcessTransformResult finishBundle(
+  private TransformResult finishBundle(
       TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
       enforcement.afterFinish(inputBundle, result, outputs.getOutputs());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
new file mode 100644
index 0000000..c1e502d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ */
+public interface TransformResult {
+  /**
+   * Returns the {@link AppliedPTransform} that produced this result.
+   */
+  AppliedPTransform<?, ?, ?> getTransform();
+
+  /**
+   * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
+   * will be committed by the evaluation context as part of completing this result.
+   */
+  Iterable<? extends UncommittedBundle<?>> getOutputBundles();
+
+  /**
+   * Returns elements that were provided to the {@link TransformEvaluator} as input but were not
+   * processed.
+   */
+  Iterable<? extends WindowedValue<?>> getUnprocessedElements();
+
+  /**
+   * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
+   * not use a {@link CounterSet}.
+   */
+  @Nullable CounterSet getCounters();
+
+  /**
+   * Returns the Watermark Hold for the transform at the time this result was produced.
+   *
+   * If the transform does not set any watermark hold, returns
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+   */
+  Instant getWatermarkHold();
+
+  /**
+   * Returns the State used by the transform.
+   *
+   * If this evaluation did not access state, this may return null.
+   */
+  @Nullable
+  CopyOnAccessInMemoryStateInternals<?> getState();
+
+  /**
+   * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the
+   * evaluation was triggered due to the delivery of one or more timers, those timers must be added
+   * to the builder before it is complete.
+   *
+   * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
+   */
+  TimerUpdate getTimerUpdate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 5030730..3fb773e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -69,13 +69,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   @Override
   @Nullable
   public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
+      @Nullable CommittedBundle<?> inputBundle, EvaluationContext evaluationContext) {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
+      final EvaluationContext evaluationContext) {
     return getTransformEvaluatorQueue(transform, evaluationContext).poll();
   }
 
@@ -90,7 +90,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   private <OutputT, CheckpointMarkT extends CheckpointMark>
   Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
       final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
+      final EvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@@ -134,7 +134,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
 
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
+    private final EvaluationContext evaluationContext;
     private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>
         evaluatorQueue;
     /**
@@ -153,7 +153,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
 
     public UnboundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
+        EvaluationContext evaluationContext,
         UnboundedSource<OutputT, CheckpointMarkT> source,
         ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) {
       this.transform = transform;
@@ -168,7 +168,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     public void processElement(WindowedValue<Object> element) {}
 
     @Override
-    public InProcessTransformResult finishBundle() throws IOException {
+    public TransformResult finishBundle() throws IOException {
       UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
       try {
         boolean elementAvailable = startReader();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
new file mode 100644
index 0000000..570dc90
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UncommittedBundleOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
+ * {@link DirectRunner}.
+ */
+public class UncommittedBundleOutputManager implements OutputManager {
+  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+
+  public static UncommittedBundleOutputManager create(
+      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+    return new UncommittedBundleOutputManager(outputBundles);
+  }
+
+  public UncommittedBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+    this.bundles = bundles;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    @SuppressWarnings("rawtypes")
+    UncommittedBundle bundle = bundles.get(tag);
+    bundle.add(output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index f4260f5..47eaae7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -41,7 +41,7 @@ import java.util.List;
  *
  * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
  * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
+ * {@link DirectCreatePCollectionView} composite transform. This transform is an override for the
  * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
  * written.
  */
@@ -50,7 +50,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application,
       DirectRunner.CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator = createEvaluator(
             (AppliedPTransform) application, evaluationContext);
@@ -60,7 +60,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
       final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
           application,
-      InProcessEvaluationContext context) {
+      EvaluationContext context) {
     PCollection<Iterable<InT>> input = application.getInput();
     final PCollectionViewWriter<InT, OuT> writer =
         context.createPCollectionViewWriter(input, application.getOutput());
@@ -75,14 +75,14 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       }
 
       @Override
-      public InProcessTransformResult finishBundle() {
+      public TransformResult finishBundle() {
         writer.add(elements);
         return StepTransformResult.withoutHold(application).build();
       }
     };
   }
 
-  public static class InProcessViewOverrideFactory implements PTransformOverrideFactory {
+  public static class ViewOverrideFactory implements PTransformOverrideFactory {
     @Override
     public <InputT extends PInput, OutputT extends POutput>
         PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
@@ -92,7 +92,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       @SuppressWarnings({"rawtypes", "unchecked"})
       PTransform<InputT, OutputT> createView =
           (PTransform<InputT, OutputT>)
-              new InProcessCreatePCollectionView<>((CreatePCollectionView) transform);
+              new DirectCreatePCollectionView<>((CreatePCollectionView) transform);
       return createView;
     }
   }
@@ -100,11 +100,11 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   /**
    * An in-process override for {@link CreatePCollectionView}.
    */
-  private static class InProcessCreatePCollectionView<ElemT, ViewT>
+  private static class DirectCreatePCollectionView<ElemT, ViewT>
       extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
-    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
+    private DirectCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
       this.og = og;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
new file mode 100644
index 0000000..b8f9987
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -0,0 +1,1420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
+
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of
+ * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for
+ * in-memory execution. {@link WatermarkManager} is designed to update and return a
+ * consistent view of watermarks in the presence of concurrent updates.
+ *
+ * <p>An {@link WatermarkManager} is provided with the collection of root
+ * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
+ * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
+ *
+ * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the
+ * {@link WatermarkManager} is provided with the produced elements and the output watermark
+ * of the producing {@link AppliedPTransform transform}. The
+ * {@link WatermarkManager watermark manager} is responsible for computing the watermarks
+ * of all {@link AppliedPTransform transforms} that consume one or more
+ * {@link PCollection PCollections}.
+ *
+ * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight
+ * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs
+ * atomically:
+ * <ul>
+ *  <li>All of the in-flight elements are removed from the collection of pending elements for the
+ *      {@link AppliedPTransform}.</li>
+ *  <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection
+ *      of pending elements for each {@link AppliedPTransform} that consumes them.</li>
+ *  <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of
+ *    <ul>
+ *      <li>the previous input watermark</li>
+ *      <li>the minimum of
+ *        <ul>
+ *          <li>the timestamps of all currently pending elements</li>
+ *          <li>all input {@link PCollection} watermarks</li>
+ *        </ul>
+ *      </li>
+ *    </ul>
+ *  </li>
+ *  <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of
+ *    <ul>
+ *      <li>the previous output watermark</li>
+ *      <li>the minimum of
+ *        <ul>
+ *          <li>the current input watermark</li>
+ *          <li>the current watermark holds</li>
+ *        </ul>
+ *      </li>
+ *    </ul>
+ *  </li>
+ *  <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of
+ *      the {@link AppliedPTransform}</li>
+ *  <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be
+ *      advanced.</li>
+ * </ul>
+ *
+ * <p>The watermark of a {@link PCollection} is equal to the output watermark of the
+ * {@link AppliedPTransform} that produces it.
+ *
+ * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre>
+ * Watermark_In'  = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
+ * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
+ * Watermark_PCollection = Watermark_Out_ProducingPTransform
+ * </pre>
+ */
+public class WatermarkManager {
+  /**
+   * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
+   * {@link PCollection}.
+   *
+   * <p>A watermark is a monotonically increasing value, which represents the point up to which the
+   * system believes it has received all of the data. Data that arrives with a timestamp that is
+   * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
+   * timestamp which indicates we have received all of the data and there will be no more on-time or
+   * late data. This value is represented by {@link WatermarkManager#THE_END_OF_TIME}.
+   */
+  private static interface Watermark {
+    /**
+     * Returns the current value of this watermark.
+     */
+    Instant get();
+
+    /**
+     * Refreshes the value of this watermark from its input watermarks and watermark holds.
+     *
+     * @return true if the value of the watermark has changed (and thus dependent watermark must
+     *         also be updated
+     */
+    WatermarkUpdate refresh();
+  }
+
+  /**
+   * The result of computing a {@link Watermark}.
+   */
+  private static enum WatermarkUpdate {
+    /** The watermark is later than the value at the previous time it was computed. */
+    ADVANCED(true),
+    /** The watermark is equal to the value at the previous time it was computed. */
+    NO_CHANGE(false);
+
+    private final boolean advanced;
+
+    private WatermarkUpdate(boolean advanced) {
+      this.advanced = advanced;
+    }
+
+    public boolean isAdvanced() {
+      return advanced;
+    }
+
+    /**
+     * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates.
+     *
+     * If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result
+     * {@link WatermarkUpdate} has been advanced.
+     */
+    public WatermarkUpdate union(WatermarkUpdate that) {
+      if (this.advanced) {
+        return this;
+      }
+      return that;
+    }
+
+    /**
+     * Returns the {@link WatermarkUpdate} based on the former and current
+     * {@link Instant timestamps}.
+     */
+    public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
+      if (currentTime.isAfter(oldTime)) {
+        return ADVANCED;
+      }
+      return NO_CHANGE;
+    }
+  }
+
+  /**
+   * The input {@link Watermark} of an {@link AppliedPTransform}.
+   *
+   * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the
+   * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum
+   * timestamp of all of the pending elements, restricted to be monotonically increasing.
+   *
+   * <p>See {@link #refresh()} for more information.
+   */
+  private static class AppliedPTransformInputWatermark implements Watermark {
+    private final Collection<? extends Watermark> inputWatermarks;
+    private final SortedMultiset<WindowedValue<?>> pendingElements;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
+    private AtomicReference<Instant> currentWatermark;
+
+    public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
+      this.inputWatermarks = inputWatermarks;
+      this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator());
+      this.objectTimers = new HashMap<>();
+      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant get() {
+      return currentWatermark.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes
+     * equal to the maximum value of
+     * <ul>
+     *   <li>the previous input watermark</li>
+     *   <li>the minimum of
+     *     <ul>
+     *       <li>the timestamps of all currently pending elements</li>
+     *       <li>all input {@link PCollection} watermarks</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldWatermark = currentWatermark.get();
+      Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      for (Watermark inputWatermark : inputWatermarks) {
+        minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
+      }
+      if (!pendingElements.isEmpty()) {
+        minInputWatermark = INSTANT_ORDERING.min(
+            minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
+      }
+      Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
+      currentWatermark.set(newWatermark);
+      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
+    }
+
+    private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
+      for (WindowedValue<?> pendingElement : newPending) {
+        pendingElements.add(pendingElement);
+      }
+    }
+
+    private synchronized void removePendingElements(
+        Iterable<? extends WindowedValue<?>> finishedElements) {
+      for (WindowedValue<?> finishedElement : finishedElements) {
+        pendingElements.remove(finishedElement);
+      }
+    }
+
+    private synchronized void updateTimers(TimerUpdate update) {
+      NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
+      if (keyTimers == null) {
+        keyTimers = new TreeSet<>();
+        objectTimers.put(update.key, keyTimers);
+      }
+      for (TimerData timer : update.setTimers) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          keyTimers.add(timer);
+        }
+      }
+      for (TimerData timer : update.deletedTimers) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          keyTimers.remove(timer);
+        }
+      }
+      // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
+    }
+
+    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
+      return extractFiredTimers(currentWatermark.get(), objectTimers);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
+          .add("pendingElements", pendingElements)
+          .add("currentWatermark", currentWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * The output {@link Watermark} of an {@link AppliedPTransform}.
+   *
+   * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the
+   * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same
+   * {@link AppliedPTransform}, restricted to be monotonically increasing. See
+   * {@link #refresh()} for more information.
+   */
+  private static class AppliedPTransformOutputWatermark implements Watermark {
+    private final Watermark inputWatermark;
+    private final PerKeyHolds holds;
+    private AtomicReference<Instant> currentWatermark;
+
+    public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
+      this.inputWatermark = inputWatermark;
+      holds = new PerKeyHolds();
+      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    public synchronized void updateHold(Object key, Instant newHold) {
+      if (newHold == null) {
+        holds.removeHold(key);
+      } else {
+        holds.updateHold(key, newHold);
+      }
+    }
+
+    @Override
+    public Instant get() {
+      return currentWatermark.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes
+     * equal to the maximum value of:
+     * <ul>
+     *   <li>the previous output watermark</li>
+     *   <li>the minimum of
+     *     <ul>
+     *       <li>the current input watermark</li>
+     *       <li>the current watermark holds</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldWatermark = currentWatermark.get();
+      Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
+      newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
+      currentWatermark.set(newWatermark);
+      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
+          .add("holds", holds)
+          .add("currentWatermark", currentWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
+   * {@link AppliedPTransform}.
+   *
+   * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeInputWatermark} is equal
+   * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream
+   * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input
+   * synchronized processing time at any step is equal to the maximum of:
+   * <ul>
+   *   <li>The most recently returned synchronized processing input time
+   *   <li>The minimum of
+   *     <ul>
+   *       <li>The current processing time
+   *       <li>The current synchronized processing time input hold
+   *     </ul>
+   * </ul>
+   */
+  private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
+    private final Collection<? extends Watermark> inputWms;
+    private final Collection<CommittedBundle<?>> pendingBundles;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
+
+    private final PriorityQueue<TimerData> pendingTimers;
+
+    private AtomicReference<Instant> earliestHold;
+
+    public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
+      this.inputWms = inputWms;
+      this.pendingBundles = new HashSet<>();
+      this.processingTimers = new HashMap<>();
+      this.synchronizedProcessingTimers = new HashMap<>();
+      this.pendingTimers = new PriorityQueue<>();
+      Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      for (Watermark wm : inputWms) {
+        initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
+      }
+      earliestHold = new AtomicReference<>(initialHold);
+    }
+
+    @Override
+    public Instant get() {
+      return earliestHold.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark}
+     * becomes equal to the minimum value of
+     * <ul>
+     *   <li>the timestamps of all currently pending bundles</li>
+     *   <li>all input {@link PCollection} synchronized processing time watermarks</li>
+     * </ul>
+     *
+     * <p>Note that this value is not monotonic, but the returned value for the synchronized
+     * processing time must be.
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      Instant oldHold = earliestHold.get();
+      Instant minTime = THE_END_OF_TIME.get();
+      for (Watermark input : inputWms) {
+        minTime = INSTANT_ORDERING.min(minTime, input.get());
+      }
+      for (CommittedBundle<?> bundle : pendingBundles) {
+        // TODO: Track elements in the bundle by the processing time they were output instead of
+        // entire bundles. Requried to support arbitrarily splitting and merging bundles between
+        // steps
+        minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
+      }
+      earliestHold.set(minTime);
+      return WatermarkUpdate.fromTimestamps(oldHold, minTime);
+    }
+
+    public synchronized void addPending(CommittedBundle<?> bundle) {
+      pendingBundles.add(bundle);
+    }
+
+    public synchronized void removePending(CommittedBundle<?> bundle) {
+      pendingBundles.remove(bundle);
+    }
+
+    /**
+     * Return the earliest timestamp of the earliest timer that has not been completed. This is
+     * either the earliest timestamp across timers that have not been completed, or the earliest
+     * timestamp across timers that have been delivered but have not been completed.
+     */
+    public synchronized Instant getEarliestTimerTimestamp() {
+      Instant earliest = THE_END_OF_TIME.get();
+      for (NavigableSet<TimerData> timers : processingTimers.values()) {
+        if (!timers.isEmpty()) {
+          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+        }
+      }
+      for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
+        if (!timers.isEmpty()) {
+          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
+        }
+      }
+      if (!pendingTimers.isEmpty()) {
+        earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
+      }
+      return earliest;
+    }
+
+    private synchronized void updateTimers(TimerUpdate update) {
+      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
+      for (TimerData addedTimer : update.setTimers) {
+        NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
+        if (timerQueue != null) {
+          timerQueue.add(addedTimer);
+        }
+      }
+
+      for (TimerData completedTimer : update.completedTimers) {
+        pendingTimers.remove(completedTimer);
+      }
+      for (TimerData deletedTimer : update.deletedTimers) {
+        NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
+        if (timerQueue != null) {
+          timerQueue.remove(deletedTimer);
+        }
+      }
+    }
+
+    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
+        TimeDomain domain, Instant firingTime) {
+      Map<StructuralKey<?>, List<TimerData>> firedTimers;
+      switch (domain) {
+        case PROCESSING_TIME:
+          firedTimers = extractFiredTimers(firingTime, processingTimers);
+          break;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          firedTimers =
+              extractFiredTimers(
+                  INSTANT_ORDERING.min(firingTime, earliestHold.get()),
+                  synchronizedProcessingTimers);
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Called getFiredTimers on a Synchronized Processing Time watermark"
+                  + " and gave a non-processing time domain "
+                  + domain);
+      }
+      for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
+          firedTimers.entrySet()) {
+        pendingTimers.addAll(firedTimer.getValue());
+      }
+      return firedTimers;
+    }
+
+    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
+      NavigableSet<TimerData> processingQueue = processingTimers.get(key);
+      if (processingQueue == null) {
+        processingQueue = new TreeSet<>();
+        processingTimers.put(key, processingQueue);
+      }
+      NavigableSet<TimerData> synchronizedProcessingQueue =
+          synchronizedProcessingTimers.get(key);
+      if (synchronizedProcessingQueue == null) {
+        synchronizedProcessingQueue = new TreeSet<>();
+        synchronizedProcessingTimers.put(key, synchronizedProcessingQueue);
+      }
+      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
+      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
+      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
+      return result;
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
+          .add("earliestHold", earliestHold)
+          .toString();
+    }
+  }
+
+  /**
+   * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
+   * {@link AppliedPTransform}.
+   *
+   * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeOutputWatermark} is
+   * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all
+   * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the output
+   * synchronized processing time at any step is equal to the maximum of:
+   * <ul>
+   *   <li>The most recently returned synchronized processing output time
+   *   <li>The minimum of
+   *     <ul>
+   *       <li>The current processing time
+   *       <li>The current synchronized processing time output hold
+   *     </ul>
+   * </ul>
+   */
+  private static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
+    private final SynchronizedProcessingTimeInputWatermark inputWm;
+    private AtomicReference<Instant> latestRefresh;
+
+    public SynchronizedProcessingTimeOutputWatermark(
+        SynchronizedProcessingTimeInputWatermark inputWm) {
+      this.inputWm = inputWm;
+      this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
+    }
+
+    @Override
+    public Instant get() {
+      return latestRefresh.get();
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark}
+     * becomes equal to the minimum value of:
+     * <ul>
+     *   <li>the current input watermark.
+     *   <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input
+     *       watermark.
+     *   <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark.
+     * </ul>
+     *
+     * <p>Note that this value is not monotonic, but the returned value for the synchronized
+     * processing time must be.
+     */
+    @Override
+    public synchronized WatermarkUpdate refresh() {
+      // Hold the output synchronized processing time to the input watermark, which takes into
+      // account buffered bundles, and the earliest pending timer, which determines what to hold
+      // downstream timers to.
+      Instant oldRefresh = latestRefresh.get();
+      Instant newTimestamp =
+          INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
+      latestRefresh.set(newTimestamp);
+      return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
+          .add("latestRefresh", latestRefresh)
+          .toString();
+    }
+  }
+
+  /**
+   * The {@code Watermark} that is after the latest time it is possible to represent in the global
+   * window. This is a distinguished value representing a complete {@link PTransform}.
+   */
+  private static final Watermark THE_END_OF_TIME = new Watermark() {
+        @Override
+        public WatermarkUpdate refresh() {
+          // THE_END_OF_TIME is a distinguished value that cannot be advanced.
+          return WatermarkUpdate.NO_CHANGE;
+        }
+
+        @Override
+        public Instant get() {
+          return BoundedWindow.TIMESTAMP_MAX_VALUE;
+        }
+      };
+
+  private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
+
+  /**
+   * A function that takes a WindowedValue and returns the exploded representation of that
+   * {@link WindowedValue}.
+   */
+  private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>>
+      EXPLODE_WINDOWS_FN =
+          new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>() {
+            @Override
+            public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) {
+              return input.explodeWindows();
+            }
+          };
+
+  /**
+   * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
+   * latestTime argument and put in in the result with the same key, then remove all of the keys
+   * which have no more pending timers.
+   *
+   * The result collection retains ordering of timers (from earliest to latest).
+   */
+  private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
+      Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
+    Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
+    Set<StructuralKey<?>> emptyKeys = new HashSet<>();
+    for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers :
+        objectTimers.entrySet()) {
+      NavigableSet<TimerData> timers = pendingTimers.getValue();
+      if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
+        ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
+        result.put(pendingTimers.getKey(), keyFiredTimers);
+        while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
+          keyFiredTimers.add(timers.first());
+          timers.remove(timers.first());
+        }
+      }
+      if (timers.isEmpty()) {
+        emptyKeys.add(pendingTimers.getKey());
+      }
+    }
+    objectTimers.keySet().removeAll(emptyKeys);
+    return result;
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain.
+   */
+  private final Clock clock;
+
+  /**
+   * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications}
+   * that consume that {@link PCollection}.
+   */
+  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
+
+  /**
+   * The input and output watermark of each {@link AppliedPTransform}.
+   */
+  private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
+
+  /**
+   * A queue of pending updates to the state of this {@link WatermarkManager}.
+   */
+  private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
+
+  /**
+   * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
+   * stale data.
+   */
+  private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
+
+  /**
+   * Creates a new {@link WatermarkManager}. All watermarks within the newly created
+   * {@link WatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
+   * minimum watermark, with no watermark holds or pending elements.
+   *
+   * @param rootTransforms the root-level transforms of the {@link Pipeline}
+   * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the
+   *                  transforms that consume it as a part of their input
+   */
+  public static WatermarkManager create(
+      Clock clock,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
+    return new WatermarkManager(clock, rootTransforms, consumers);
+  }
+
+  private WatermarkManager(
+      Clock clock,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
+    this.clock = clock;
+    this.consumers = consumers;
+    this.pendingUpdates = new ConcurrentLinkedQueue<>();
+    this.pendingRefreshes = new ConcurrentLinkedQueue<>();
+
+    transformToWatermarks = new HashMap<>();
+
+    for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
+      getTransformWatermark(rootTransform);
+    }
+    for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) {
+      for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
+        getTransformWatermark(transform);
+      }
+    }
+  }
+
+  private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
+    TransformWatermarks wms = transformToWatermarks.get(transform);
+    if (wms == null) {
+      List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform);
+      AppliedPTransformInputWatermark inputWatermark =
+          new AppliedPTransformInputWatermark(inputCollectionWatermarks);
+      AppliedPTransformOutputWatermark outputWatermark =
+          new AppliedPTransformOutputWatermark(inputWatermark);
+
+      SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
+          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
+      SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
+          new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
+
+      wms =
+          new TransformWatermarks(
+              inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark);
+      transformToWatermarks.put(transform, wms);
+    }
+    return wms;
+  }
+
+  private Collection<Watermark> getInputProcessingWatermarks(
+      AppliedPTransform<?, ?, ?> transform) {
+    ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
+    Collection<? extends PValue> inputs = transform.getInput().expand();
+    if (inputs.isEmpty()) {
+      inputWmsBuilder.add(THE_END_OF_TIME);
+    }
+    for (PValue pvalue : inputs) {
+      Watermark producerOutputWatermark =
+          getTransformWatermark(pvalue.getProducingTransformInternal())
+              .synchronizedProcessingOutputWatermark;
+      inputWmsBuilder.add(producerOutputWatermark);
+    }
+    return inputWmsBuilder.build();
+  }
+
+  private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
+    ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
+    Collection<? extends PValue> inputs = transform.getInput().expand();
+    if (inputs.isEmpty()) {
+      inputWatermarksBuilder.add(THE_END_OF_TIME);
+    }
+    for (PValue pvalue : inputs) {
+      Watermark producerOutputWatermark =
+          getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark;
+      inputWatermarksBuilder.add(producerOutputWatermark);
+    }
+    List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
+    return inputCollectionWatermarks;
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Gets the input and output watermarks for an {@link AppliedPTransform}. If the
+   * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   *
+   * @return a snapshot of the input watermark and output watermark for the provided transform
+   */
+  public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
+    return transformToWatermarks.get(transform);
+  }
+
+  /**
+   * Updates the watermarks of a transform with one or more inputs.
+   *
+   * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can,
+   * at any time, be updated to equal:
+   * <pre>
+   * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
+   * </pre>
+   * and the output watermark, which can, at any time, be updated to equal:
+   * <pre>
+   * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
+   * </pre>.
+   *
+   * @param completed the input that has completed
+   * @param timerUpdate the timers that were added, removed, and completed as part of producing
+   *                    this update
+   * @param result the result that was produced by processing the input
+   * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
+   *                     is no hold
+   */
+  public void updateWatermarks(
+      @Nullable CommittedBundle<?> completed,
+      TimerUpdate timerUpdate,
+      CommittedResult result,
+      Instant earliestHold) {
+    pendingUpdates.offer(PendingWatermarkUpdate.create(completed,
+        timerUpdate,
+        result,
+        earliestHold));
+  }
+
+  /**
+   * Applies all pending updates to this {@link WatermarkManager}, causing the pending state
+   * of all {@link TransformWatermarks} to be advanced as far as possible.
+   */
+  private void applyPendingUpdates() {
+    Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
+    PendingWatermarkUpdate pending = pendingUpdates.poll();
+    while (pending != null) {
+      applyPendingUpdate(pending);
+      updatedTransforms.add(pending.getTransform());
+      pending = pendingUpdates.poll();
+    }
+    pendingRefreshes.addAll(updatedTransforms);
+  }
+
+  private void applyPendingUpdate(PendingWatermarkUpdate pending) {
+    CommittedResult result = pending.getResult();
+    AppliedPTransform transform = result.getTransform();
+    CommittedBundle<?> inputBundle = pending.getInputBundle();
+
+    updatePending(inputBundle, pending.getTimerUpdate(), result);
+
+    TransformWatermarks transformWms = transformToWatermarks.get(transform);
+    transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(),
+        pending.getEarliestHold());
+  }
+
+  /**
+   * First adds all produced elements to the queue of pending elements for each consumer, then adds
+   * all pending timers to the collection of pending timers, then removes all completed and deleted
+   * timers from the collection of pending timers, then removes all completed elements from the
+   * pending queue of the transform.
+   *
+   * <p>It is required that all newly pending elements are added to the queue of pending elements
+   * for each consumer prior to the completed elements being removed, as doing otherwise could cause
+   * a Watermark to appear in a state in which the upstream (completed) element does not hold the
+   * watermark but the element it produced is not yet pending. This can cause the watermark to
+   * erroneously advance.
+   */
+  private void updatePending(
+      CommittedBundle<?> input,
+      TimerUpdate timerUpdate,
+      CommittedResult result) {
+    // Newly pending elements must be added before completed elements are removed, as the two
+    // do not share a Mutex within this call and thus can be interleaved with external calls to
+    // refresh.
+    for (CommittedBundle<?> bundle : result.getOutputs()) {
+      for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
+        TransformWatermarks watermarks = transformToWatermarks.get(consumer);
+        watermarks.addPending(bundle);
+      }
+    }
+
+    TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
+    if (input != null) {
+      // Add the unprocessed inputs
+      completedTransform.addPending(result.getUnprocessedInputs());
+    }
+    completedTransform.updateTimers(timerUpdate);
+    if (input != null) {
+      completedTransform.removePending(input);
+    }
+  }
+
+  /**
+   * Refresh the watermarks contained within this {@link WatermarkManager}, causing all
+   * watermarks to be advanced as far as possible.
+   */
+  synchronized void refreshAll() {
+    applyPendingUpdates();
+    while (!pendingRefreshes.isEmpty()) {
+      refreshWatermarks(pendingRefreshes.poll());
+    }
+  }
+
+  private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
+    TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
+    WatermarkUpdate updateResult = myWatermarks.refresh();
+    Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
+    if (updateResult.isAdvanced()) {
+      for (PValue outputPValue : toRefresh.getOutput().expand()) {
+        additionalRefreshes.addAll(consumers.get(outputPValue));
+      }
+    }
+    pendingRefreshes.addAll(additionalRefreshes);
+  }
+
+  /**
+   * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
+   * pending timers will be removed from this {@link WatermarkManager}.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>();
+    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
+        transformToWatermarks.entrySet()) {
+      Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
+          watermarksEntry.getValue().extractFiredTimers();
+      if (!keyFiredTimers.isEmpty()) {
+        allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
+      }
+    }
+    return allTimers;
+  }
+
+  /**
+   * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
+   * and as such the watermark manager must track holds and the release of holds on a per-key basis.
+   *
+   * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals,
+   * as the key is arbitrarily ordered via identity, rather than object equality.
+   */
+  private static final class KeyedHold implements Comparable<KeyedHold> {
+    private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
+
+    private final Object key;
+    private final Instant timestamp;
+
+    /**
+     * Create a new KeyedHold with the specified key and timestamp.
+     */
+    public static KeyedHold of(Object key, Instant timestamp) {
+      return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get()));
+    }
+
+    private KeyedHold(Object key, Instant timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public int compareTo(KeyedHold that) {
+      return ComparisonChain.start()
+          .compare(this.timestamp, that.timestamp)
+          .compare(this.key, that.key, KEY_ORDERING)
+          .result();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp, key);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof KeyedHold)) {
+        return false;
+      }
+      KeyedHold that = (KeyedHold) other;
+      return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key);
+    }
+
+    /**
+     * Get the value of this {@link KeyedHold}.
+     */
+    public Instant getTimestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(KeyedHold.class)
+          .add("key", key)
+          .add("hold", timestamp)
+          .toString();
+    }
+  }
+
+  private static class PerKeyHolds {
+    private final Map<Object, KeyedHold> keyedHolds;
+    private final PriorityQueue<KeyedHold> allHolds;
+
+    private PerKeyHolds() {
+      this.keyedHolds = new HashMap<>();
+      this.allHolds = new PriorityQueue<>();
+    }
+
+    /**
+     * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
+     * there are no holds within this {@link PerKeyHolds}.
+     */
+    public Instant getMinHold() {
+      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
+    }
+
+    /**
+     * Updates the hold of the provided key to the provided value, removing any other holds for
+     * the same key.
+     */
+    public void updateHold(@Nullable Object key, Instant newHold) {
+      removeHold(key);
+      KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
+      keyedHolds.put(key, newKeyedHold);
+      allHolds.offer(newKeyedHold);
+    }
+
+    /**
+     * Removes the hold of the provided key.
+     */
+    public void removeHold(Object key) {
+      KeyedHold oldHold = keyedHolds.get(key);
+      if (oldHold != null) {
+        allHolds.remove(oldHold);
+      }
+    }
+  }
+
+  /**
+   * A reference to the input and output watermarks of an {@link AppliedPTransform}.
+   */
+  public class TransformWatermarks {
+    private final AppliedPTransformInputWatermark inputWatermark;
+    private final AppliedPTransformOutputWatermark outputWatermark;
+
+    private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
+    private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
+
+    private Instant latestSynchronizedInputWm;
+    private Instant latestSynchronizedOutputWm;
+
+    private TransformWatermarks(
+        AppliedPTransformInputWatermark inputWatermark,
+        AppliedPTransformOutputWatermark outputWatermark,
+        SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
+        SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
+      this.inputWatermark = inputWatermark;
+      this.outputWatermark = outputWatermark;
+
+      this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
+      this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
+      this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * Returns the input watermark of the {@link AppliedPTransform}.
+     */
+    public Instant getInputWatermark() {
+      return Preconditions.checkNotNull(inputWatermark.get());
+    }
+
+    /**
+     * Returns the output watermark of the {@link AppliedPTransform}.
+     */
+    public Instant getOutputWatermark() {
+      return outputWatermark.get();
+    }
+
+    /**
+     * Returns the synchronized processing input time of the {@link AppliedPTransform}.
+     *
+     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
+     * presence of holds, will increase as the system time progresses.
+     */
+    public synchronized Instant getSynchronizedProcessingInputTime() {
+      latestSynchronizedInputWm = INSTANT_ORDERING.max(
+          latestSynchronizedInputWm,
+          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get()));
+      return latestSynchronizedInputWm;
+    }
+
+    /**
+     * Returns the synchronized processing output time of the {@link AppliedPTransform}.
+     *
+     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
+     * presence of holds, will increase as the system time progresses.
+     */
+    public synchronized Instant getSynchronizedProcessingOutputTime() {
+      latestSynchronizedOutputWm = INSTANT_ORDERING.max(
+          latestSynchronizedOutputWm,
+          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get()));
+      return latestSynchronizedOutputWm;
+    }
+
+    private WatermarkUpdate refresh() {
+      inputWatermark.refresh();
+      synchronizedProcessingInputWatermark.refresh();
+      WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
+      WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh();
+      return eventOutputUpdate.union(syncOutputUpdate);
+    }
+
+    private void setEventTimeHold(Object key, Instant newHold) {
+      outputWatermark.updateHold(key, newHold);
+    }
+
+    private void removePending(CommittedBundle<?> bundle) {
+      inputWatermark.removePendingElements(elementsFromBundle(bundle));
+      synchronizedProcessingInputWatermark.removePending(bundle);
+    }
+
+    private void addPending(CommittedBundle<?> bundle) {
+      inputWatermark.addPendingElements(elementsFromBundle(bundle));
+      synchronizedProcessingInputWatermark.addPending(bundle);
+    }
+
+    private Iterable<? extends WindowedValue<?>> elementsFromBundle(CommittedBundle<?> bundle) {
+      return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
+    }
+
+    private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
+      Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
+          inputWatermark.extractFiredEventTimeTimers();
+      Map<StructuralKey<?>, List<TimerData>> processingTimers;
+      Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
+      if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
+        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
+      } else {
+        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.PROCESSING_TIME, clock.now());
+        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
+      }
+      Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
+      groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
+
+      Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
+      for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers :
+          groupedTimers.entrySet()) {
+        keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
+      }
+      return keyFiredTimers;
+    }
+
+    @SafeVarargs
+    private final void groupFiredTimers(
+        Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate,
+        Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
+      for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
+        for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
+          Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
+          if (grouped == null) {
+            grouped = new HashMap<>();
+            groupedToMutate.put(newTimers.getKey(), grouped);
+          }
+          grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue());
+        }
+      }
+    }
+
+    private void updateTimers(TimerUpdate update) {
+      inputWatermark.updateTimers(update);
+      synchronizedProcessingInputWatermark.updateTimers(update);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(TransformWatermarks.class)
+          .add("inputWatermark", inputWatermark)
+          .add("outputWatermark", outputWatermark)
+          .add("inputProcessingTime", synchronizedProcessingInputWatermark)
+          .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
+          .toString();
+    }
+  }
+
+  /**
+   * A collection of newly set, deleted, and completed timers.
+   *
+   * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the
+   * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as
+   * the input to the executed step.
+   */
+  public static class TimerUpdate {
+    private final StructuralKey<?> key;
+    private final Iterable<? extends TimerData> completedTimers;
+
+    private final Iterable<? extends TimerData> setTimers;
+    private final Iterable<? extends TimerData> deletedTimers;
+
+    /**
+     * Returns a TimerUpdate for a null key with no timers.
+     */
+    public static TimerUpdate empty() {
+      return new TimerUpdate(
+          null,
+          Collections.<TimerData>emptyList(),
+          Collections.<TimerData>emptyList(),
+          Collections.<TimerData>emptyList());
+    }
+
+    /**
+     * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the
+     * set and deleted timers to be added to it.
+     */
+    public static TimerUpdateBuilder builder(StructuralKey<?> key) {
+      return new TimerUpdateBuilder(key);
+    }
+
+    /**
+     * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
+     */
+    public static final class TimerUpdateBuilder {
+      private final StructuralKey<?> key;
+      private final Collection<TimerData> completedTimers;
+      private final Collection<TimerData> setTimers;
+      private final Collection<TimerData> deletedTimers;
+
+      private TimerUpdateBuilder(StructuralKey<?> key) {
+        this.key = key;
+        this.completedTimers = new HashSet<>();
+        this.setTimers = new HashSet<>();
+        this.deletedTimers = new HashSet<>();
+      }
+
+      /**
+       * Adds all of the provided timers to the collection of completed timers, and returns this
+       * {@link TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) {
+        Iterables.addAll(this.completedTimers, completedTimers);
+        return this;
+      }
+
+      /**
+       * Adds the provided timer to the collection of set timers, removing it from deleted timers if
+       * it has previously been deleted. Returns this {@link TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder setTimer(TimerData setTimer) {
+        deletedTimers.remove(setTimer);
+        setTimers.add(setTimer);
+        return this;
+      }
+
+      /**
+       * Adds the provided timer to the collection of deleted timers, removing it from set timers if
+       * it has previously been set. Returns this {@link TimerUpdateBuilder}.
+       */
+      public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
+        deletedTimers.add(deletedTimer);
+        setTimers.remove(deletedTimer);
+        return this;
+      }
+
+      /**
+       * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers,
+       * and deletedTimers.
+       */
+      public TimerUpdate build() {
+        return new TimerUpdate(
+            key,
+            ImmutableSet.copyOf(completedTimers),
+            ImmutableSet.copyOf(setTimers),
+            ImmutableSet.copyOf(deletedTimers));
+      }
+    }
+
+    private TimerUpdate(
+        StructuralKey<?> key,
+        Iterable<? extends TimerData> completedTimers,
+        Iterable<? extends TimerData> setTimers,
+        Iterable<? extends TimerData> deletedTimers) {
+      this.key = key;
+      this.completedTimers = completedTimers;
+      this.setTimers = setTimers;
+      this.deletedTimers = deletedTimers;
+    }
+
+    @VisibleForTesting
+    StructuralKey<?> getKey() {
+      return key;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getCompletedTimers() {
+      return completedTimers;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getSetTimers() {
+      return setTimers;
+    }
+
+    @VisibleForTesting
+    Iterable<? extends TimerData> getDeletedTimers() {
+      return deletedTimers;
+    }
+
+    /**
+     * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
+     */
+    public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
+      return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof TimerUpdate)) {
+        return false;
+      }
+      TimerUpdate that = (TimerUpdate) other;
+      return Objects.equals(this.key, that.key)
+          && Objects.equals(this.completedTimers, that.completedTimers)
+          && Objects.equals(this.setTimers, that.setTimers)
+          && Objects.equals(this.deletedTimers, that.deletedTimers);
+    }
+  }
+
+  /**
+   * A pair of {@link TimerData} and key which can be delivered to the appropriate
+   * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when
+   * the time domain in which it lives progresses past a specified time, as determined by the
+   * {@link WatermarkManager}.
+   */
+  public static class FiredTimers {
+    private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
+
+    private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) {
+      this.timers = timers;
+    }
+
+    /**
+     * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers
+     * fired within the provided domain, return an empty collection.
+     *
+     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
+     */
+    public Collection<TimerData> getTimers(TimeDomain domain) {
+      Collection<TimerData> domainTimers = timers.get(domain);
+      if (domainTimers == null) {
+        return Collections.emptyList();
+      }
+      return domainTimers;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString();
+    }
+  }
+
+  private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
+    @Override
+    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
+      return ComparisonChain.start()
+          .compare(o1.getTimestamp(), o2.getTimestamp())
+          .result();
+    }
+  }
+
+  public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
+    Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>();
+    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms :
+        transformToWatermarks.entrySet()) {
+      if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
+        result.add(wms.getKey());
+      }
+    }
+    return result;
+  }
+
+  @AutoValue
+  abstract static class PendingWatermarkUpdate {
+    @Nullable
+    public abstract CommittedBundle<?> getInputBundle();
+    public abstract TimerUpdate getTimerUpdate();
+    public abstract CommittedResult getResult();
+    public abstract Instant getEarliestHold();
+
+    /**
+     * Gets the {@link AppliedPTransform} that generated this result.
+     */
+    public AppliedPTransform<?, ?, ?> getTransform() {
+      return getResult().getTransform();
+    }
+
+    public static PendingWatermarkUpdate create(
+        CommittedBundle<?> inputBundle,
+        TimerUpdate timerUpdate,
+        CommittedResult result, Instant earliestHold) {
+      return new AutoValue_WatermarkManager_PendingWatermarkUpdate(inputBundle,
+          timerUpdate,
+          result,
+          earliestHold);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 89866cc..b07b58a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -45,7 +45,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
+      EvaluationContext evaluationContext)
       throws Exception {
     return createTransformEvaluator(
         (AppliedPTransform) application, inputBundle, evaluationContext);
@@ -54,7 +54,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
       CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      EvaluationContext evaluationContext) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(inputBundle, transform.getOutput());
@@ -92,22 +92,22 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
         WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
       WindowFn<InputT, W>.AssignContext assignContext =
-          new InProcessAssignContext<>(windowFn, element);
+          new DirectAssignContext<>(windowFn, element);
       Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext);
       return windows;
     }
 
     @Override
-    public InProcessTransformResult finishBundle() throws Exception {
+    public TransformResult finishBundle() throws Exception {
       return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
     }
   }
 
-  private static class InProcessAssignContext<InputT, W extends BoundedWindow>
+  private static class DirectAssignContext<InputT, W extends BoundedWindow>
       extends WindowFn<InputT, W>.AssignContext {
     private final WindowedValue<InputT> value;
 
-    public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+    public DirectAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
       fn.super();
       this.value = value;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index e26f860..9bc4f7b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -64,7 +64,7 @@ public class BoundedReadEvaluatorFactoryTest {
   private BoundedSource<Long> source;
   private PCollection<Long> longs;
   private TransformEvaluatorFactory factory;
-  @Mock private InProcessEvaluationContext context;
+  @Mock private EvaluationContext context;
   private BundleFactory bundleFactory;
 
   @Before
@@ -75,7 +75,7 @@ public class BoundedReadEvaluatorFactoryTest {
     longs = p.apply(Read.from(source));
 
     factory = new BoundedReadEvaluatorFactory();
-    bundleFactory = InProcessBundleFactory.create();
+    bundleFactory = ImmutableListBundleFactory.create();
   }
 
   @Test
@@ -85,7 +85,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(
         output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(),
@@ -105,7 +105,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     Iterable<? extends WindowedValue<Long>> outputElements =
         output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
@@ -138,7 +138,7 @@ public class BoundedReadEvaluatorFactoryTest {
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
     assertThat(secondEvaluator, nullValue());
 
-    InProcessTransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     Iterable<? extends WindowedValue<Long>> outputElements =
         output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 4969a30..1e51b55 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -53,7 +53,7 @@ public class CommittedResultTest implements Serializable {
   private transient AppliedPTransform<?, ?, ?> transform =
       AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
       });
-  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+  private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   @Test
   public void getTransformExtractsFromResult() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4649eebe/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
index cd44b7e..5c89f1b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRegistrarTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.beam.runners.direct.DirectRegistrar.InProcessRunner;
+import org.apache.beam.runners.direct.DirectRegistrar.DirectRunner;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
 
@@ -33,42 +33,42 @@ import org.junit.runners.JUnit4;
 
 import java.util.ServiceLoader;
 
-/** Tests for {@link InProcessRunner}. */
+/** Tests for {@link DirectRunner}. */
 @RunWith(JUnit4.class)
 public class DirectRegistrarTest {
   @Test
   public void testCorrectOptionsAreReturned() {
     assertEquals(
         ImmutableList.of(DirectOptions.class),
-        new DirectRegistrar.InProcessOptions().getPipelineOptions());
+        new DirectRegistrar.DirectOptions().getPipelineOptions());
   }
 
   @Test
   public void testCorrectRunnersAreReturned() {
     assertEquals(
-        ImmutableList.of(DirectRunner.class),
-        new DirectRegistrar.InProcessRunner().getPipelineRunners());
+        ImmutableList.of(org.apache.beam.runners.direct.DirectRunner.class),
+        new DirectRunner().getPipelineRunners());
   }
 
   @Test
   public void testServiceLoaderForOptions() {
     for (PipelineOptionsRegistrar registrar :
         Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
-      if (registrar instanceof DirectRegistrar.InProcessOptions) {
+      if (registrar instanceof DirectRegistrar.DirectOptions) {
         return;
       }
     }
-    fail("Expected to find " + DirectRegistrar.InProcessOptions.class);
+    fail("Expected to find " + DirectRegistrar.DirectOptions.class);
   }
 
   @Test
   public void testServiceLoaderForRunner() {
     for (PipelineRunnerRegistrar registrar :
         Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) {
-      if (registrar instanceof DirectRegistrar.InProcessRunner) {
+      if (registrar instanceof DirectRunner) {
         return;
       }
     }
-    fail("Expected to find " + DirectRegistrar.InProcessRunner.class);
+    fail("Expected to find " + DirectRunner.class);
   }
 }