You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:55 UTC

[31/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
deleted file mode 100644
index a9a62a6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ /dev/null
@@ -1,1310 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of
- * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for
- * in-memory execution. {@link InMemoryWatermarkManager} is designed to update and return a
- * consistent view of watermarks in the presence of concurrent updates.
- *
- * <p>An {@link InMemoryWatermarkManager} is provided with the collection of root
- * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
- * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
- *
- * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the
- * {@link InMemoryWatermarkManager} is provided with the produced elements and the output watermark
- * of the producing {@link AppliedPTransform transform}. The
- * {@link InMemoryWatermarkManager watermark manager} is responsible for computing the watermarks
- * of all {@link AppliedPTransform transforms} that consume one or more
- * {@link PCollection PCollections}.
- *
- * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight
- * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs
- * atomically:
- * <ul>
- *  <li>All of the in-flight elements are removed from the collection of pending elements for the
- *      {@link AppliedPTransform}.</li>
- *  <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection
- *      of pending elements for each {@link AppliedPTransform} that consumes them.</li>
- *  <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of
- *    <ul>
- *      <li>the previous input watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the timestamps of all currently pending elements</li>
- *          <li>all input {@link PCollection} watermarks</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of
- *    <ul>
- *      <li>the previous output watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the current input watermark</li>
- *          <li>the current watermark holds</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of
- *      the {@link AppliedPTransform}</li>
- *  <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be
- *      advanced.</li>
- * </ul>
- *
- * <p>The watermark of a {@link PCollection} is equal to the output watermark of the
- * {@link AppliedPTransform} that produces it.
- *
- * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre>
- * Watermark_In'  = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
- * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
- * Watermark_PCollection = Watermark_Out_ProducingPTransform
- * </pre>
- */
-public class InMemoryWatermarkManager {
-  /**
-   * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
-   * {@link PCollection}.
-   *
-   * <p>A watermark is a monotonically increasing value, which represents the point up to which the
-   * system believes it has received all of the data. Data that arrives with a timestamp that is
-   * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
-   * timestamp which indicates we have received all of the data and there will be no more on-time or
-   * late data. This value is represented by {@link InMemoryWatermarkManager#THE_END_OF_TIME}.
-   */
-  private static interface Watermark {
-    /**
-     * Returns the current value of this watermark.
-     */
-    Instant get();
-
-    /**
-     * Refreshes the value of this watermark from its input watermarks and watermark holds.
-     *
-     * @return true if the value of the watermark has changed (and thus dependent watermark must
-     *         also be updated
-     */
-    WatermarkUpdate refresh();
-  }
-
-  /**
-   * The result of computing a {@link Watermark}.
-   */
-  private static enum WatermarkUpdate {
-    /** The watermark is later than the value at the previous time it was computed. */
-    ADVANCED(true),
-    /** The watermark is equal to the value at the previous time it was computed. */
-    NO_CHANGE(false);
-
-    private final boolean advanced;
-
-    private WatermarkUpdate(boolean advanced) {
-      this.advanced = advanced;
-    }
-
-    public boolean isAdvanced() {
-      return advanced;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates.
-     *
-     * If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result
-     * {@link WatermarkUpdate} has been advanced.
-     */
-    public WatermarkUpdate union(WatermarkUpdate that) {
-      if (this.advanced) {
-        return this;
-      }
-      return that;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} based on the former and current
-     * {@link Instant timestamps}.
-     */
-    public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
-      if (currentTime.isAfter(oldTime)) {
-        return ADVANCED;
-      }
-      return NO_CHANGE;
-    }
-  }
-
-  /**
-   * The input {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the
-   * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum
-   * timestamp of all of the pending elements, restricted to be monotonically increasing.
-   *
-   * <p>See {@link #refresh()} for more information.
-   */
-  private static class AppliedPTransformInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<WindowedValue<?>> pendingElements;
-    private final Map<Object, NavigableSet<TimerData>> objectTimers;
-
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
-      this.inputWatermarks = inputWatermarks;
-      this.pendingElements = TreeMultiset.create(PENDING_ELEMENT_COMPARATOR);
-      this.objectTimers = new HashMap<>();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes
-     * equal to the maximum value of
-     * <ul>
-     *   <li>the previous input watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the timestamps of all currently pending elements</li>
-     *       <li>all input {@link PCollection} watermarks</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark inputWatermark : inputWatermarks) {
-        minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
-      }
-      if (!pendingElements.isEmpty()) {
-        minInputWatermark = INSTANT_ORDERING.min(
-            minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
-      }
-      Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
-      for (WindowedValue<?> pendingElement : newPending) {
-        pendingElements.add(pendingElement);
-      }
-    }
-
-    private synchronized void removePendingElements(
-        Iterable<? extends WindowedValue<?>> finishedElements) {
-      for (WindowedValue<?> finishedElement : finishedElements) {
-        pendingElements.remove(finishedElement);
-      }
-    }
-
-    private synchronized void updateTimers(TimerUpdate update) {
-      NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
-      if (keyTimers == null) {
-        keyTimers = new TreeSet<>();
-        objectTimers.put(update.key, keyTimers);
-      }
-      for (TimerData timer : update.setTimers) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.add(timer);
-        }
-      }
-      for (TimerData timer : update.deletedTimers) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-        }
-      }
-      // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
-    }
-
-    private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() {
-      return extractFiredTimers(currentWatermark.get(), objectTimers);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
-          .add("pendingElements", pendingElements)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The output {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the
-   * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same
-   * {@link AppliedPTransform}, restricted to be monotonically increasing. See
-   * {@link #refresh()} for more information.
-   */
-  private static class AppliedPTransformOutputWatermark implements Watermark {
-    private final Watermark inputWatermark;
-    private final PerKeyHolds holds;
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
-      this.inputWatermark = inputWatermark;
-      holds = new PerKeyHolds();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    public synchronized void updateHold(Object key, Instant newHold) {
-      if (newHold == null) {
-        holds.removeHold(key);
-      } else {
-        holds.updateHold(key, newHold);
-      }
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes
-     * equal to the maximum value of:
-     * <ul>
-     *   <li>the previous output watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the current input watermark</li>
-     *       <li>the current watermark holds</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
-      newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
-          .add("holds", holds)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeInputWatermark} is equal
-   * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream
-   * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing input time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time input hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWms;
-    private final Collection<CommittedBundle<?>> pendingBundles;
-    private final Map<Object, NavigableSet<TimerData>> processingTimers;
-    private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers;
-
-    private final PriorityQueue<TimerData> pendingTimers;
-
-    private AtomicReference<Instant> earliestHold;
-
-    public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
-      this.inputWms = inputWms;
-      this.pendingBundles = new HashSet<>();
-      this.processingTimers = new HashMap<>();
-      this.synchronizedProcessingTimers = new HashMap<>();
-      this.pendingTimers = new PriorityQueue<>();
-      Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark wm : inputWms) {
-        initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
-      }
-      earliestHold = new AtomicReference<>(initialHold);
-    }
-
-    @Override
-    public Instant get() {
-      return earliestHold.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark}
-     * becomes equal to the minimum value of
-     * <ul>
-     *   <li>the timestamps of all currently pending bundles</li>
-     *   <li>all input {@link PCollection} synchronized processing time watermarks</li>
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldHold = earliestHold.get();
-      Instant minTime = THE_END_OF_TIME.get();
-      for (Watermark input : inputWms) {
-        minTime = INSTANT_ORDERING.min(minTime, input.get());
-      }
-      for (CommittedBundle<?> bundle : pendingBundles) {
-        // TODO: Track elements in the bundle by the processing time they were output instead of
-        // entire bundles. Requried to support arbitrarily splitting and merging bundles between
-        // steps
-        minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
-      }
-      earliestHold.set(minTime);
-      return WatermarkUpdate.fromTimestamps(oldHold, minTime);
-    }
-
-    public synchronized void addPending(CommittedBundle<?> bundle) {
-      pendingBundles.add(bundle);
-    }
-
-    public synchronized void removePending(CommittedBundle<?> bundle) {
-      pendingBundles.remove(bundle);
-    }
-
-    /**
-     * Return the earliest timestamp of the earliest timer that has not been completed. This is
-     * either the earliest timestamp across timers that have not been completed, or the earliest
-     * timestamp across timers that have been delivered but have not been completed.
-     */
-    public synchronized Instant getEarliestTimerTimestamp() {
-      Instant earliest = THE_END_OF_TIME.get();
-      for (NavigableSet<TimerData> timers : processingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      if (!pendingTimers.isEmpty()) {
-        earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
-      }
-      return earliest;
-    }
-
-    private synchronized void updateTimers(TimerUpdate update) {
-      for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
-      }
-      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
-      for (TimerData addedTimer : update.setTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
-        if (timerQueue != null) {
-          timerQueue.add(addedTimer);
-        }
-      }
-      for (TimerData deletedTimer : update.deletedTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
-        if (timerQueue != null) {
-          timerQueue.remove(deletedTimer);
-        }
-      }
-    }
-
-    private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers(
-        TimeDomain domain, Instant firingTime) {
-      Map<Object, List<TimerData>> firedTimers;
-      switch (domain) {
-        case PROCESSING_TIME:
-          firedTimers = extractFiredTimers(firingTime, processingTimers);
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          firedTimers =
-              extractFiredTimers(
-                  INSTANT_ORDERING.min(firingTime, earliestHold.get()),
-                  synchronizedProcessingTimers);
-          break;
-        default:
-          throw new IllegalArgumentException(
-              "Called getFiredTimers on a Synchronized Processing Time watermark"
-                  + " and gave a non-processing time domain "
-                  + domain);
-      }
-      for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) {
-        pendingTimers.addAll(firedTimer.getValue());
-      }
-      return firedTimers;
-    }
-
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) {
-      NavigableSet<TimerData> processingQueue = processingTimers.get(key);
-      if (processingQueue == null) {
-        processingQueue = new TreeSet<>();
-        processingTimers.put(key, processingQueue);
-      }
-      NavigableSet<TimerData> synchronizedProcessingQueue =
-          synchronizedProcessingTimers.get(key);
-      if (synchronizedProcessingQueue == null) {
-        synchronizedProcessingQueue = new TreeSet<>();
-        synchronizedProcessingTimers.put(key, synchronizedProcessingQueue);
-      }
-      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
-      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
-      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
-      return result;
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
-          .add("earliestHold", earliestHold)
-          .toString();
-    }
-  }
-
-  /**
-   * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeOutputWatermark} is
-   * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all
-   * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the output
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing output time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time output hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
-    private final SynchronizedProcessingTimeInputWatermark inputWm;
-    private AtomicReference<Instant> latestRefresh;
-
-    public SynchronizedProcessingTimeOutputWatermark(
-        SynchronizedProcessingTimeInputWatermark inputWm) {
-      this.inputWm = inputWm;
-      this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return latestRefresh.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark}
-     * becomes equal to the minimum value of:
-     * <ul>
-     *   <li>the current input watermark.
-     *   <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input
-     *       watermark.
-     *   <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark.
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      // Hold the output synchronized processing time to the input watermark, which takes into
-      // account buffered bundles, and the earliest pending timer, which determines what to hold
-      // downstream timers to.
-      Instant oldRefresh = latestRefresh.get();
-      Instant newTimestamp =
-          INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
-      latestRefresh.set(newTimestamp);
-      return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
-          .add("latestRefresh", latestRefresh)
-          .toString();
-    }
-  }
-
-  /**
-   * The {@code Watermark} that is after the latest time it is possible to represent in the global
-   * window. This is a distinguished value representing a complete {@link PTransform}.
-   */
-  private static final Watermark THE_END_OF_TIME = new Watermark() {
-        @Override
-        public WatermarkUpdate refresh() {
-          // THE_END_OF_TIME is a distinguished value that cannot be advanced.
-          return WatermarkUpdate.NO_CHANGE;
-        }
-
-        @Override
-        public Instant get() {
-          return BoundedWindow.TIMESTAMP_MAX_VALUE;
-        }
-      };
-
-  private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
-
-  /**
-   * An ordering that compares windowed values by timestamp, then arbitrarily. This ensures that
-   * {@link WindowedValue WindowedValues} will be sorted by timestamp, while two different
-   * {@link WindowedValue WindowedValues} with the same timestamp are not considered equal.
-   */
-  private static final Ordering<WindowedValue<? extends Object>> PENDING_ELEMENT_COMPARATOR =
-      (new WindowedValueByTimestampComparator()).compound(Ordering.arbitrary());
-
-  /**
-   * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
-   * latestTime argument and put in in the result with the same key, then remove all of the keys
-   * which have no more pending timers.
-   *
-   * The result collection retains ordering of timers (from earliest to latest).
-   */
-  private static Map<Object, List<TimerData>> extractFiredTimers(
-      Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) {
-    Map<Object, List<TimerData>> result = new HashMap<>();
-    Set<Object> emptyKeys = new HashSet<>();
-    for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) {
-      NavigableSet<TimerData> timers = pendingTimers.getValue();
-      if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-        ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
-        result.put(pendingTimers.getKey(), keyFiredTimers);
-        while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-          keyFiredTimers.add(timers.first());
-          timers.remove(timers.first());
-        }
-      }
-      if (timers.isEmpty()) {
-        emptyKeys.add(pendingTimers.getKey());
-      }
-    }
-    objectTimers.keySet().removeAll(emptyKeys);
-    return result;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain.
-   */
-  private final Clock clock;
-
-  /**
-   * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications}
-   * that consume that {@link PCollection}.
-   */
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
-
-  /**
-   * The input and output watermark of each {@link AppliedPTransform}.
-   */
-  private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
-
-  /**
-   * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created
-   * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
-   * minimum watermark, with no watermark holds or pending elements.
-   *
-   * @param rootTransforms the root-level transforms of the {@link Pipeline}
-   * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the
-   *                  transforms that consume it as a part of their input
-   */
-  public static InMemoryWatermarkManager create(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
-    return new InMemoryWatermarkManager(clock, rootTransforms, consumers);
-  }
-
-  private InMemoryWatermarkManager(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
-    this.clock = clock;
-    this.consumers = consumers;
-
-    transformToWatermarks = new HashMap<>();
-
-    for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
-      getTransformWatermark(rootTransform);
-    }
-    for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) {
-      for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
-        getTransformWatermark(transform);
-      }
-    }
-  }
-
-  private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
-    TransformWatermarks wms = transformToWatermarks.get(transform);
-    if (wms == null) {
-      List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform);
-      AppliedPTransformInputWatermark inputWatermark =
-          new AppliedPTransformInputWatermark(inputCollectionWatermarks);
-      AppliedPTransformOutputWatermark outputWatermark =
-          new AppliedPTransformOutputWatermark(inputWatermark);
-
-      SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
-          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
-      SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
-          new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
-
-      wms =
-          new TransformWatermarks(
-              inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark);
-      transformToWatermarks.put(transform, wms);
-    }
-    return wms;
-  }
-
-  private Collection<Watermark> getInputProcessingWatermarks(
-      AppliedPTransform<?, ?, ?> transform) {
-    ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<? extends PValue> inputs = transform.getInput().expand();
-    if (inputs.isEmpty()) {
-      inputWmsBuilder.add(THE_END_OF_TIME);
-    }
-    for (PValue pvalue : inputs) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(pvalue.getProducingTransformInternal())
-              .synchronizedProcessingOutputWatermark;
-      inputWmsBuilder.add(producerOutputWatermark);
-    }
-    return inputWmsBuilder.build();
-  }
-
-  private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
-    Collection<? extends PValue> inputs = transform.getInput().expand();
-    if (inputs.isEmpty()) {
-      inputWatermarksBuilder.add(THE_END_OF_TIME);
-    }
-    for (PValue pvalue : inputs) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark;
-      inputWatermarksBuilder.add(producerOutputWatermark);
-    }
-    List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
-    return inputCollectionWatermarks;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Gets the input and output watermarks for an {@link AppliedPTransform}. If the
-   * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of
-   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
-   *
-   * @return a snapshot of the input watermark and output watermark for the provided transform
-   */
-  public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    return transformToWatermarks.get(transform);
-  }
-
-  /**
-   * Updates the watermarks of a transform with one or more inputs.
-   *
-   * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can,
-   * at any time, be updated to equal:
-   * <pre>
-   * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
-   * </pre>
-   * and the output watermark, which can, at any time, be updated to equal:
-   * <pre>
-   * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
-   * </pre>.
-   *
-   * @param completed the input that has completed
-   * @param transform the transform that has completed processing the input
-   * @param outputs the bundles the transform has output
-   * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
-   *                     is no hold
-   */
-  public void updateWatermarks(
-      @Nullable CommittedBundle<?> completed,
-      AppliedPTransform<?, ?, ?> transform,
-      TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs,
-      @Nullable Instant earliestHold) {
-    updatePending(completed, transform, timerUpdate, outputs);
-    TransformWatermarks transformWms = transformToWatermarks.get(transform);
-    transformWms.setEventTimeHold(completed == null ? null : completed.getKey(), earliestHold);
-    refreshWatermarks(transform);
-  }
-
-  private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    TransformWatermarks myWatermarks = transformToWatermarks.get(transform);
-    WatermarkUpdate updateResult = myWatermarks.refresh();
-    if (updateResult.isAdvanced()) {
-      for (PValue outputPValue : transform.getOutput().expand()) {
-        Collection<AppliedPTransform<?, ?, ?>> downstreamTransforms = consumers.get(outputPValue);
-        if (downstreamTransforms != null) {
-          for (AppliedPTransform<?, ?, ?> downstreamTransform : downstreamTransforms) {
-            refreshWatermarks(downstreamTransform);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes all of the completed Timers from the collection of pending timers, adds all new timers,
-   * and removes all deleted timers. Removes all elements consumed by the input bundle from the
-   * {@link PTransform PTransforms} collection of pending elements, and adds all elements produced
-   * by the {@link PTransform} to the pending queue of each consumer.
-   */
-  private void updatePending(
-      CommittedBundle<?> input,
-      AppliedPTransform<?, ?, ?> transform,
-      TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs) {
-    TransformWatermarks completedTransform = transformToWatermarks.get(transform);
-    completedTransform.updateTimers(timerUpdate);
-    if (input != null) {
-      completedTransform.removePending(input);
-    }
-
-    for (CommittedBundle<?> bundle : outputs) {
-      for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
-        TransformWatermarks watermarks = transformToWatermarks.get(consumer);
-        watermarks.addPending(bundle);
-      }
-    }
-  }
-
-  /**
-   * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
-   * pending timers will be removed from this {@link InMemoryWatermarkManager}.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> allTimers = new HashMap<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
-        transformToWatermarks.entrySet()) {
-      Map<Object, FiredTimers> keyFiredTimers = watermarksEntry.getValue().extractFiredTimers();
-      if (!keyFiredTimers.isEmpty()) {
-        allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
-      }
-    }
-    return allTimers;
-  }
-
-  /**
-   * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
-   * and as such the watermark manager must track holds and the release of holds on a per-key basis.
-   *
-   * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals,
-   * as the key is arbitrarily ordered via identity, rather than object equality.
-   */
-  private static final class KeyedHold implements Comparable<KeyedHold> {
-    private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
-
-    private final Object key;
-    private final Instant timestamp;
-
-    /**
-     * Create a new KeyedHold with the specified key and timestamp.
-     */
-    public static KeyedHold of(Object key, Instant timestamp) {
-      return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get()));
-    }
-
-    private KeyedHold(Object key, Instant timestamp) {
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public int compareTo(KeyedHold that) {
-      return ComparisonChain.start()
-          .compare(this.timestamp, that.timestamp)
-          .compare(this.key, that.key, KEY_ORDERING)
-          .result();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp, key);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof KeyedHold)) {
-        return false;
-      }
-      KeyedHold that = (KeyedHold) other;
-      return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key);
-    }
-
-    /**
-     * Get the value of this {@link KeyedHold}.
-     */
-    public Instant getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(KeyedHold.class)
-          .add("key", key)
-          .add("hold", timestamp)
-          .toString();
-    }
-  }
-
-  private static class PerKeyHolds {
-    private final Map<Object, KeyedHold> keyedHolds;
-    private final PriorityQueue<KeyedHold> allHolds;
-
-    private PerKeyHolds() {
-      this.keyedHolds = new HashMap<>();
-      this.allHolds = new PriorityQueue<>();
-    }
-
-    /**
-     * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
-     * there are no holds within this {@link PerKeyHolds}.
-     */
-    public Instant getMinHold() {
-      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
-    }
-
-    /**
-     * Updates the hold of the provided key to the provided value, removing any other holds for
-     * the same key.
-     */
-    public void updateHold(@Nullable Object key, Instant newHold) {
-      removeHold(key);
-      KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
-      keyedHolds.put(key, newKeyedHold);
-      allHolds.offer(newKeyedHold);
-    }
-
-    /**
-     * Removes the hold of the provided key.
-     */
-    public void removeHold(Object key) {
-      KeyedHold oldHold = keyedHolds.get(key);
-      if (oldHold != null) {
-        allHolds.remove(oldHold);
-      }
-    }
-  }
-
-  /**
-   * A reference to the input and output watermarks of an {@link AppliedPTransform}.
-   */
-  public class TransformWatermarks {
-    private final AppliedPTransformInputWatermark inputWatermark;
-    private final AppliedPTransformOutputWatermark outputWatermark;
-
-    private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
-    private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
-
-    private Instant latestSynchronizedInputWm;
-    private Instant latestSynchronizedOutputWm;
-
-    private TransformWatermarks(
-        AppliedPTransformInputWatermark inputWatermark,
-        AppliedPTransformOutputWatermark outputWatermark,
-        SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
-        SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
-      this.inputWatermark = inputWatermark;
-      this.outputWatermark = outputWatermark;
-
-      this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
-      this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
-      this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    /**
-     * Returns the input watermark of the {@link AppliedPTransform}.
-     */
-    public Instant getInputWatermark() {
-      return inputWatermark.get();
-    }
-
-    /**
-     * Returns the output watermark of the {@link AppliedPTransform}.
-     */
-    public Instant getOutputWatermark() {
-      return outputWatermark.get();
-    }
-
-    /**
-     * Returns the synchronized processing input time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
-     * presence of holds, will increase as the system time progresses.
-     */
-    public synchronized Instant getSynchronizedProcessingInputTime() {
-      latestSynchronizedInputWm = INSTANT_ORDERING.max(
-          latestSynchronizedInputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get()));
-      return latestSynchronizedInputWm;
-    }
-
-    /**
-     * Returns the synchronized processing output time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
-     * presence of holds, will increase as the system time progresses.
-     */
-    public synchronized Instant getSynchronizedProcessingOutputTime() {
-      latestSynchronizedOutputWm = INSTANT_ORDERING.max(
-          latestSynchronizedOutputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get()));
-      return latestSynchronizedOutputWm;
-    }
-
-    private WatermarkUpdate refresh() {
-      inputWatermark.refresh();
-      synchronizedProcessingInputWatermark.refresh();
-      WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
-      WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh();
-      return eventOutputUpdate.union(syncOutputUpdate);
-    }
-
-    private void setEventTimeHold(Object key, Instant newHold) {
-      outputWatermark.updateHold(key, newHold);
-    }
-
-    private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(bundle.getElements());
-      synchronizedProcessingInputWatermark.removePending(bundle);
-    }
-
-    private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(bundle.getElements());
-      synchronizedProcessingInputWatermark.addPending(bundle);
-    }
-
-    private Map<Object, FiredTimers> extractFiredTimers() {
-      Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers();
-      Map<Object, List<TimerData>> processingTimers;
-      Map<Object, List<TimerData>> synchronizedTimers;
-      if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-      } else {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, clock.now());
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
-      }
-      Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
-      groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
-
-      Map<Object, FiredTimers> keyFiredTimers = new HashMap<>();
-      for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers :
-          groupedTimers.entrySet()) {
-        keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
-      }
-      return keyFiredTimers;
-    }
-
-    @SafeVarargs
-    private final void groupFiredTimers(
-        Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate,
-        Map<Object, List<TimerData>>... timersToGroup) {
-      for (Map<Object, List<TimerData>> subGroup : timersToGroup) {
-        for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) {
-          Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
-          if (grouped == null) {
-            grouped = new HashMap<>();
-            groupedToMutate.put(newTimers.getKey(), grouped);
-          }
-          grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue());
-        }
-      }
-    }
-
-    private void updateTimers(TimerUpdate update) {
-      inputWatermark.updateTimers(update);
-      synchronizedProcessingInputWatermark.updateTimers(update);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(TransformWatermarks.class)
-          .add("inputWatermark", inputWatermark)
-          .add("outputWatermark", outputWatermark)
-          .add("inputProcessingTime", synchronizedProcessingInputWatermark)
-          .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * A collection of newly set, deleted, and completed timers.
-   *
-   * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the
-   * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as
-   * the input to the executed step.
-   */
-  public static class TimerUpdate {
-    private final Object key;
-    private final Iterable<? extends TimerData> completedTimers;
-
-    private final Iterable<? extends TimerData> setTimers;
-    private final Iterable<? extends TimerData> deletedTimers;
-
-    /**
-     * Returns a TimerUpdate for a null key with no timers.
-     */
-    public static TimerUpdate empty() {
-      return new TimerUpdate(
-          null,
-          Collections.<TimerData>emptyList(),
-          Collections.<TimerData>emptyList(),
-          Collections.<TimerData>emptyList());
-    }
-
-    /**
-     * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the
-     * set and deleted timers to be added to it.
-     */
-    public static TimerUpdateBuilder builder(Object key) {
-      return new TimerUpdateBuilder(key);
-    }
-
-    /**
-     * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
-     */
-    public static final class TimerUpdateBuilder {
-      private final Object key;
-      private final Collection<TimerData> completedTimers;
-      private final Collection<TimerData> setTimers;
-      private final Collection<TimerData> deletedTimers;
-
-      private TimerUpdateBuilder(Object key) {
-        this.key = key;
-        this.completedTimers = new HashSet<>();
-        this.setTimers = new HashSet<>();
-        this.deletedTimers = new HashSet<>();
-      }
-
-      /**
-       * Adds all of the provided timers to the collection of completed timers, and returns this
-       * {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) {
-        Iterables.addAll(this.completedTimers, completedTimers);
-        return this;
-      }
-
-      /**
-       * Adds the provided timer to the collection of set timers, removing it from deleted timers if
-       * it has previously been deleted. Returns this {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder setTimer(TimerData setTimer) {
-        deletedTimers.remove(setTimer);
-        setTimers.add(setTimer);
-        return this;
-      }
-
-      /**
-       * Adds the provided timer to the collection of deleted timers, removing it from set timers if
-       * it has previously been set. Returns this {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
-        deletedTimers.add(deletedTimer);
-        setTimers.remove(deletedTimer);
-        return this;
-      }
-
-      /**
-       * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers,
-       * and deletedTimers.
-       */
-      public TimerUpdate build() {
-        return new TimerUpdate(
-            key,
-            ImmutableSet.copyOf(completedTimers),
-            ImmutableSet.copyOf(setTimers),
-            ImmutableSet.copyOf(deletedTimers));
-      }
-    }
-
-    private TimerUpdate(
-        Object key,
-        Iterable<? extends TimerData> completedTimers,
-        Iterable<? extends TimerData> setTimers,
-        Iterable<? extends TimerData> deletedTimers) {
-      this.key = key;
-      this.completedTimers = completedTimers;
-      this.setTimers = setTimers;
-      this.deletedTimers = deletedTimers;
-    }
-
-    @VisibleForTesting
-    Object getKey() {
-      return key;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getCompletedTimers() {
-      return completedTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getSetTimers() {
-      return setTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getDeletedTimers() {
-      return deletedTimers;
-    }
-
-    /**
-     * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
-     */
-    public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
-      return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof TimerUpdate)) {
-        return false;
-      }
-      TimerUpdate that = (TimerUpdate) other;
-      return Objects.equals(this.key, that.key)
-          && Objects.equals(this.completedTimers, that.completedTimers)
-          && Objects.equals(this.setTimers, that.setTimers)
-          && Objects.equals(this.deletedTimers, that.deletedTimers);
-    }
-  }
-
-  /**
-   * A pair of {@link TimerData} and key which can be delivered to the appropriate
-   * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when
-   * the time domain in which it lives progresses past a specified time, as determined by the
-   * {@link InMemoryWatermarkManager}.
-   */
-  public static class FiredTimers {
-    private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
-
-    private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) {
-      this.timers = timers;
-    }
-
-    /**
-     * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers
-     * fired within the provided domain, return an empty collection.
-     *
-     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
-     */
-    public Collection<TimerData> getTimers(TimeDomain domain) {
-      Collection<TimerData> domainTimers = timers.get(domain);
-      if (domainTimers == null) {
-        return Collections.emptyList();
-      }
-      return domainTimers;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString();
-    }
-  }
-
-  private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
-    @Override
-    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
-      return o1.getTimestamp().compareTo(o2.getTimestamp());
-    }
-  }
-
-  public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
-    Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms :
-        transformToWatermarks.entrySet()) {
-      if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
-        result.add(wms.getKey());
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
deleted file mode 100644
index 112ba17..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link UncommittedBundle} that buffers elements in memory.
- */
-public final class InProcessBundle<T> implements UncommittedBundle<T> {
-  private final PCollection<T> pcollection;
-  private final boolean keyed;
-  private final Object key;
-  private boolean committed = false;
-  private ImmutableList.Builder<WindowedValue<T>> elements;
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
-   */
-  public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
-    return new InProcessBundle<T>(pcollection, false, null);
-  }
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
-   * key.
-   *
-   * See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
-   * information.
-   */
-  public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
-    return new InProcessBundle<T>(pcollection, true, key);
-  }
-
-  private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
-    this.pcollection = pcollection;
-    this.keyed = keyed;
-    this.key = key;
-    this.elements = ImmutableList.builder();
-  }
-
-  @Override
-  public PCollection<T> getPCollection() {
-    return pcollection;
-  }
-
-  @Override
-  public InProcessBundle<T> add(WindowedValue<T> element) {
-    checkState(!committed, "Can't add element %s to committed bundle %s", element, this);
-    elements.add(element);
-    return this;
-  }
-
-  @Override
-  public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
-    checkState(!committed, "Can't commit already committed bundle %s", this);
-    committed = true;
-    final Iterable<WindowedValue<T>> committedElements = elements.build();
-    return new CommittedBundle<T>() {
-      @Override
-      @Nullable
-      public Object getKey() {
-        return key;
-      }
-
-      @Override
-      public boolean isKeyed() {
-        return keyed;
-      }
-
-      @Override
-      public Iterable<WindowedValue<T>> getElements() {
-        return committedElements;
-      }
-
-      @Override
-      public PCollection<T> getPCollection() {
-        return pcollection;
-      }
-
-      @Override
-      public Instant getSynchronizedProcessingOutputWatermark() {
-        return synchronizedCompletionTime;
-      }
-
-      @Override
-      public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .omitNullValues()
-            .add("pcollection", pcollection)
-            .add("key", key)
-            .add("elements", committedElements)
-            .toString();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
deleted file mode 100644
index 406e2d4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link InProcessPipelineRunner}.
- */
-public class InProcessBundleOutputManager implements OutputManager {
-  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-
-  public static InProcessBundleOutputManager create(
-      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
-    return new InProcessBundleOutputManager(outputBundles);
-  }
-
-  public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
-    this.bundles = bundles;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-    @SuppressWarnings("rawtypes")
-    UncommittedBundle bundle = bundles.get(tag);
-    bundle.add(output);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
deleted file mode 100644
index 9023b7b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.Create.Values;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process implementation of the {@link Values Create.Values} {@link PTransform}, implemented
- * using a {@link BoundedSource}.
- *
- * The coder is inferred via the {@link Values#getDefaultOutputCoder(PInput)} method on the original
- * transform.
- */
-class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
-  private final Create.Values<T> original;
-
-  public static <T> InProcessCreate<T> from(Create.Values<T> original) {
-    return new InProcessCreate<>(original);
-  }
-
-  private InProcessCreate(Values<T> original) {
-    this.original = original;
-  }
-
-  @Override
-  public PCollection<T> apply(PInput input) {
-    Coder<T> elementCoder;
-    try {
-      elementCoder = original.getDefaultOutputCoder(input);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to infer a coder and no Coder was specified. "
-          + "Please set a coder by invoking Create.withCoder() explicitly.",
-          e);
-    }
-    InMemorySource<T> source;
-    try {
-      source = new InMemorySource<>(original.getElements(), elementCoder);
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-    PCollection<T> result = input.getPipeline().apply(Read.from(source));
-    result.setCoder(elementCoder);
-    return result;
-  }
-
-  @Override
-  public PTransform<PInput, PCollection<T>> delegate() {
-    return original;
-  }
-
-  @VisibleForTesting
-  static class InMemorySource<T> extends BoundedSource<T> {
-    private final Collection<byte[]> allElementsBytes;
-    private final long totalSize;
-    private final Coder<T> coder;
-
-    public InMemorySource(Iterable<T> elements, Coder<T> elemCoder)
-        throws CoderException, IOException {
-      allElementsBytes = new ArrayList<>();
-      long totalSize = 0L;
-      for (T element : elements) {
-        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
-        allElementsBytes.add(bytes);
-        totalSize += bytes.length;
-      }
-      this.totalSize = totalSize;
-      this.coder = elemCoder;
-    }
-
-    /**
-     * Create a new source with the specified bytes. The new source owns the input element bytes,
-     * which must not be modified after this constructor is called.
-     */
-    private InMemorySource(Collection<byte[]> elementBytes, long totalSize, Coder<T> coder) {
-      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
-      this.totalSize = totalSize;
-      this.coder = coder;
-    }
-
-    @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      ImmutableList.Builder<InMemorySource<T>> resultBuilder = ImmutableList.builder();
-      long currentSourceSize = 0L;
-      List<byte[]> currentElems = new ArrayList<>();
-      for (byte[] elemBytes : allElementsBytes) {
-        currentElems.add(elemBytes);
-        currentSourceSize += elemBytes.length;
-        if (currentSourceSize >= desiredBundleSizeBytes) {
-          resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
-          currentElems.clear();
-          currentSourceSize = 0L;
-        }
-      }
-      if (!currentElems.isEmpty()) {
-        resultBuilder.add(new InMemorySource<>(currentElems, currentSourceSize, coder));
-      }
-      return resultBuilder.build();
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return totalSize;
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new BytesReader();
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    private class BytesReader extends BoundedReader<T> {
-      private final PeekingIterator<byte[]> iter;
-      /**
-       * Use an optional to distinguish between null next element (as Optional.absent()) and no next
-       * element (next is null).
-       */
-      @Nullable private Optional<T> next;
-
-      public BytesReader() {
-        this.iter = Iterators.peekingIterator(allElementsBytes.iterator());
-      }
-
-      @Override
-      public BoundedSource<T> getCurrentSource() {
-        return InMemorySource.this;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        boolean hasNext = iter.hasNext();
-        if (hasNext) {
-          next = Optional.fromNullable(CoderUtils.decodeFromByteArray(coder, iter.next()));
-        } else {
-          next = null;
-        }
-        return hasNext;
-      }
-
-      @Override
-      @Nullable
-      public T getCurrent() throws NoSuchElementException {
-        if (next == null) {
-          throw new NoSuchElementException();
-        }
-        return next.orNull();
-      }
-
-      @Override
-      public void close() throws IOException {}
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
deleted file mode 100644
index 4aeb0d3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext;
-import com.google.cloud.dataflow.sdk.util.SideInputReader;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * The evaluation context for a specific pipeline being executed by the
- * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
- * transforms.
- *
- * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
- * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
- * consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
- * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
- * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
- * known to be empty).
- *
- * <p>{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based
- * on the current global state and updating the global state appropriately. This includes updating
- * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
- * can be executed.
- */
-class InProcessEvaluationContext {
-  /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
-  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
-
-  /** The options that were used to create this {@link Pipeline}. */
-  private final InProcessPipelineOptions options;
-
-  /** The current processing time and event time watermarks and timers. */
-  private final InMemoryWatermarkManager watermarkManager;
-
-  /** Executes callbacks based on the progression of the watermark. */
-  private final WatermarkCallbackExecutor callbackExecutor;
-
-  /** The stateInternals of the world, by applied PTransform and key. */
-  private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
-      applicationStateInternals;
-
-  private final InProcessSideInputContainer sideInputContainer;
-
-  private final CounterSet mergedCounters;
-
-  public static InProcessEvaluationContext create(
-      InProcessPipelineOptions options,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
-      Collection<PCollectionView<?>> views) {
-    return new InProcessEvaluationContext(
-        options, rootTransforms, valueToConsumers, stepNames, views);
-  }
-
-  private InProcessEvaluationContext(
-      InProcessPipelineOptions options,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
-      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
-      Collection<PCollectionView<?>> views) {
-    this.options = checkNotNull(options);
-    checkNotNull(rootTransforms);
-    checkNotNull(valueToConsumers);
-    checkNotNull(stepNames);
-    checkNotNull(views);
-    this.stepNames = stepNames;
-
-    this.watermarkManager =
-        InMemoryWatermarkManager.create(
-            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
-    this.sideInputContainer = InProcessSideInputContainer.create(this, views);
-
-    this.applicationStateInternals = new ConcurrentHashMap<>();
-    this.mergedCounters = new CounterSet();
-
-    this.callbackExecutor = WatermarkCallbackExecutor.create();
-  }
-
-  /**
-   * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided
-   * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
-   *
-   * <p>The result is the output of running the transform contained in the
-   * {@link InProcessTransformResult} on the contents of the provided bundle.
-   *
-   * @param completedBundle the bundle that was processed to produce the result. Potentially
-   *                        {@code null} if the transform that produced the result is a root
-   *                        transform
-   * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
-   *                        or an empty iterable if no timers were delivered
-   * @param result the result of evaluating the input bundle
-   * @return the committed bundles contained within the handled {@code result}
-   */
-  public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
-      @Nullable CommittedBundle<?> completedBundle,
-      Iterable<TimerData> completedTimers,
-      InProcessTransformResult result) {
-    Iterable<? extends CommittedBundle<?>> committedBundles =
-        commitBundles(result.getOutputBundles());
-    // Update watermarks and timers
-    watermarkManager.updateWatermarks(
-        completedBundle,
-        result.getTransform(),
-        result.getTimerUpdate().withCompletedTimers(completedTimers),
-        committedBundles,
-        result.getWatermarkHold());
-    fireAllAvailableCallbacks();
-    // Update counters
-    if (result.getCounters() != null) {
-      mergedCounters.merge(result.getCounters());
-    }
-    // Update state internals
-    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
-    if (theirState != null) {
-      CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
-      StepAndKey stepAndKey =
-          StepAndKey.of(
-              result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
-      if (!committedState.isEmpty()) {
-        applicationStateInternals.put(stepAndKey, committedState);
-      } else {
-        applicationStateInternals.remove(stepAndKey);
-      }
-    }
-    return committedBundles;
-  }
-
-  private Iterable<? extends CommittedBundle<?>> commitBundles(
-      Iterable<? extends UncommittedBundle<?>> bundles) {
-    ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
-    for (UncommittedBundle<?> inProgress : bundles) {
-      AppliedPTransform<?, ?, ?> producing =
-          inProgress.getPCollection().getProducingTransformInternal();
-      TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
-      CommittedBundle<?> committed =
-          inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
-      // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
-      // filter them out
-      if (!Iterables.isEmpty(committed.getElements())) {
-        completed.add(committed);
-      }
-    }
-    return completed.build();
-  }
-
-  private void fireAllAvailableCallbacks() {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
-      fireAvailableCallbacks(transform);
-    }
-  }
-
-  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
-    TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
-    callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
-  }
-
-  /**
-   * Create a {@link UncommittedBundle} for use by a source.
-   */
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return InProcessBundle.unkeyed(output);
-  }
-
-  /**
-   * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
-   * PCollection}.
-   */
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return input.isKeyed()
-        ? InProcessBundle.keyed(output, input.getKey())
-        : InProcessBundle.unkeyed(output);
-  }
-
-  /**
-   * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
-   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
-   */
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output) {
-    return InProcessBundle.keyed(output, key);
-  }
-
-  /**
-   * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
-   * {@link PCollectionView}.
-   */
-  public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
-      PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) {
-    return new PCollectionViewWriter<ElemT, ViewT>() {
-      @Override
-      public void add(Iterable<WindowedValue<ElemT>> values) {
-        sideInputContainer.write(output, values);
-      }
-    };
-  }
-
-  /**
-   * Schedule a callback to be executed after output would be produced for the given window
-   * if there had been input.
-   *
-   * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
-   * which the trigger for the specified window (with the specified windowing strategy) must have
-   * fired from the perspective of that {@link PValue}, as specified by the value of
-   * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
-   * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
-   * for a key in that window, the window is empty, or all elements in the window are late. The
-   * callback will be executed regardless of whether values have been produced.
-   */
-  public void scheduleAfterOutputWouldBeProduced(
-      PValue value,
-      BoundedWindow window,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Runnable runnable) {
-    AppliedPTransform<?, ?, ?> producing = getProducing(value);
-    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
-
-    fireAvailableCallbacks(lookupProducing(value));
-  }
-
-  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
-    if (value.getProducingTransformInternal() != null) {
-      return value.getProducingTransformInternal();
-    }
-    return lookupProducing(value);
-  }
-
-  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
-      if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
-        return transform;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the options used by this {@link Pipeline}.
-   */
-  public InProcessPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  /**
-   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
-   */
-  public InProcessExecutionContext getExecutionContext(
-      AppliedPTransform<?, ?, ?> application, Object key) {
-    StepAndKey stepAndKey = StepAndKey.of(application, key);
-    return new InProcessExecutionContext(
-        options.getClock(),
-        key,
-        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
-        watermarkManager.getWatermarks(application));
-  }
-
-  /**
-   * Get all of the steps used in this {@link Pipeline}.
-   */
-  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
-    return stepNames.keySet();
-  }
-
-  /**
-   * Get the Step Name for the provided application.
-   */
-  public String getStepName(AppliedPTransform<?, ?, ?> application) {
-    return stepNames.get(application);
-  }
-
-  /**
-   * Returns a {@link SideInputReader} capable of reading the provided
-   * {@link PCollectionView PCollectionViews}.
-   * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
-   *                   read
-   * @return a {@link SideInputReader} that can read all of the provided
-   *         {@link PCollectionView PCollectionViews}
-   */
-  public SideInputReader createSideInputReader(final List<PCollectionView<?>> sideInputs) {
-    return sideInputContainer.createReaderForViews(sideInputs);
-  }
-
-  /**
-   * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
-   * of all other {@link CounterSet CounterSets} created by this call.
-   *
-   * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
-   * all created {@link CounterSet CounterSets} when the transforms that call this method
-   * complete.
-   */
-  public CounterSet createCounterSet() {
-    return new CounterSet();
-  }
-
-  /**
-   * Returns all of the counters that have been merged into this context via calls to
-   * {@link CounterSet#merge(CounterSet)}.
-   */
-  public CounterSet getCounters() {
-    return mergedCounters;
-  }
-
-  /**
-   * Extracts all timers that have been fired and have not already been extracted.
-   *
-   * <p>This is a destructive operation. Timers will only appear in the result of this method once
-   * for each time they are set.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
-    return watermarkManager.extractFiredTimers();
-  }
-
-  /**
-   * Returns true if the step will not produce additional output.
-   *
-   * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
-   * {@link PCollection PCollections}, returns true if the watermark is at
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
-   *
-   * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
-   * {@link PCollection PCollections}, returns the value of
-   * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
-   */
-  public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
-    // if the PTransform's watermark isn't at the max value, it isn't done
-    if (watermarkManager
-        .getWatermarks(transform)
-        .getOutputWatermark()
-        .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-      return false;
-    }
-    // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
-    // the PTransform may produce additional output. It is not done.
-    for (PValue output : transform.getOutput().expand()) {
-      if (output instanceof PCollection) {
-        IsBounded bounded = ((PCollection<?>) output).isBounded();
-        if (bounded.equals(IsBounded.UNBOUNDED)
-            && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
-          return false;
-        }
-      }
-    }
-    // The PTransform's watermark was at positive infinity and all of its outputs are known to be
-    // done. It is done.
-    return true;
-  }
-
-  /**
-   * Returns true if all steps are done.
-   */
-  public boolean isDone() {
-    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
-      if (!isDone(transform)) {
-        return false;
-      }
-    }
-    return true;
-  }
-}