You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:18 UTC

[04/12] incubator-beam git commit: Move some easy stuff into runners/core-java

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
deleted file mode 100644
index 864e8e7..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ /dev/null
@@ -1,985 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks;
-import org.apache.beam.sdk.util.ReduceFnContextFactory.StateStyle;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
- * {@link PCollection} by key.
- *
- * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
- * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
- *
- * <ul>
- * <li>Tracking the windows that are active (have buffered data) as elements arrive and
- * triggers are fired.
- * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
- * when the trigger fires.
- * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
- * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
- * such as output.
- * <li>Scheduling garbage collection of state associated with a specific window, and making that
- * happen when the appropriate timer fires.
- * </ul>
- *
- * @param <K>       The type of key being processed.
- * @param <InputT>  The type of values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W>       The type of windows this operates on.
- */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
-
-  /**
-   * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
-   *
-   * <ul>
-   * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
-   * <li>It merges windows according to the {@link WindowingStrategy}.</li>
-   * <li>It chooses how to track active windows and clear out expired windows
-   * according to the {@link WindowingStrategy}, based on the allowed lateness and
-   * whether windows can merge.</li>
-   * <li>It decides whether to emit empty final panes according to whether the
-   * {@link WindowingStrategy} requires it.</li>
-   * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
-   * </ul>
-   */
-  private final WindowingStrategy<Object, W> windowingStrategy;
-
-  private final OutputWindowedValue<KV<K, OutputT>> outputter;
-
-  private final StateInternals<K> stateInternals;
-
-  private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
-  private final K key;
-
-  /**
-   * Track which windows are still active and the 'state address' windows which hold their state.
-   *
-   * <ul>
-   * <li>State: Global map for all active windows for this computation and key.
-   * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
-   * the active window set until its trigger is closed or the window is garbage collected.
-   * </ul>
-   */
-  private final ActiveWindowSet<W> activeWindows;
-
-  /**
-   * Always a {@link SystemReduceFn}.
-   *
-   * <ul>
-   * <li>State: A bag of accumulated values, or the intermediate result of a combiner.
-   * <li>State style: RENAMED
-   * <li>Merging: Concatenate or otherwise combine the state from each merged window.
-   * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
-   * when trigger is finished or when the window is garbage collected.
-   * </ul>
-   */
-  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
-
-  /**
-   * Manage the setting and firing of timer events.
-   *
-   * <ul>
-   * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
-   * merged away. Timers created by triggers are never garbage collected and are left to
-   * fire and be ignored.
-   * <li>Lifetime: Timers automatically disappear after they fire.
-   * </ul>
-   */
-  private final TimerInternals timerInternals;
-
-  /**
-   * Manage the execution and state for triggers.
-   *
-   * <ul>
-   * <li>State: Tracks which sub-triggers have finished, and any additional state needed to
-   * determine when the trigger should fire.
-   * <li>State style: DIRECT
-   * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
-   * needed.
-   * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
-   * the finished bits are left behind and must be cleared when the window is
-   * garbage collected.
-   * </ul>
-   */
-  private final TriggerRunner<W> triggerRunner;
-
-  /**
-   * Store the output watermark holds for each window.
-   *
-   * <ul>
-   * <li>State: Bag of hold timestamps.
-   * <li>State style: RENAMED
-   * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
-   * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
-   * hold.
-   * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
-   * </ul>
-   */
-  private final WatermarkHold<W> watermarkHold;
-
-  private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
-
-  /**
-   * Store the previously emitted pane (if any) for each window.
-   *
-   * <ul>
-   * <li>State: The previous {@link PaneInfo} passed to the user's {@link DoFn#processElement},
-   * if any.
-   * <li>State style: DIRECT
-   * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
-   * Cleared when window is merged away.
-   * <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
-   * </ul>
-   */
-  private final PaneInfoTracker paneInfoTracker;
-
-  /**
-   * Store whether we've seen any elements for a window since the last pane was emitted.
-   *
-   * <ul>
-   * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
-   * <li>State style: RENAMED.
-   * <li>Merging: Counts are summed when windows are merged.
-   * <li>Lifetime: Cleared when pane fires or window is garbage collected.
-   * </ul>
-   */
-  private final NonEmptyPanes<K, W> nonEmptyPanes;
-
-  public ReduceFnRunner(
-      K key,
-      WindowingStrategy<?, W> windowingStrategy,
-      StateInternals<K> stateInternals,
-      TimerInternals timerInternals,
-      WindowingInternals<?, KV<K, OutputT>> windowingInternals,
-      Aggregator<Long, Long> droppedDueToClosedWindow,
-      ReduceFn<K, InputT, OutputT, W> reduceFn,
-      PipelineOptions options) {
-    this.key = key;
-    this.timerInternals = timerInternals;
-    this.paneInfoTracker = new PaneInfoTracker(timerInternals);
-    this.stateInternals = stateInternals;
-    this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
-    this.droppedDueToClosedWindow = droppedDueToClosedWindow;
-    this.reduceFn = reduceFn;
-
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<Object, W> objectWindowingStrategy =
-        (WindowingStrategy<Object, W>) windowingStrategy;
-    this.windowingStrategy = objectWindowingStrategy;
-
-    this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
-
-    // Note this may incur I/O to load persisted window set data.
-    this.activeWindows = createActiveWindowSet();
-
-    this.contextFactory =
-        new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
-            stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
-
-    this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
-    this.triggerRunner =
-        new TriggerRunner<>(
-            windowingStrategy.getTrigger(),
-            new TriggerContextFactory<>(windowingStrategy, stateInternals, activeWindows));
-  }
-
-  private ActiveWindowSet<W> createActiveWindowSet() {
-    return windowingStrategy.getWindowFn().isNonMerging()
-        ? new NonMergingActiveWindowSet<W>()
-        : new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
-  }
-
-  @VisibleForTesting
-  boolean isFinished(W window) {
-    return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
-  }
-
-  @VisibleForTesting
-  boolean hasNoActiveWindows() {
-    return activeWindows.getActiveAndNewWindows().isEmpty();
-  }
-
-  /**
-   * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
-   * triggers, and window merging.
-   *
-   * <p>The general strategy is:
-   * <ol>
-   * <li>Use {@link WindowedValue#getWindows} (itself determined using
-   * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
-   * of those windows will already have state associated with them. The rest are considered
-   * NEW.
-   * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
-   * Each NEW window will become either ACTIVE or be discarded.
-   * (See {@link ActiveWindowSet} for definitions of these terms.)
-   * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
-   * windows before any state is associated with the NEW window. In the common case that
-   * windows for new elements are merged into existing ACTIVE windows then no additional
-   * storage or merging overhead will be incurred.
-   * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
-   * states can be merged on-demand when a pane fires.
-   * <li>Process the element for each of the windows its windows have been merged into according
-   * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
-   * setting holds, and invoking {@link ReduceFn#onTrigger}.
-   * </ol>
-   */
-  public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
-    // If an incoming element introduces a new window, attempt to merge it into an existing
-    // window eagerly.
-    Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
-
-    Set<W> windowsToConsider = new HashSet<>();
-
-    // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
-    for (WindowedValue<InputT> value : values) {
-      windowsToConsider.addAll(processElement(windowToMergeResult, value));
-    }
-
-    // Trigger output from any window for which the trigger is ready
-    for (W mergedWindow : windowsToConsider) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext =
-          contextFactory.base(mergedWindow, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-          contextFactory.base(mergedWindow, StateStyle.RENAMED);
-      triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
-      emitIfAppropriate(directContext, renamedContext);
-    }
-
-    // We're all done with merging and emitting elements so can compress the activeWindow state.
-    // Any windows which are still NEW must have come in on a new element which was then discarded
-    // due to the window's trigger being closed. We can thus delete them.
-    activeWindows.cleanupTemporaryWindows();
-  }
-
-  public void persist() {
-    activeWindows.persist();
-  }
-
-  /**
-   * Extract the windows associated with the values, and invoke merge. Return a map
-   * from windows to the merge result window. If a window is not in the domain of
-   * the result map then it did not get merged into a different window.
-   */
-  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
-      throws Exception {
-    // No-op if no merging can take place
-    if (windowingStrategy.getWindowFn().isNonMerging()) {
-      return ImmutableMap.of();
-    }
-
-    // Collect the windows from all elements (except those which are too late) and
-    // make sure they are already in the active window set or are added as NEW windows.
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow untypedWindow : value.getWindows()) {
-        @SuppressWarnings("unchecked")
-        W window = (W) untypedWindow;
-
-        // For backwards compat with pre 1.4 only.
-        // We may still have ACTIVE windows with multiple state addresses, representing
-        // a window whose state has not yet been eagerly merged.
-        // We'll go ahead and merge that state now so that we don't have to worry about
-        // this legacy case anywhere else.
-        if (activeWindows.isActive(window)) {
-          Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
-          if (stateAddressWindows.size() > 1) {
-            // This is a legacy window whose state has not been eagerly merged.
-            // Do that now.
-            ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
-                contextFactory.forPremerge(window);
-            reduceFn.onMerge(premergeContext);
-            watermarkHold.onMerge(premergeContext);
-            activeWindows.merged(window);
-          }
-        }
-
-        // Add this window as NEW if it is not currently ACTIVE.
-        // If we had already seen this window and closed its trigger, then the
-        // window will not be currently ACTIVE. It will then be added as NEW here,
-        // and fall into the merging logic as usual.
-        activeWindows.ensureWindowExists(window);
-      }
-    }
-
-    // Merge all of the active windows and retain a mapping from source windows to result windows.
-    Map<W, W> windowToMergeResult = new HashMap<>();
-    activeWindows.merge(new OnMergeCallback(windowToMergeResult));
-    return windowToMergeResult;
-  }
-
-  private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
-    private final Map<W, W> windowToMergeResult;
-
-    OnMergeCallback(Map<W, W> windowToMergeResult) {
-      this.windowToMergeResult = windowToMergeResult;
-    }
-
-    /**
-     * Return the subset of {@code windows} which are currently ACTIVE. We only need to worry
-     * about merging state from ACTIVE windows. NEW windows by definition have no existing state.
-     */
-    private List<W> activeWindows(Iterable<W> windows) {
-      List<W> active = new ArrayList<>();
-      for (W window : windows) {
-        if (activeWindows.isActive(window)) {
-          active.add(window);
-        }
-      }
-      return active;
-    }
-
-    /**
-     * Called from the active window set to indicate {@code toBeMerged} (of which only
-     * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later
-     * be merged into {@code mergeResult}.
-     */
-    @Override
-    public void prefetchOnMerge(
-        Collection<W> toBeMerged, W mergeResult) throws Exception {
-      List<W> activeToBeMerged = activeWindows(toBeMerged);
-      ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
-          contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
-          contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
-      // Prefetch various state.
-      triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
-      reduceFn.prefetchOnMerge(renamedMergeContext.state());
-      watermarkHold.prefetchOnMerge(renamedMergeContext.state());
-      nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
-    }
-
-    /**
-     * Called from the active window set to indicate {@code toBeMerged} (of which only
-     * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about
-     * to be merged into {@code mergeResult}.
-     */
-    @Override
-    public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
-      // Remember we have merged these windows.
-      for (W window : toBeMerged) {
-        windowToMergeResult.put(window, mergeResult);
-      }
-
-      // At this point activeWindows has NOT incorporated the results of the merge.
-      List<W> activeToBeMerged = activeWindows(toBeMerged);
-      ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
-          contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
-      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
-          contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);
-
-      // Run the reduceFn to perform any needed merging.
-      reduceFn.onMerge(renamedMergeContext);
-
-      // Merge the watermark holds.
-      watermarkHold.onMerge(renamedMergeContext);
-
-      // Merge non-empty pane state.
-      nonEmptyPanes.onMerge(renamedMergeContext.state());
-
-      // Have the trigger merge state as needed.
-      triggerRunner.onMerge(
-          directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
-
-      for (W active : activeToBeMerged) {
-        if (active.equals(mergeResult)) {
-          // Not merged away.
-          continue;
-        }
-        // Cleanup flavor A: Currently ACTIVE window is about to be merged away.
-        // Clear any state not already cleared by the onMerge calls above.
-        WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
-        ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
-            contextFactory.base(active, StateStyle.DIRECT);
-        // No need for the end-of-window or garbage collection timers.
-        // We will establish a new end-of-window or garbage collection timer for the mergeResult
-        // window in processElement below. There must be at least one element for the mergeResult
-        // window since a new element with a new window must have triggered this onMerge.
-        cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
-        // We no longer care about any previous panes of merged away windows. The
-        // merge result window gets to start fresh if it is new.
-        paneInfoTracker.clear(directClearContext.state());
-      }
-    }
-  }
-
-  /**
-   * Process an element.
-   *
-   * @param value the value being processed
-   * @return the set of windows in which the element was actually processed
-   */
-  private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
-      throws Exception {
-    // Redirect element windows to the ACTIVE windows they have been merged into.
-    // The compressed representation (value, {window1, window2, ...}) actually represents
-    // distinct elements (value, window1), (value, window2), ...
-    // so if window1 and window2 merge, the resulting window will contain both copies
-    // of the value.
-    Collection<W> windows = new ArrayList<>();
-    for (BoundedWindow untypedWindow : value.getWindows()) {
-      @SuppressWarnings("unchecked")
-      W window = (W) untypedWindow;
-      W mergeResult = windowToMergeResult.get(window);
-      if (mergeResult == null) {
-        mergeResult = window;
-      }
-      windows.add(mergeResult);
-    }
-
-    // Prefetch in each of the windows if we're going to need to process triggers
-    for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
-          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
-      triggerRunner.prefetchForValue(window, directContext.state());
-    }
-
-    // Process the element for each (mergeResultWindow, not closed) window it belongs to.
-    List<W> triggerableWindows = new ArrayList<>(windows.size());
-    for (W window : windows) {
-      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
-          window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
-      if (triggerRunner.isClosed(directContext.state())) {
-        // This window has already been closed.
-        droppedDueToClosedWindow.addValue(1L);
-        WindowTracing.debug(
-            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-            + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
-            value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-        continue;
-      }
-
-      triggerableWindows.add(window);
-      activeWindows.ensureWindowIsActive(window);
-      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
-          window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
-
-      nonEmptyPanes.recordContent(renamedContext.state());
-
-      // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
-      Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
-
-      // Hold back progress of the output watermark until we have processed the pane this
-      // element will be included within. If the element is too late for that, place a hold at
-      // the end-of-window or garbage collection time to allow empty panes to contribute elements
-      // which won't be dropped due to lateness by a following computation (assuming the following
-      // computation uses the same allowed lateness value...)
-      @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
-
-      if (hold != null) {
-        // Assert that holds have a proximate timer.
-        boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
-        boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
-        Preconditions.checkState(
-            holdInWindow == timerInWindow,
-            "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
-            hold,
-            timer,
-            directContext.window());
-      }
-
-      // Execute the reduceFn, which will buffer the value as appropriate
-      reduceFn.processValue(renamedContext);
-
-      // Run the trigger to update its state
-      triggerRunner.processValue(
-          directContext.window(),
-          directContext.timestamp(),
-          directContext.timers(),
-          directContext.state());
-
-      // At this point, if triggerRunner.shouldFire before the processValue then
-      // triggerRunner.shouldFire after the processValue. In other words adding values
-      // cannot take a trigger state from firing to non-firing.
-      // (We don't actually assert this since it is too slow.)
-    }
-
-    return triggerableWindows;
-  }
-
-  /**
-   * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
-   */
-  public void onTimer(TimerData timer) throws Exception {
-    // Which window is the timer for?
-    Preconditions.checkArgument(timer.getNamespace() instanceof WindowNamespace,
-        "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
-    @SuppressWarnings("unchecked")
-    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
-    W window = windowNamespace.getWindow();
-    ReduceFn<K, InputT, OutputT, W>.Context directContext =
-        contextFactory.base(window, StateStyle.DIRECT);
-    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
-        contextFactory.base(window, StateStyle.RENAMED);
-
-    // Has this window had its trigger finish?
-    // - The trigger may implement isClosed as constant false.
-    // - If the window function does not support merging then all windows will be considered
-    // active.
-    // So we must take conjunction of activeWindows and triggerRunner state.
-    boolean windowIsActiveAndOpen =
-        activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
-
-    if (!windowIsActiveAndOpen) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
-    }
-
-    // If this is an end-of-window timer then we may need to set a garbage collection timer
-    // if allowed lateness is non-zero.
-    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
-        && timer.getTimestamp().equals(window.maxTimestamp());
-
-    // If this is a garbage collection timer then we should trigger and garbage collect the window.
-    // We'll consider any timer at or after the window's garbage collection time to be a
-    // signal to garbage collect.
-    Instant cleanupTime = garbageCollectionTime(window);
-    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
-        && !timer.getTimestamp().isBefore(cleanupTime);
-
-    if (isGarbageCollection) {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-
-      if (windowIsActiveAndOpen) {
-        // We need to call onTrigger to emit the final pane if required.
-        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
-        // and the watermark has passed the end of the window.
-        @Nullable Instant newHold =
-            onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
-        Preconditions.checkState(newHold == null,
-            "Hold placed at %s despite isFinished being true.", newHold);
-      }
-
-      // Cleanup flavor B: Clear all the remaining state for this window since we'll never
-      // see elements for it again.
-      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
-    } else {
-      WindowTracing.debug(
-          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
-          + "inputWatermark:{}; outputWatermark:{}",
-          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-      if (windowIsActiveAndOpen) {
-        emitIfAppropriate(directContext, renamedContext);
-      }
-
-      if (isEndOfWindow) {
-        // If the window strategy trigger includes a watermark trigger then at this point
-        // there should be no data holds, either because we'd already cleared them on an
-        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
-        // We could assert this but it is very expensive.
-
-        // Since we are processing an on-time firing we should schedule the garbage collection
-        // timer. (If getAllowedLateness is zero then the timer event will be considered a
-        // cleanup event and handled by the above).
-        // Note we must do this even if the trigger is finished so that we are sure to cleanup
-        // any final trigger finished bits.
-        Preconditions.checkState(
-            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
-            "Unexpected zero getAllowedLateness");
-        WindowTracing.debug(
-            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
-            + "inputWatermark:{}; outputWatermark:{}",
-            key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
-            timerInternals.currentOutputWatermarkTime());
-        Preconditions.checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                                 "Cleanup time %s is beyond end-of-time", cleanupTime);
-        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
-      }
-    }
-  }
-
-  /**
-   * Clear all the state associated with the window of {@code directContext}.
-   * Should only be invoked if we know all future elements for this window will be considered
-   * beyond allowed lateness.
-   * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
-   * <ol>
-   * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
-   * closed again.
-   * <li>We can clear any remaining garbage collection hold.
-   * </ol>
-   *
-   * @param directContext context used here to reach the window itself, the trigger state and
-   *     finished bits, the pane info and (for the backwards compatibility case) watermark holds
-   * @param renamedContext context used here to reach the reduce function state, the watermark
-   *     holds and the non-empty pane tracking
-   * @param windowIsActiveAndOpen true if the window is in the active window set and its trigger
-   *     has not yet closed, in which case there may still be state left to clear
-   * @throws Exception if any of the underlying clearing calls throws
-   */
-  private void clearAllState(
-      ReduceFn<K, InputT, OutputT, W>.Context directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean windowIsActiveAndOpen)
-      throws Exception {
-    if (windowIsActiveAndOpen) {
-      // Since both the window is in the active window set AND the trigger was not yet closed,
-      // it is possible we still have state.
-      reduceFn.clearState(renamedContext);
-      watermarkHold.clearHolds(renamedContext);
-      nonEmptyPanes.clearPane(renamedContext.state());
-      // These calls work irrespective of whether the window is active or not, but
-      // are unnecessary if the window is not active.
-      triggerRunner.clearState(
-          directContext.window(), directContext.timers(), directContext.state());
-      paneInfoTracker.clear(directContext.state());
-    } else {
-      // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
-      // For (1), if !activeWindows.isActive then the window must be merging and has been
-      // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
-      // and been closed, so this case reduces to (2).
-      // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
-      // closed state. In that case emitIfAppropriate will have cleared all state in
-      // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
-      // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
-      // Since the trigger fired the existing watermark holds must have been cleared, and since
-      // the trigger closed no new end of window or garbage collection hold will have been
-      // placed by WatermarkHold.extractAndRelease.
-      // Thus all the state clearing above is unnecessary.
-      //
-      // But(!) for backwards compatibility we must allow a pipeline to be updated from
-      // an sdk version <= 1.3. In that case it is possible we have an end-of-window or
-      // garbage collection hold keyed by the current window (reached via directContext) rather
-      // than the state address window (reached via renamedContext).
-      // However this can only happen if:
-      // - We have merging windows.
-      // - We are DISCARDING_FIRED_PANES.
-      // - A pane has fired.
-      // - But the trigger is not (yet) closed.
-      if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES
-          && !windowingStrategy.getWindowFn().isNonMerging()) {
-        watermarkHold.clearHolds(directContext);
-      }
-    }
-
-    // The two steps below run on both branches above.
-    // Don't need to track address state windows anymore.
-    activeWindows.remove(directContext.window());
-    // We'll never need to test for the trigger being closed again.
-    triggerRunner.clearFinished(directContext.state());
-  }
-
-  /**
-   * Decide whether the reduce function state should be cleared once a pane has fired.
-   *
-   * <p>State is discarded either because this was the trigger's last firing, or because the
-   * windowing strategy is in {@code DISCARDING_FIRED_PANES} mode, in which nothing should be
-   * accumulated between panes.
-   *
-   * @param isFinished true if the firing that just happened was the last one for the trigger
-   * @return true if the reduce function state should be cleared
-   */
-  private boolean shouldDiscardAfterFiring(boolean isFinished) {
-    return isFinished
-        || windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
-  }
-
-  /**
-   * Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
-   *
-   * <p>Does nothing unless the trigger reports that it should fire. Otherwise the trigger is
-   * informed of the firing, {@link #onTrigger} is run to produce the pane, the non-empty pane
-   * tracking is cleared, the reduce function state is cleared if
-   * {@link #shouldDiscardAfterFiring} says so and, if the trigger is now closed, the trigger
-   * state, the pane info and the active window entry are removed as well.
-   *
-   * @param directContext context used here for the window, timers and state handed to the
-   *     trigger runner, and for the pane info state
-   * @param renamedContext context used here for the reduce function state and the non-empty
-   *     pane tracking
-   * @throws Exception if the trigger, the reduce function or any state call throws
-   */
-  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
-      throws Exception {
-    if (!triggerRunner.shouldFire(
-        directContext.window(), directContext.timers(), directContext.state())) {
-      // Ignore unless trigger is ready to fire
-      return;
-    }
-
-    // Inform the trigger of the transition to see if it is finished
-    triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
-    boolean isFinished = triggerRunner.isClosed(directContext.state());
-
-    // Will be able to clear all element state after triggering?
-    boolean shouldDiscard = shouldDiscardAfterFiring(isFinished);
-
-    // Run onTrigger to produce the actual pane contents.
-    // As a side effect it will clear all element holds, but not necessarily any
-    // end-of-window or garbage collection holds.
-    // The returned hold is not needed here, so it is ignored.
-    onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/);
-
-    // Now that we've triggered, the pane is empty.
-    nonEmptyPanes.clearPane(renamedContext.state());
-
-    // Cleanup buffered data if appropriate
-    if (shouldDiscard) {
-      // Cleanup flavor C: The user does not want any buffered data to persist between panes.
-      reduceFn.clearState(renamedContext);
-    }
-
-    if (isFinished) {
-      // Cleanup flavor D: If trigger is closed we will ignore all new incoming elements.
-      // Clear state not otherwise cleared by onTrigger and clearPane above.
-      // Remember the trigger is, indeed, closed until the window is garbage collected.
-      // (That is why the finished bits are not cleared here; triggerRunner.clearFinished
-      // is deliberately not called.)
-      triggerRunner.clearState(
-          directContext.window(), directContext.timers(), directContext.state());
-      paneInfoTracker.clear(directContext.state());
-      activeWindows.remove(directContext.window());
-    }
-  }
-
-  /**
-   * Decide whether a pane should be emitted for the current firing.
-   *
-   * <p>A pane is emitted when any of the following holds:
-   * <ul>
-   * <li>The pane has elements.
-   * <li>This is the unique {@code ON_TIME} pane.
-   * <li>This is known to be the final pane and the user has requested it even when empty
-   * (closing behavior {@code FIRE_ALWAYS}).
-   * </ul>
-   *
-   * @param isEmpty true if the pane has no elements
-   * @param isFinished true if the trigger is finished, so this is the final pane
-   * @param timing the timing of the pane about to be produced
-   * @return true if a pane should be emitted
-   */
-  private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
-    boolean hasElements = !isEmpty;
-    boolean isOnTimePane = timing == Timing.ON_TIME;
-    return hasElements
-        || isOnTimePane
-        || (isFinished
-            && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS);
-  }
-
-  /**
-   * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
-   *
-   * <p>The watermark hold for the window is extracted and released, the next pane info is
-   * computed, and sanity checks are applied to any new hold that was added. The reduce
-   * function's {@code onTrigger} is only invoked if {@link #needToEmit} decides a pane is
-   * required; each value it outputs is emitted for {@code directContext}'s window at the
-   * timestamp of the extracted (old) hold.
-   *
-   * @param directContext context used here for the window, the pane info and the prefetch of
-   *     reduce function and trigger state
-   * @param renamedContext context used here for the watermark hold and the non-empty pane
-   *     tracking
-   * @param isFinished true if the trigger is finished, so this is the final pane
-   * @param isEndOfWindow true if this firing is the watermark (end-of-window) trigger, in which
-   *     case no new end-of-window hold is expected
-   * @return output watermark hold added, or {@literal null} if none.
-   * @throws Exception if the reduce function or any state call throws
-   */
-  @Nullable
-  private Instant onTrigger(
-      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isFinished, boolean isEndOfWindow)
-          throws Exception {
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    // Prefetch necessary states
-    // All readLater() calls are issued up front, before any of the read() calls below.
-    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
-        watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
-    ReadableState<PaneInfo> paneFuture =
-        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
-    ReadableState<Boolean> isEmptyFuture =
-        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
-
-    // NOTE(review): elsewhere in this class the reduce function state is reached via
-    // renamedContext; confirm that directContext is the intended state for this prefetch.
-    reduceFn.prefetchOnTrigger(directContext.state());
-    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
-
-    // Calculate the pane info.
-    final PaneInfo pane = paneFuture.read();
-    // Extract the window hold, and as a side effect clear it.
-
-    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
-    // The old hold becomes the timestamp of any output produced below.
-    final Instant outputTimestamp = pair.oldHold;
-    @Nullable Instant newHold = pair.newHold;
-
-    if (newHold != null) {
-      // We can't be finished yet.
-      Preconditions.checkState(
-        !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
-      // The hold cannot be behind the input watermark.
-      Preconditions.checkState(
-        !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
-      if (newHold.isAfter(directContext.window().maxTimestamp())) {
-        // The hold must be for garbage collection, which can't have happened yet.
-        Preconditions.checkState(
-          newHold.isEqual(garbageCollectionTime(directContext.window())),
-          "new hold %s should be at garbage collection for window %s plus %s",
-          newHold,
-          directContext.window(),
-          windowingStrategy.getAllowedLateness());
-      } else {
-        // The hold must be for the end-of-window, which can't have happened yet.
-        Preconditions.checkState(
-          newHold.isEqual(directContext.window().maxTimestamp()),
-          "new hold %s should be at end of window %s",
-          newHold,
-          directContext.window());
-        Preconditions.checkState(
-          !isEndOfWindow,
-          "new hold at %s for %s but this is the watermark trigger",
-          newHold,
-          directContext.window());
-      }
-    }
-
-    // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
-      // Run reduceFn.onTrigger method.
-      final List<W> windows = Collections.singletonList(directContext.window());
-      ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
-          contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
-              new OnTriggerCallbacks<OutputT>() {
-                @Override
-                public void output(OutputT toOutput) {
-                  // We're going to output panes, so commit the (now used) PaneInfo.
-                  // TODO: This is unnecessary if the trigger isFinished since the saved
-                  // state will be immediately deleted.
-                  paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
-
-                  // Output the actual value.
-                  outputter.outputWindowedValue(
-                      KV.of(key, toOutput), outputTimestamp, windows, pane);
-                }
-              });
-
-      reduceFn.onTrigger(renamedTriggerContext);
-    }
-
-    // The new hold is returned whether or not a pane was emitted.
-    return newHold;
-  }
-
-  /**
-   * Arrange for an event-time timer which will eventually lead to the window state being
-   * garbage collected, and return the time that timer was set for. For efficiency this may
-   * happen in two steps rather than one.
-   *
-   * <ul>
-   * <li>If allowedLateness is zero then garbage collection happens at the end of the window.
-   * For simplicity we set our own timer in that situation even though an
-   * {@link AfterWatermark} trigger may also have set an end-of-window timer.
-   * ({@code setTimer} is idempotent.)
-   * <li>If allowedLateness is non-zero we could always set a timer for the garbage collection
-   * time. However with large windows (eg hourly) and a small allowedLateness (eg seconds) that
-   * would nearly double the number of timers in-flight. So an end-of-window timer is set
-   * first and rolled forward to a garbage collection timer when it fires. The input watermark
-   * is used to tell those cases apart: once the end of the window is before the input
-   * watermark the garbage collection timer is set, otherwise the end-of-window timer is set.
-   * </ul>
-   */
-  private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
-      ReduceFn<?, ?, ?, W>.Context directContext) {
-    Instant watermark = timerInternals.currentInputWatermarkTime();
-    Instant windowEnd = directContext.window().maxTimestamp();
-
-    // Pick the timer to set depending on whether the end of the window has already passed.
-    boolean windowEndHasPassed = windowEnd.isBefore(watermark);
-    Instant fireAt =
-        windowEndHasPassed ? garbageCollectionTime(directContext.window()) : windowEnd;
-    String timerKind = windowEndHasPassed ? "garbage collection" : "end-of-window";
-
-    WindowTracing.trace(
-        "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
-        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-        timerKind,
-        fireAt,
-        key,
-        directContext.window(),
-        watermark,
-        timerInternals.currentOutputWatermarkTime());
-
-    boolean withinEndOfTime = !fireAt.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    Preconditions.checkState(withinEndOfTime, "Timer %s is beyond end-of-time", fireAt);
-    directContext.timers().setTimer(fireAt, TimeDomain.EVENT_TIME);
-    return fireAt;
-  }
-
-  /**
-   * Delete the event-time timers which may have been set for the window of
-   * {@code directContext}: the end-of-window timer and, when it falls after the end of the
-   * window, the garbage collection timer.
-   *
-   * @param directContext context supplying the window and the timers to delete from
-   */
-  private void cancelEndOfWindowAndGarbageCollectionTimers(
-      ReduceFn<?, ?, ?, W>.Context directContext) {
-    WindowTracing.debug(
-        "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
-        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
-        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
-        timerInternals.currentOutputWatermarkTime());
-    Instant eow = directContext.window().maxTimestamp();
-    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
-    Instant gc = garbageCollectionTime(directContext.window());
-    if (gc.isAfter(eow)) {
-      // The garbage collection timer is distinct from the end-of-window timer only when it
-      // is strictly later, so it must be deleted by its own time rather than by eow again.
-      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
-    }
-  }
-
-  /**
-   * Compute the time at which {@code window} should be garbage collected: the end of the window
-   * plus the allowed lateness, truncated to the end of the global window if it would otherwise
-   * fall beyond it.
-   */
-  private Instant garbageCollectionTime(W window) {
-    Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
-    Instant windowEnd = window.maxTimestamp();
-
-    // The comparison subtracts the allowed lateness from the "end of time" (the end of the
-    // global window) instead of adding it to the end of the window, because that addition
-    // might overflow the maximum allowed Instant.
-    Instant latestUntruncatedWindowEnd = endOfTime.minus(windowingStrategy.getAllowedLateness());
-    if (!latestUntruncatedWindowEnd.isBefore(windowEnd)) {
-      return windowEnd.plus(windowingStrategy.getAllowedLateness());
-    }
-    return endOfTime;
-  }
-
-  /**
-   * The single capability this class needs in order to produce output: emitting a value
-   * together with all of its windowing information. It is a deliberately restricted
-   * subinterface of {@link WindowingInternals}, to express how that type is used here.
-   */
-  private interface OutputWindowedValue<OutputT> {
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane);
-  }
-
-  /**
-   * {@link OutputWindowedValue} which forwards every output unchanged to a
-   * {@link WindowingInternals}.
-   */
-  private static class OutputViaWindowingInternals<OutputT>
-      implements OutputWindowedValue<OutputT> {
-
-    /** Receiver of all forwarded output. */
-    private final WindowingInternals<?, OutputT> delegate;
-
-    public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
-      delegate = windowingInternals;
-    }
-
-    @Override
-    public void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      delegate.outputWindowedValue(output, timestamp, windows, pane);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
new file mode 100644
index 0000000..d129c8e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Helper transform which moves each element's timestamp and window assignment into the value
+ * part of its key/value pair, by wrapping the value in a {@link WindowedValue}.
+ */
+public class ReifyTimestampsAndWindows<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
+
+  @Override
+  public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
+
+    // Requiring a KvCoder is a model-level requirement rather than a detail of this
+    // implementation of GBK: every runner needs a way to get at the key.
+    checkArgument(
+        input.getCoder() instanceof KvCoder,
+        "%s requires its input to use a %s",
+        GroupByKey.class.getSimpleName(),
+        KvCoder.class.getSimpleName());
+
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+    Coder<K> keyCoder = kvCoder.getKeyCoder();
+    Coder<V> valueCoder = kvCoder.getValueCoder();
+
+    // Output values carry the original value plus its timestamp and windows, so the value
+    // coder is wrapped together with the coder for the input's windows.
+    Coder<WindowedValue<V>> windowedValueCoder =
+        FullWindowedValueCoder.of(
+            valueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+    Coder<KV<K, WindowedValue<V>>> reifiedCoder = KvCoder.of(keyCoder, windowedValueCoder);
+
+    PCollection<KV<K, WindowedValue<V>>> reified =
+        input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()));
+    return reified.setCoder(reifiedCoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
deleted file mode 100644
index 78377c8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SimpleDoFnRunner.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.ExecutionContext.StepContext;
-import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.List;
-
-/**
- * A {@link DoFnRunnerBase} which processes each element by building a process context for it
- * and handing that context straight to the {@link DoFn}.
- *
- * @param <InputT> the type of the DoFn's (main) input elements
- * @param <OutputT> the type of the DoFn's (main) output elements
- */
-public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT> {
-
-  protected SimpleDoFnRunner(
-      PipelineOptions options,
-      DoFn<InputT, OutputT> fn,
-      SideInputReader sideInputReader,
-      OutputManager outputManager,
-      TupleTag<OutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      StepContext stepContext,
-      AddCounterMutator addCounterMutator,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    super(
-        options,
-        fn,
-        sideInputReader,
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        stepContext,
-        addCounterMutator,
-        windowingStrategy);
-  }
-
-  @Override
-  protected void invokeProcessElement(WindowedValue<InputT> elem) {
-    final DoFn<InputT, OutputT>.ProcessContext context = createProcessContext(elem);
-    try {
-      // processElement may run user code, so anything it throws is wrapped before rethrowing.
-      fn.processElement(context);
-    } catch (Exception userCodeFailure) {
-      throw wrapUserCodeException(userCodeFailure);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
deleted file mode 100644
index 2eeee54..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemReduceFn.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-
-/**
- * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
- *
- * <p>All values are accumulated in a single piece of combining state identified by a buffer
- * tag; subclasses supply the merging behavior for that state.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <AccumT> The type of the accumulator used when combining values.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends ReduceFn<K, InputT, OutputT, W> {
-  private static final String BUFFER_NAME = "buf";
-
-  /**
-   * Create a {@link SystemReduceFn} that buffers all of the input values in persistent state
-   * and produces an {@code Iterable<T>}.
-   *
-   * @param inputCoder coder for the buffered input values
-   */
-  public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
-      buffering(final Coder<T> inputCoder) {
-    final StateTag<Object, BagState<T>> bufferTag =
-        StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
-    return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchBags(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeBags(c.state(), bufferTag);
-      }
-    };
-  }
-
-  /**
-   * Create a {@link SystemReduceFn} that combines all of the input values using a
-   * {@link CombineFn}.
-   *
-   * @param keyCoder coder for the keys (not used by this method's body)
-   * @param combineFn the combine function, together with its accumulator coder, to apply
-   */
-  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT,
-      AccumT, OutputT, W>
-      combining(
-          final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
-    // The state tag differs depending on whether the combine function needs a context.
-    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
-      bufferTag = StateTags.makeSystemTagInternal(
-          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
-              BUFFER_NAME, combineFn.getAccumulatorCoder(),
-              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-
-    } else {
-      bufferTag = StateTags.makeSystemTagInternal(
-            StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
-                BUFFER_NAME, combineFn.getAccumulatorCoder(),
-                (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-    }
-    return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchCombiningValues(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeCombiningValues(c.state(), bufferTag);
-      }
-    };
-  }
-
-  /** Tag of the state holding the accumulated values; assigned once at construction. */
-  private final StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
-
-  /**
-   * Create a {@link SystemReduceFn} which accumulates values in the state identified by
-   * {@code bufferTag}.
-   */
-  public SystemReduceFn(
-      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
-    this.bufferTag = bufferTag;
-  }
-
-  /** Add the value being processed to the buffer state. */
-  @Override
-  public void processValue(ProcessValueContext c) throws Exception {
-    c.state().access(bufferTag).add(c.value());
-  }
-
-  /** Request that the buffer state be read later, ahead of {@link #onTrigger}. */
-  @Override
-  public void prefetchOnTrigger(StateAccessor<K> state) {
-    state.access(bufferTag).readLater();
-  }
-
-  /** Output the current contents of the buffer state. */
-  @Override
-  public void onTrigger(OnTriggerContext c) throws Exception {
-    c.output(c.state().access(bufferTag).read());
-  }
-
-  /** Clear the buffer state. */
-  @Override
-  public void clearState(Context c) throws Exception {
-    c.state().access(bufferTag).clear();
-  }
-
-  /** Return a readable state reporting whether the buffer state is empty. */
-  @Override
-  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-    return state.access(bufferTag).isEmpty();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
deleted file mode 100644
index f104f6a..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerRunner.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
- *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
-       (via {@link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
-    Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
-      // finished) for each trigger in the tree.
-      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // shouldFire should be true.
-    // However it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits anyways.
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}