You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/14 06:47:53 UTC

[06/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
deleted file mode 100644
index 193d6a4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ /dev/null
@@ -1,1313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of
- * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for
- * in-memory execution. {@link InMemoryWatermarkManager} is designed to update and return a
- * consistent view of watermarks in the presence of concurrent updates.
- *
- * <p>An {@link InMemoryWatermarkManager} is provided with the collection of root
- * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
- * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
- *
- * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the
- * {@link InMemoryWatermarkManager} is provided with the produced elements and the output watermark
- * of the producing {@link AppliedPTransform transform}. The
- * {@link InMemoryWatermarkManager watermark manager} is responsible for computing the watermarks
- * of all {@link AppliedPTransform transforms} that consume one or more
- * {@link PCollection PCollections}.
- *
- * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight
- * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs
- * atomically:
- * <ul>
- *  <li>All of the in-flight elements are removed from the collection of pending elements for the
- *      {@link AppliedPTransform}.</li>
- *  <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection
- *      of pending elements for each {@link AppliedPTransform} that consumes them.</li>
- *  <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of
- *    <ul>
- *      <li>the previous input watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the timestamps of all currently pending elements</li>
- *          <li>all input {@link PCollection} watermarks</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of
- *    <ul>
- *      <li>the previous output watermark</li>
- *      <li>the minimum of
- *        <ul>
- *          <li>the current input watermark</li>
- *          <li>the current watermark holds</li>
- *        </ul>
- *      </li>
- *    </ul>
- *  </li>
- *  <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of
- *      the {@link AppliedPTransform}</li>
- *  <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be
- *      advanced.</li>
- * </ul>
- *
- * <p>The watermark of a {@link PCollection} is equal to the output watermark of the
- * {@link AppliedPTransform} that produces it.
- *
- * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre>
- * Watermark_In'  = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
- * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
- * Watermark_PCollection = Watermark_Out_ProducingPTransform
- * </pre>
- */
-public class InMemoryWatermarkManager {
-  /**
-   * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
-   * {@link PCollection}.
-   *
-   * <p>A watermark is a monotonically increasing value, which represents the point up to which the
-   * system believes it has received all of the data. Data that arrives with a timestamp that is
-   * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
-   * timestamp which indicates we have received all of the data and there will be no more on-time or
-   * late data. This value is represented by {@link InMemoryWatermarkManager#THE_END_OF_TIME}.
-   */
-  private static interface Watermark {
-    /**
-     * Returns the current value of this watermark.
-     */
-    Instant get();
-
-    /**
-     * Refreshes the value of this watermark from its input watermarks and watermark holds.
-     *
-     * @return {@link WatermarkUpdate#ADVANCED} if the value of the watermark has changed (and
-     *         thus dependent watermarks must also be updated)
-     */
-    WatermarkUpdate refresh();
-  }
-
-  /**
-   * The result of computing a {@link Watermark}.
-   */
-  private static enum WatermarkUpdate {
-    /** The watermark is later than the value at the previous time it was computed. */
-    ADVANCED(true),
-    /** The watermark is equal to the value at the previous time it was computed. */
-    NO_CHANGE(false);
-
-    private final boolean advanced;
-
-    private WatermarkUpdate(boolean advanced) {
-      this.advanced = advanced;
-    }
-
-    public boolean isAdvanced() {
-      return advanced;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates.
-     *
-     * <p>If either of the input {@link WatermarkUpdate WatermarkUpdates} was advanced, the result
-     * {@link WatermarkUpdate} has been advanced.
-     */
-    public WatermarkUpdate union(WatermarkUpdate that) {
-      if (this.advanced) {
-        return this;
-      }
-      return that;
-    }
-
-    /**
-     * Returns the {@link WatermarkUpdate} based on the former and current
-     * {@link Instant timestamps}.
-     */
-    public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
-      if (currentTime.isAfter(oldTime)) {
-        return ADVANCED;
-      }
-      return NO_CHANGE;
-    }
-  }
-
-  /**
-   * The input {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the
-   * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum
-   * timestamp of all of the pending elements, restricted to be monotonically increasing.
-   *
-   * <p>See {@link #refresh()} for more information.
-   */
-  private static class AppliedPTransformInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<WindowedValue<?>> pendingElements;
-    private final Map<Object, NavigableSet<TimerData>> objectTimers;
-
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
-      this.inputWatermarks = inputWatermarks;
-      this.pendingElements = TreeMultiset.create(PENDING_ELEMENT_COMPARATOR);
-      this.objectTimers = new HashMap<>();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes
-     * equal to the maximum value of
-     * <ul>
-     *   <li>the previous input watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the timestamps of all currently pending elements</li>
-     *       <li>all input {@link PCollection} watermarks</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark inputWatermark : inputWatermarks) {
-        minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
-      }
-      if (!pendingElements.isEmpty()) {
-        minInputWatermark = INSTANT_ORDERING.min(
-            minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
-      }
-      Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
-      for (WindowedValue<?> pendingElement : newPending) {
-        pendingElements.add(pendingElement);
-      }
-    }
-
-    private synchronized void removePendingElements(
-        Iterable<? extends WindowedValue<?>> finishedElements) {
-      for (WindowedValue<?> finishedElement : finishedElements) {
-        pendingElements.remove(finishedElement);
-      }
-    }
-
-    private synchronized void updateTimers(TimerUpdate update) {
-      NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
-      if (keyTimers == null) {
-        keyTimers = new TreeSet<>();
-        objectTimers.put(update.key, keyTimers);
-      }
-      for (TimerData timer : update.setTimers) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.add(timer);
-        }
-      }
-      for (TimerData timer : update.deletedTimers) {
-        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-        }
-      }
-      // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
-    }
-
-    private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() {
-      return extractFiredTimers(currentWatermark.get(), objectTimers);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
-          .add("pendingElements", pendingElements)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The output {@link Watermark} of an {@link AppliedPTransform}.
-   *
-   * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the
-   * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same
-   * {@link AppliedPTransform}, restricted to be monotonically increasing. See
-   * {@link #refresh()} for more information.
-   */
-  private static class AppliedPTransformOutputWatermark implements Watermark {
-    private final Watermark inputWatermark;
-    private final PerKeyHolds holds;
-    private AtomicReference<Instant> currentWatermark;
-
-    public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
-      this.inputWatermark = inputWatermark;
-      holds = new PerKeyHolds();
-      currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    public synchronized void updateHold(Object key, Instant newHold) {
-      if (newHold == null) {
-        holds.removeHold(key);
-      } else {
-        holds.updateHold(key, newHold);
-      }
-    }
-
-    @Override
-    public Instant get() {
-      return currentWatermark.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes
-     * equal to the maximum value of:
-     * <ul>
-     *   <li>the previous output watermark</li>
-     *   <li>the minimum of
-     *     <ul>
-     *       <li>the current input watermark</li>
-     *       <li>the current watermark holds</li>
-     *     </ul>
-     *   </li>
-     * </ul>
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldWatermark = currentWatermark.get();
-      Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
-      newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
-      currentWatermark.set(newWatermark);
-      return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
-          .add("holds", holds)
-          .add("currentWatermark", currentWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of a {@link SynchronizedProcessingTimeInputWatermark} is equal
-   * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream
-   * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing input time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time input hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
-    private final Collection<? extends Watermark> inputWms;
-    private final Collection<CommittedBundle<?>> pendingBundles;
-    private final Map<Object, NavigableSet<TimerData>> processingTimers;
-    private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers;
-
-    private final PriorityQueue<TimerData> pendingTimers;
-
-    private AtomicReference<Instant> earliestHold;
-
-    public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
-      this.inputWms = inputWms;
-      this.pendingBundles = new HashSet<>();
-      this.processingTimers = new HashMap<>();
-      this.synchronizedProcessingTimers = new HashMap<>();
-      this.pendingTimers = new PriorityQueue<>();
-      Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
-      for (Watermark wm : inputWms) {
-        initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
-      }
-      earliestHold = new AtomicReference<>(initialHold);
-    }
-
-    @Override
-    public Instant get() {
-      return earliestHold.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark}
-     * becomes equal to the minimum value of
-     * <ul>
-     *   <li>the timestamps of all currently pending bundles</li>
-     *   <li>all input {@link PCollection} synchronized processing time watermarks</li>
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      Instant oldHold = earliestHold.get();
-      Instant minTime = THE_END_OF_TIME.get();
-      for (Watermark input : inputWms) {
-        minTime = INSTANT_ORDERING.min(minTime, input.get());
-      }
-      for (CommittedBundle<?> bundle : pendingBundles) {
-        // TODO: Track elements in the bundle by the processing time they were output instead of
-        // entire bundles. Required to support arbitrarily splitting and merging bundles between
-        // steps
-        minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
-      }
-      earliestHold.set(minTime);
-      return WatermarkUpdate.fromTimestamps(oldHold, minTime);
-    }
-
-    public synchronized void addPending(CommittedBundle<?> bundle) {
-      pendingBundles.add(bundle);
-    }
-
-    public synchronized void removePending(CommittedBundle<?> bundle) {
-      pendingBundles.remove(bundle);
-    }
-
-    /**
-     * Return the earliest timestamp of the earliest timer that has not been completed. This is
-     * either the earliest timestamp across timers that have not been delivered, or the earliest
-     * timestamp across timers that have been delivered but have not been completed.
-     */
-    public synchronized Instant getEarliestTimerTimestamp() {
-      Instant earliest = THE_END_OF_TIME.get();
-      for (NavigableSet<TimerData> timers : processingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
-        if (!timers.isEmpty()) {
-          earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
-        }
-      }
-      if (!pendingTimers.isEmpty()) {
-        earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
-      }
-      return earliest;
-    }
-
-    private synchronized void updateTimers(TimerUpdate update) {
-      for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
-      }
-      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
-      for (TimerData addedTimer : update.setTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
-        if (timerQueue != null) {
-          timerQueue.add(addedTimer);
-        }
-      }
-      for (TimerData deletedTimer : update.deletedTimers) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
-        if (timerQueue != null) {
-          timerQueue.remove(deletedTimer);
-        }
-      }
-    }
-
-    private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers(
-        TimeDomain domain, Instant firingTime) {
-      Map<Object, List<TimerData>> firedTimers;
-      switch (domain) {
-        case PROCESSING_TIME:
-          firedTimers = extractFiredTimers(firingTime, processingTimers);
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          firedTimers =
-              extractFiredTimers(
-                  INSTANT_ORDERING.min(firingTime, earliestHold.get()),
-                  synchronizedProcessingTimers);
-          break;
-        default:
-          throw new IllegalArgumentException(
-              "Called getFiredTimers on a Synchronized Processing Time watermark"
-                  + " and gave a non-processing time domain "
-                  + domain);
-      }
-      for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) {
-        pendingTimers.addAll(firedTimer.getValue());
-      }
-      return firedTimers;
-    }
-
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) {
-      NavigableSet<TimerData> processingQueue = processingTimers.get(key);
-      if (processingQueue == null) {
-        processingQueue = new TreeSet<>();
-        processingTimers.put(key, processingQueue);
-      }
-      NavigableSet<TimerData> synchronizedProcessingQueue =
-          synchronizedProcessingTimers.get(key);
-      if (synchronizedProcessingQueue == null) {
-        synchronizedProcessingQueue = new TreeSet<>();
-        synchronizedProcessingTimers.put(key, synchronizedProcessingQueue);
-      }
-      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
-      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
-      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
-      return result;
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
-          .add("earliestHold", earliestHold)
-          .toString();
-    }
-  }
-
-  /**
-   * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
-   * {@link AppliedPTransform}.
-   *
-   * <p>At any point, the hold value of a {@link SynchronizedProcessingTimeOutputWatermark} is
-   * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all
-   * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the output
-   * synchronized processing time at any step is equal to the maximum of:
-   * <ul>
-   *   <li>The most recently returned synchronized processing output time
-   *   <li>The minimum of
-   *     <ul>
-   *       <li>The current processing time
-   *       <li>The current synchronized processing time output hold
-   *     </ul>
-   * </ul>
-   */
-  private static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
-    private final SynchronizedProcessingTimeInputWatermark inputWm;
-    private AtomicReference<Instant> latestRefresh;
-
-    public SynchronizedProcessingTimeOutputWatermark(
-        SynchronizedProcessingTimeInputWatermark inputWm) {
-      this.inputWm = inputWm;
-      this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    @Override
-    public Instant get() {
-      return latestRefresh.get();
-    }
-
-    /**
-     * {@inheritDoc}.
-     *
-     * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark}
-     * becomes equal to the minimum value of:
-     * <ul>
-     *   <li>the current input watermark.
-     *   <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input
-     *       watermark.
-     *   <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark.
-     * </ul>
-     *
-     * <p>Note that this value is not monotonic, but the returned value for the synchronized
-     * processing time must be.
-     */
-    @Override
-    public synchronized WatermarkUpdate refresh() {
-      // Hold the output synchronized processing time to the input watermark, which takes into
-      // account buffered bundles, and the earliest pending timer, which determines what to hold
-      // downstream timers to.
-      Instant oldRefresh = latestRefresh.get();
-      Instant newTimestamp =
-          INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
-      latestRefresh.set(newTimestamp);
-      return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
-    }
-
-    @Override
-    public synchronized String toString() {
-      return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
-          .add("latestRefresh", latestRefresh)
-          .toString();
-    }
-  }
-
-  /**
-   * The {@code Watermark} that is after the latest time it is possible to represent in the global
-   * window. This is a distinguished value representing a complete {@link PTransform}.
-   */
-  private static final Watermark THE_END_OF_TIME = new Watermark() {
-        @Override
-        public WatermarkUpdate refresh() {
-          // THE_END_OF_TIME is a distinguished value that cannot be advanced.
-          return WatermarkUpdate.NO_CHANGE;
-        }
-
-        @Override
-        public Instant get() {
-          return BoundedWindow.TIMESTAMP_MAX_VALUE;
-        }
-      };
-
-  private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
-
-  /**
-   * An ordering that compares windowed values by timestamp, then arbitrarily. This ensures that
-   * {@link WindowedValue WindowedValues} will be sorted by timestamp, while two different
-   * {@link WindowedValue WindowedValues} with the same timestamp are not considered equal.
-   */
-  private static final Ordering<WindowedValue<? extends Object>> PENDING_ELEMENT_COMPARATOR =
-      (new WindowedValueByTimestampComparator()).compound(Ordering.arbitrary());
-
-  /**
-   * For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the
-   * latestTime argument and put it in the result with the same key, then remove all of the keys
-   * which have no more pending timers.
-   *
-   * The result collection retains ordering of timers (from earliest to latest).
-   */
-  private static Map<Object, List<TimerData>> extractFiredTimers(
-      Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) {
-    Map<Object, List<TimerData>> result = new HashMap<>();
-    Set<Object> emptyKeys = new HashSet<>();
-    for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) {
-      NavigableSet<TimerData> timers = pendingTimers.getValue();
-      if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-        ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
-        result.put(pendingTimers.getKey(), keyFiredTimers);
-        while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
-          keyFiredTimers.add(timers.first());
-          timers.remove(timers.first());
-        }
-      }
-      if (timers.isEmpty()) {
-        emptyKeys.add(pendingTimers.getKey());
-      }
-    }
-    objectTimers.keySet().removeAll(emptyKeys);
-    return result;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain.
-   */
-  private final Clock clock;
-
-  /**
-   * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications}
-   * that consume that {@link PCollection}.
-   */
-  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
-
-  /**
-   * The input and output watermark of each {@link AppliedPTransform}.
-   */
-  private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
-
-  /**
-   * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created
-   * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
-   * minimum watermark, with no watermark holds or pending elements.
-   *
-   * @param rootTransforms the root-level transforms of the {@link Pipeline}
-   * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the
-   *                  transforms that consume it as a part of their input
-   */
-  public static InMemoryWatermarkManager create(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
-    return new InMemoryWatermarkManager(clock, rootTransforms, consumers);
-  }
-
-  private InMemoryWatermarkManager(
-      Clock clock,
-      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
-      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
-    this.clock = clock;
-    this.consumers = consumers;
-
-    transformToWatermarks = new HashMap<>();
-
-    for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
-      getTransformWatermark(rootTransform);
-    }
-    for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) {
-      for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
-        getTransformWatermark(transform);
-      }
-    }
-  }
-
-  private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
-    TransformWatermarks wms = transformToWatermarks.get(transform);
-    if (wms == null) {
-      List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform);
-      AppliedPTransformInputWatermark inputWatermark =
-          new AppliedPTransformInputWatermark(inputCollectionWatermarks);
-      AppliedPTransformOutputWatermark outputWatermark =
-          new AppliedPTransformOutputWatermark(inputWatermark);
-
-      SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
-          new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
-      SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
-          new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
-
-      wms =
-          new TransformWatermarks(
-              inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark);
-      transformToWatermarks.put(transform, wms);
-    }
-    return wms;
-  }
-
-  private Collection<Watermark> getInputProcessingWatermarks(
-      AppliedPTransform<?, ?, ?> transform) {
-    ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
-    Collection<? extends PValue> inputs = transform.getInput().expand();
-    if (inputs.isEmpty()) {
-      inputWmsBuilder.add(THE_END_OF_TIME);
-    }
-    for (PValue pvalue : inputs) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(pvalue.getProducingTransformInternal())
-              .synchronizedProcessingOutputWatermark;
-      inputWmsBuilder.add(producerOutputWatermark);
-    }
-    return inputWmsBuilder.build();
-  }
-
-  /**
-   * Collects the event-time output watermark of every transform that produces an input of
-   * {@code transform}. A transform without inputs gets {@code THE_END_OF_TIME} as its only input
-   * watermark.
-   */
-  private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    Collection<? extends PValue> upstreamValues = transform.getInput().expand();
-    ImmutableList.Builder<Watermark> collected = ImmutableList.builder();
-    if (upstreamValues.isEmpty()) {
-      collected.add(THE_END_OF_TIME);
-      return collected.build();
-    }
-    for (PValue upstreamValue : upstreamValues) {
-      TransformWatermarks producerWatermarks =
-          getTransformWatermark(upstreamValue.getProducingTransformInternal());
-      collected.add(producerWatermarks.outputWatermark);
-    }
-    return collected.build();
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Gets the input and output watermarks for an {@link AppliedPTransform}. If the
-   * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of
-   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
-   *
-   * @return a snapshot of the input watermark and output watermark for the provided transform
-   */
-  public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    return transformToWatermarks.get(transform);
-  }
-
-  /**
-   * Updates the watermarks of a transform with one or more inputs.
-   *
-   * <p>Each transform has two monotonically increasing watermarks. The input watermark can, at any
-   * time, be updated to equal:
-   * <pre>
-   * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
-   * </pre>
-   * The output watermark can, at any time, be updated to equal:
-   * <pre>
-   * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
-   * </pre>
-   *
-   * <p>The pending elements and timers are updated first, then the event-time hold for the key of
-   * the completed bundle, and finally the watermarks of the transform are refreshed.
-   *
-   * @param completed the input that has completed, or {@code null} if there was none
-   * @param transform the transform that has completed processing the input
-   * @param timerUpdate the timer changes to apply to the transform
-   * @param outputs the bundles the transform has output
-   * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
-   *                     is no hold
-   */
-  public void updateWatermarks(
-      @Nullable CommittedBundle<?> completed,
-      AppliedPTransform<?, ?, ?> transform,
-      TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs,
-      @Nullable Instant earliestHold) {
-    updatePending(completed, transform, timerUpdate, outputs);
-
-    // Without a completed bundle there is no key; the hold is then recorded under null.
-    Object holdKey = (completed != null) ? completed.getKey() : null;
-    transformToWatermarks.get(transform).setEventTimeHold(holdKey, earliestHold);
-
-    refreshWatermarks(transform);
-  }
-
-  /**
-   * Refreshes the watermarks of the provided transform and, if that refresh reports an advance,
-   * recursively refreshes every transform that consumes one of its outputs.
-   */
-  private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
-    WatermarkUpdate refreshed = transformToWatermarks.get(transform).refresh();
-    if (!refreshed.isAdvanced()) {
-      // Nothing moved, so nothing downstream can move because of this transform.
-      return;
-    }
-    for (PValue produced : transform.getOutput().expand()) {
-      Collection<AppliedPTransform<?, ?, ?>> downstream = consumers.get(produced);
-      if (downstream == null) {
-        continue;
-      }
-      for (AppliedPTransform<?, ?, ?> consumer : downstream) {
-        refreshWatermarks(consumer);
-      }
-    }
-  }
-
-  /**
-   * Applies the provided timer update to the transform, removes the elements of the input bundle
-   * (if there is one) from the transform's pending elements, and adds each output bundle to the
-   * pending elements of every transform that consumes the bundle's {@link PCollection}.
-   *
-   * @param input the bundle the transform finished processing, or {@code null} if there was none
-   * @param transform the transform that finished processing
-   * @param timerUpdate the completed, set, and deleted timers to apply to the transform
-   * @param outputs the bundles produced by the transform
-   */
-  private void updatePending(
-      @Nullable CommittedBundle<?> input,
-      AppliedPTransform<?, ?, ?> transform,
-      TimerUpdate timerUpdate,
-      Iterable<? extends CommittedBundle<?>> outputs) {
-    TransformWatermarks completedTransform = transformToWatermarks.get(transform);
-    completedTransform.updateTimers(timerUpdate);
-    if (input != null) {
-      completedTransform.removePending(input);
-    }
-
-    for (CommittedBundle<?> bundle : outputs) {
-      Collection<AppliedPTransform<?, ?, ?>> bundleConsumers =
-          consumers.get(bundle.getPCollection());
-      if (bundleConsumers == null) {
-        // No transform is registered as consuming this PCollection, so the bundle is pending
-        // nowhere. refreshWatermarks treats a missing entry the same way.
-        continue;
-      }
-      for (AppliedPTransform<?, ?, ?> consumer : bundleConsumers) {
-        TransformWatermarks watermarks = transformToWatermarks.get(consumer);
-        watermarks.addPending(bundle);
-      }
-    }
-  }
-
-  /**
-   * Extracts the fired timers of every tracked transform and returns them keyed by transform and
-   * then by key. Transforms for which no timers were extracted do not appear in the result.
-   */
-  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedByTransform = new HashMap<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> tracked :
-        transformToWatermarks.entrySet()) {
-      Map<Object, FiredTimers> firedByKey = tracked.getValue().extractFiredTimers();
-      if (firedByKey.isEmpty()) {
-        continue;
-      }
-      firedByTransform.put(tracked.getKey(), firedByKey);
-    }
-    return firedByTransform;
-  }
-
-  /**
-   * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
-   * and as such the watermark manager must track holds and the release of holds on a per-key basis.
-   *
-   * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals,
-   * as the key is arbitrarily ordered via identity, rather than object equality.
-   */
-  private static final class KeyedHold implements Comparable<KeyedHold> {
-    private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
-
-    private final Object key;
-    private final Instant timestamp;
-
-    /**
-     * Create a new KeyedHold with the specified key and timestamp.
-     */
-    public static KeyedHold of(Object key, Instant timestamp) {
-      return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get()));
-    }
-
-    private KeyedHold(Object key, Instant timestamp) {
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public int compareTo(KeyedHold that) {
-      return ComparisonChain.start()
-          .compare(this.timestamp, that.timestamp)
-          .compare(this.key, that.key, KEY_ORDERING)
-          .result();
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(timestamp, key);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof KeyedHold)) {
-        return false;
-      }
-      KeyedHold that = (KeyedHold) other;
-      return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key);
-    }
-
-    /**
-     * Get the value of this {@link KeyedHold}.
-     */
-    public Instant getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(KeyedHold.class)
-          .add("key", key)
-          .add("hold", timestamp)
-          .toString();
-    }
-  }
-
-  /**
-   * Tracks the current watermark hold of each key, together with the minimum hold across all
-   * keys.
-   */
-  private static class PerKeyHolds {
-    // The current hold of each key. Kept in step with allHolds.
-    private final Map<Object, KeyedHold> keyedHolds;
-    // Every current hold, ordered by KeyedHold's natural ordering so the earliest is at the head.
-    private final PriorityQueue<KeyedHold> allHolds;
-
-    private PerKeyHolds() {
-      this.keyedHolds = new HashMap<>();
-      this.allHolds = new PriorityQueue<>();
-    }
-
-    /**
-     * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
-     * there are no holds within this {@link PerKeyHolds}.
-     */
-    public Instant getMinHold() {
-      return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
-    }
-
-    /**
-     * Updates the hold of the provided key to the provided value, removing any other holds for
-     * the same key.
-     *
-     * @param key the key the hold belongs to; may be {@code null}
-     * @param newHold the new hold; {@link KeyedHold#of} substitutes THE_END_OF_TIME for
-     *                {@code null}
-     */
-    public void updateHold(@Nullable Object key, Instant newHold) {
-      removeHold(key);
-      KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
-      keyedHolds.put(key, newKeyedHold);
-      allHolds.offer(newKeyedHold);
-    }
-
-    /**
-     * Removes the hold of the provided key, if there is one.
-     *
-     * @param key the key whose hold should be released; may be {@code null}
-     */
-    public void removeHold(@Nullable Object key) {
-      // Drop the per-key entry as well as the queued hold, so a released key leaves no stale
-      // entry behind in keyedHolds.
-      KeyedHold oldHold = keyedHolds.remove(key);
-      if (oldHold != null) {
-        allHolds.remove(oldHold);
-      }
-    }
-  }
-
-  /**
-   * A reference to the input and output watermarks of an {@link AppliedPTransform}, in both the
-   * event-time and the synchronized processing-time domains.
-   */
-  public class TransformWatermarks {
-    private final AppliedPTransformInputWatermark inputWatermark;
-    private final AppliedPTransformOutputWatermark outputWatermark;
-
-    private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
-    private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
-
-    // The largest synchronized processing times returned so far. The getters never return a
-    // value smaller than these, which is what makes them monotonic.
-    private Instant latestSynchronizedInputWm;
-    private Instant latestSynchronizedOutputWm;
-
-    private TransformWatermarks(
-        AppliedPTransformInputWatermark inputWatermark,
-        AppliedPTransformOutputWatermark outputWatermark,
-        SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
-        SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
-      this.inputWatermark = inputWatermark;
-      this.outputWatermark = outputWatermark;
-
-      this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
-      this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
-      this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    /**
-     * Returns the input watermark of the {@link AppliedPTransform}.
-     *
-     * @throws NullPointerException if the underlying watermark has no value
-     */
-    public Instant getInputWatermark() {
-      return Preconditions.checkNotNull(inputWatermark.get());
-    }
-
-    /**
-     * Returns the output watermark of the {@link AppliedPTransform}.
-     */
-    public Instant getOutputWatermark() {
-      return outputWatermark.get();
-    }
-
-    /**
-     * Returns the synchronized processing input time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is the later of the previously returned value and the earlier of the
-     * current clock time and the synchronized processing input watermark, so it never decreases.
-     */
-    public synchronized Instant getSynchronizedProcessingInputTime() {
-      latestSynchronizedInputWm = INSTANT_ORDERING.max(
-          latestSynchronizedInputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get()));
-      return latestSynchronizedInputWm;
-    }
-
-    /**
-     * Returns the synchronized processing output time of the {@link AppliedPTransform}.
-     *
-     * <p>The returned value is the later of the previously returned value and the earlier of the
-     * current clock time and the synchronized processing output watermark, so it never decreases.
-     */
-    public synchronized Instant getSynchronizedProcessingOutputTime() {
-      latestSynchronizedOutputWm = INSTANT_ORDERING.max(
-          latestSynchronizedOutputWm,
-          INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get()));
-      return latestSynchronizedOutputWm;
-    }
-
-    /**
-     * Refreshes all four watermarks, inputs before outputs, and returns the union of the
-     * event-time and synchronized processing-time output updates.
-     */
-    private WatermarkUpdate refresh() {
-      inputWatermark.refresh();
-      synchronizedProcessingInputWatermark.refresh();
-      WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
-      WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh();
-      return eventOutputUpdate.union(syncOutputUpdate);
-    }
-
-    /**
-     * Sets the event-time hold of the provided key on the output watermark.
-     */
-    private void setEventTimeHold(Object key, Instant newHold) {
-      outputWatermark.updateHold(key, newHold);
-    }
-
-    /**
-     * Removes the provided bundle from the pending work of both input watermarks.
-     */
-    private void removePending(CommittedBundle<?> bundle) {
-      inputWatermark.removePendingElements(bundle.getElements());
-      synchronizedProcessingInputWatermark.removePending(bundle);
-    }
-
-    /**
-     * Adds the provided bundle to the pending work of both input watermarks.
-     */
-    private void addPending(CommittedBundle<?> bundle) {
-      inputWatermark.addPendingElements(bundle.getElements());
-      synchronizedProcessingInputWatermark.addPending(bundle);
-    }
-
-    /**
-     * Extracts the timers that have fired in each {@link TimeDomain}, grouped by key.
-     *
-     * <p>Once the input watermark has reached {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, the
-     * processing-time and synchronized processing-time timers are extracted as of
-     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. Before that, they are extracted as of the
-     * current clock time and the current synchronized processing input time, respectively.
-     */
-    private Map<Object, FiredTimers> extractFiredTimers() {
-      Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers();
-      Map<Object, List<TimerData>> processingTimers;
-      Map<Object, List<TimerData>> synchronizedTimers;
-      if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-        // Each domain is extracted on its own. Asking for PROCESSING_TIME a second time here
-        // would leave the synchronized processing-time timers unextracted at end of input.
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
-      } else {
-        processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.PROCESSING_TIME, clock.now());
-        synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
-            TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
-      }
-      Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
-      groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
-
-      Map<Object, FiredTimers> keyFiredTimers = new HashMap<>();
-      for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers :
-          groupedTimers.entrySet()) {
-        keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
-      }
-      return keyFiredTimers;
-    }
-
-    /**
-     * Merges per-key timer lists into {@code groupedToMutate}, filing each list under the
-     * {@link TimeDomain} of its first timer. Empty lists are skipped.
-     */
-    @SafeVarargs
-    private final void groupFiredTimers(
-        Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate,
-        Map<Object, List<TimerData>>... timersToGroup) {
-      for (Map<Object, List<TimerData>> subGroup : timersToGroup) {
-        for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) {
-          List<TimerData> timers = newTimers.getValue();
-          if (timers.isEmpty()) {
-            // Nothing fired for this key, and there is no first timer to read a domain from.
-            continue;
-          }
-          Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
-          if (grouped == null) {
-            grouped = new HashMap<>();
-            groupedToMutate.put(newTimers.getKey(), grouped);
-          }
-          grouped.put(timers.get(0).getDomain(), timers);
-        }
-      }
-    }
-
-    /**
-     * Applies the provided timer update to both input watermarks.
-     */
-    private void updateTimers(TimerUpdate update) {
-      inputWatermark.updateTimers(update);
-      synchronizedProcessingInputWatermark.updateTimers(update);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(TransformWatermarks.class)
-          .add("inputWatermark", inputWatermark)
-          .add("outputWatermark", outputWatermark)
-          .add("inputProcessingTime", synchronizedProcessingInputWatermark)
-          .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
-          .toString();
-    }
-  }
-
-  /**
-   * The timer changes that result from executing a step: newly set, deleted, and completed
-   * timers, all for a single key.
-   *
-   * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the
-   * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as
-   * the input to the executed step.
-   */
-  public static class TimerUpdate {
-    private final Object key;
-    private final Iterable<? extends TimerData> completedTimers;
-
-    private final Iterable<? extends TimerData> setTimers;
-    private final Iterable<? extends TimerData> deletedTimers;
-
-    private TimerUpdate(
-        Object key,
-        Iterable<? extends TimerData> completedTimers,
-        Iterable<? extends TimerData> setTimers,
-        Iterable<? extends TimerData> deletedTimers) {
-      this.key = key;
-      this.completedTimers = completedTimers;
-      this.setTimers = setTimers;
-      this.deletedTimers = deletedTimers;
-    }
-
-    /**
-     * Returns a TimerUpdate for a null key with no timers.
-     */
-    public static TimerUpdate empty() {
-      List<TimerData> noTimers = Collections.emptyList();
-      return new TimerUpdate(null, noTimers, noTimers, noTimers);
-    }
-
-    /**
-     * Creates a new {@link TimerUpdate} builder for the provided key. The builder starts with no
-     * completed, set, or deleted timers.
-     */
-    public static TimerUpdateBuilder builder(Object key) {
-      return new TimerUpdateBuilder(key);
-    }
-
-    /**
-     * A builder that accumulates completed, set, and deleted timers for a {@link TimerUpdate}.
-     */
-    public static final class TimerUpdateBuilder {
-      private final Object key;
-      private final Collection<TimerData> completedTimers;
-      private final Collection<TimerData> setTimers;
-      private final Collection<TimerData> deletedTimers;
-
-      private TimerUpdateBuilder(Object key) {
-        this.key = key;
-        this.completedTimers = new HashSet<>();
-        this.setTimers = new HashSet<>();
-        this.deletedTimers = new HashSet<>();
-      }
-
-      /**
-       * Adds all of the provided timers to the collection of completed timers, and returns this
-       * {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) {
-        for (TimerData completedTimer : completedTimers) {
-          this.completedTimers.add(completedTimer);
-        }
-        return this;
-      }
-
-      /**
-       * Records the provided timer as set, and forgets any earlier deletion of it. Returns this
-       * {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder setTimer(TimerData setTimer) {
-        deletedTimers.remove(setTimer);
-        setTimers.add(setTimer);
-        return this;
-      }
-
-      /**
-       * Records the provided timer as deleted, and forgets any earlier setting of it. Returns this
-       * {@link TimerUpdateBuilder}.
-       */
-      public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
-        deletedTimers.add(deletedTimer);
-        setTimers.remove(deletedTimer);
-        return this;
-      }
-
-      /**
-       * Returns a new {@link TimerUpdate} holding immutable copies of the completedTimers,
-       * setTimers, and deletedTimers accumulated so far.
-       */
-      public TimerUpdate build() {
-        ImmutableSet<TimerData> completed = ImmutableSet.copyOf(completedTimers);
-        ImmutableSet<TimerData> set = ImmutableSet.copyOf(setTimers);
-        ImmutableSet<TimerData> deleted = ImmutableSet.copyOf(deletedTimers);
-        return new TimerUpdate(key, completed, set, deleted);
-      }
-    }
-
-    @VisibleForTesting
-    Object getKey() {
-      return key;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getCompletedTimers() {
-      return completedTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getSetTimers() {
-      return setTimers;
-    }
-
-    @VisibleForTesting
-    Iterable<? extends TimerData> getDeletedTimers() {
-      return deletedTimers;
-    }
-
-    /**
-     * Returns a {@link TimerUpdate} with the same key, set timers, and deleted timers as this
-     * one, but with the specified completed timers.
-     */
-    public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
-      return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof TimerUpdate)) {
-        return false;
-      }
-      TimerUpdate that = (TimerUpdate) other;
-      if (!Objects.equals(this.key, that.key)) {
-        return false;
-      }
-      if (!Objects.equals(this.completedTimers, that.completedTimers)) {
-        return false;
-      }
-      if (!Objects.equals(this.setTimers, that.setTimers)) {
-        return false;
-      }
-      return Objects.equals(this.deletedTimers, that.deletedTimers);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(key, completedTimers, setTimers, deletedTimers);
-    }
-  }
-
-  /**
-   * A pair of {@link TimerData} and key which can be delivered to the appropriate
-   * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when
-   * the time domain in which it lives progresses past a specified time, as determined by the
-   * {@link InMemoryWatermarkManager}.
-   */
-  public static class FiredTimers {
-    private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
-
-    private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) {
-      this.timers = timers;
-    }
-
-    /**
-     * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers
-     * fired within the provided domain, return an empty collection.
-     *
-     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
-     */
-    public Collection<TimerData> getTimers(TimeDomain domain) {
-      Collection<TimerData> domainTimers = timers.get(domain);
-      if (domainTimers == null) {
-        return Collections.emptyList();
-      }
-      return domainTimers;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString();
-    }
-  }
-
-  private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
-    @Override
-    public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
-      return o1.getTimestamp().compareTo(o2.getTimestamp());
-    }
-  }
-
-  public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
-    Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>();
-    for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms :
-        transformToWatermarks.entrySet()) {
-      if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
-        result.add(wms.getKey());
-      }
-    }
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
deleted file mode 100644
index 2fb6536..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link UncommittedBundle} that buffers elements in memory.
- */
-public final class InProcessBundle<T> implements UncommittedBundle<T> {
-  private final PCollection<T> pcollection;
-  private final boolean keyed;
-  private final Object key;
-  private boolean committed = false;
-  private ImmutableList.Builder<WindowedValue<T>> elements;
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
-   */
-  public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
-    return new InProcessBundle<T>(pcollection, false, null);
-  }
-
-  /**
-   * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
-   * key.
-   *
-   * See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
-   * information.
-   */
-  public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
-    return new InProcessBundle<T>(pcollection, true, key);
-  }
-
-  private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
-    this.pcollection = pcollection;
-    this.keyed = keyed;
-    this.key = key;
-    this.elements = ImmutableList.builder();
-  }
-
-  @Override
-  public PCollection<T> getPCollection() {
-    return pcollection;
-  }
-
-  @Override
-  public InProcessBundle<T> add(WindowedValue<T> element) {
-    checkState(!committed, "Can't add element %s to committed bundle %s", element, this);
-    elements.add(element);
-    return this;
-  }
-
-  @Override
-  public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
-    checkState(!committed, "Can't commit already committed bundle %s", this);
-    committed = true;
-    final Iterable<WindowedValue<T>> committedElements = elements.build();
-    return new CommittedBundle<T>() {
-      @Override
-      @Nullable
-      public Object getKey() {
-        return key;
-      }
-
-      @Override
-      public boolean isKeyed() {
-        return keyed;
-      }
-
-      @Override
-      public Iterable<WindowedValue<T>> getElements() {
-        return committedElements;
-      }
-
-      @Override
-      public PCollection<T> getPCollection() {
-        return pcollection;
-      }
-
-      @Override
-      public Instant getSynchronizedProcessingOutputWatermark() {
-        return synchronizedCompletionTime;
-      }
-
-      @Override
-      public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .omitNullValues()
-            .add("pcollection", pcollection)
-            .add("key", key)
-            .add("elements", committedElements)
-            .toString();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
deleted file mode 100644
index 7ca1b60..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * A factory that produces bundles that perform no additional validation.
- */
-class InProcessBundleFactory implements BundleFactory {
-  public static InProcessBundleFactory create() {
-    return new InProcessBundleFactory();
-  }
-
-  private InProcessBundleFactory() {}
-
-  @Override
-  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return InProcessBundle.unkeyed(output);
-  }
-
-  @Override
-  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
-    return input.isKeyed()
-        ? InProcessBundle.keyed(output, input.getKey())
-        : InProcessBundle.unkeyed(output);
-  }
-
-  @Override
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output) {
-    return InProcessBundle.keyed(output, key);
-  }
-
-  /**
-   * A {@link UncommittedBundle} that buffers elements in memory.
-   */
-  private static final class InProcessBundle<T> implements UncommittedBundle<T> {
-    private final PCollection<T> pcollection;
-    private final boolean keyed;
-    private final Object key;
-    private boolean committed = false;
-    private ImmutableList.Builder<WindowedValue<T>> elements;
-
-    /**
-     * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key.
-     */
-    public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) {
-      return new InProcessBundle<T>(pcollection, false, null);
-    }
-
-    /**
-     * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified
-     * key.
-     *
-     * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more
-     * information.
-     */
-    public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) {
-      return new InProcessBundle<T>(pcollection, true, key);
-    }
-
-    private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) {
-      this.pcollection = pcollection;
-      this.keyed = keyed;
-      this.key = key;
-      this.elements = ImmutableList.builder();
-    }
-
-    @Override
-    public PCollection<T> getPCollection() {
-      return pcollection;
-    }
-
-    @Override
-    public InProcessBundle<T> add(WindowedValue<T> element) {
-      checkState(
-          !committed,
-          "Can't add element %s to committed bundle in PCollection %s",
-          element,
-          pcollection);
-      elements.add(element);
-      return this;
-    }
-
-    @Override
-    public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
-      checkState(!committed, "Can't commit already committed bundle %s", this);
-      committed = true;
-      final Iterable<WindowedValue<T>> committedElements = elements.build();
-      return new CommittedBundle<T>() {
-        @Override
-        @Nullable
-        public Object getKey() {
-          return key;
-        }
-
-        @Override
-        public boolean isKeyed() {
-          return keyed;
-        }
-
-        @Override
-        public Iterable<WindowedValue<T>> getElements() {
-          return committedElements;
-        }
-
-        @Override
-        public PCollection<T> getPCollection() {
-          return pcollection;
-        }
-
-        @Override
-        public Instant getSynchronizedProcessingOutputWatermark() {
-          return synchronizedCompletionTime;
-        }
-
-        @Override
-        public String toString() {
-          return MoreObjects.toStringHelper(this)
-              .omitNullValues()
-              .add("pcollection", pcollection)
-              .add("key", key)
-              .add("elements", committedElements)
-              .toString();
-        }
-      };
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
deleted file mode 100644
index de6a3dc..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleOutputManager.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * An {@link OutputManager} used by the {@link InProcessPipelineRunner} that adds each output
- * element to the {@link UncommittedBundle} registered for the element's {@link TupleTag}.
- *
- * <p>An {@link UncommittedBundle} becomes a {@link CommittedBundle Bundle} when it is committed.
- */
-public class InProcessBundleOutputManager implements OutputManager {
-  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-
-  /**
-   * Creates an {@link InProcessBundleOutputManager} that writes to the provided bundles.
-   *
-   * @param outputBundles the bundle to add elements to, for each output tag
-   */
-  public static InProcessBundleOutputManager create(
-      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
-    return new InProcessBundleOutputManager(outputBundles);
-  }
-
-  /**
-   * Creates an output manager backed by {@code bundles}. The map is stored as-is, not copied.
-   *
-   * @param bundles the bundle to add elements to, for each output tag
-   */
-  public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
-    this.bundles = bundles;
-  }
-
-  /**
-   * Adds {@code output} to the bundle registered for {@code tag}.
-   *
-   * @throws IllegalArgumentException if no bundle is registered for {@code tag}
-   */
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-    // The map cannot express that the bundle stored under a TupleTag<T> holds elements of type T,
-    // so the element type is restored with a single unchecked cast.
-    @SuppressWarnings("unchecked")
-    UncommittedBundle<T> bundle = (UncommittedBundle<T>) bundles.get(tag);
-    if (bundle == null) {
-      // Fail with the offending tag rather than an anonymous NullPointerException.
-      throw new IllegalArgumentException("No output bundle is registered for tag " + tag);
-    }
-    bundle.add(output);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
deleted file mode 100644
index 28b4de7..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessCreate.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
-import com.google.cloud.dataflow.sdk.io.OffsetBasedSource.OffsetBasedReader;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.Create.Values;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process implementation of the {@link Values Create.Values} {@link PTransform}, implemented
- * using a {@link BoundedSource}.
- *
- * The coder is inferred via the {@link Values#getDefaultOutputCoder(PInput)} method on the original
- * transform.
- */
-class InProcessCreate<T> extends ForwardingPTransform<PInput, PCollection<T>> {
-  private final Create.Values<T> original;
-
-  /**
-   * A {@link PTransformOverrideFactory} for {@link InProcessCreate}.
-   */
-  public static class InProcessCreateOverrideFactory implements PTransformOverrideFactory {
-    @Override
-    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-        PTransform<InputT, OutputT> transform) {
-      if (transform instanceof Create.Values) {
-        @SuppressWarnings("unchecked")
-        PTransform<InputT, OutputT> override =
-            (PTransform<InputT, OutputT>) from((Create.Values<OutputT>) transform);
-        return override;
-      }
-      return transform;
-    }
-  }
-
-  public static <T> InProcessCreate<T> from(Create.Values<T> original) {
-    return new InProcessCreate<>(original);
-  }
-
-  private InProcessCreate(Values<T> original) {
-    this.original = original;
-  }
-
-  @Override
-  public PCollection<T> apply(PInput input) {
-    Coder<T> elementCoder;
-    try {
-      elementCoder = original.getDefaultOutputCoder(input);
-    } catch (CannotProvideCoderException e) {
-      throw new IllegalArgumentException(
-          "Unable to infer a coder and no Coder was specified. "
-          + "Please set a coder by invoking Create.withCoder() explicitly.",
-          e);
-    }
-    InMemorySource<T> source;
-    try {
-      source = InMemorySource.fromIterable(original.getElements(), elementCoder);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    PCollection<T> result = input.getPipeline().apply(Read.from(source));
-    result.setCoder(elementCoder);
-    return result;
-  }
-
-  @Override
-  public PTransform<PInput, PCollection<T>> delegate() {
-    return original;
-  }
-
-  @VisibleForTesting
-  static class InMemorySource<T> extends OffsetBasedSource<T> {
-    private final List<byte[]> allElementsBytes;
-    private final long totalSize;
-    private final Coder<T> coder;
-
-    public static <T> InMemorySource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
-        throws CoderException, IOException {
-      ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
-      long totalSize = 0L;
-      for (T element : elements) {
-        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
-        allElementsBytes.add(bytes);
-        totalSize += bytes.length;
-      }
-      return new InMemorySource<>(allElementsBytes.build(), totalSize, elemCoder);
-    }
-
-    /**
-     * Create a new source with the specified bytes. The new source owns the input element bytes,
-     * which must not be modified after this constructor is called.
-     */
-    private InMemorySource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
-      super(0, elementBytes.size(), 1);
-      this.allElementsBytes = ImmutableList.copyOf(elementBytes);
-      this.totalSize = totalSize;
-      this.coder = coder;
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return totalSize;
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-      return new BytesReader<>(this);
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    @Override
-    public long getMaxEndOffset(PipelineOptions options) throws Exception {
-      return allElementsBytes.size();
-    }
-
-    @Override
-    public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
-      List<byte[]> primaryElems = allElementsBytes.subList((int) start, (int) end);
-      long primarySizeEstimate =
-          (long) (totalSize * primaryElems.size() / (double) allElementsBytes.size());
-      return new InMemorySource<>(primaryElems, primarySizeEstimate, coder);
-    }
-
-    @Override
-    public long getBytesPerOffset() {
-      if (allElementsBytes.size() == 0) {
-        return 0L;
-      }
-      return totalSize / allElementsBytes.size();
-    }
-  }
-
-  private static class BytesReader<T> extends OffsetBasedReader<T> {
-    private int index;
-    /**
-     * Use an optional to distinguish between null next element (as Optional.absent()) and no next
-     * element (next is null).
-     */
-    @Nullable private Optional<T> next;
-
-    public BytesReader(InMemorySource<T> source) {
-      super(source);
-      index = -1;
-    }
-
-    @Override
-    @Nullable
-    public T getCurrent() throws NoSuchElementException {
-      if (next == null) {
-        throw new NoSuchElementException();
-      }
-      return next.orNull();
-    }
-
-    @Override
-    public void close() throws IOException {}
-
-    @Override
-    protected long getCurrentOffset() {
-      return index;
-    }
-
-    @Override
-    protected boolean startImpl() throws IOException {
-      return advanceImpl();
-    }
-
-    @Override
-    public synchronized InMemorySource<T> getCurrentSource() {
-      return (InMemorySource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected boolean advanceImpl() throws IOException {
-      InMemorySource<T> source = getCurrentSource();
-      index++;
-      if (index >= source.allElementsBytes.size()) {
-        return false;
-      }
-      next =
-          Optional.fromNullable(
-              CoderUtils.decodeFromByteArray(
-                  source.coder, source.allElementsBytes.get(index)));
-      return true;
-    }
-  }
-}