You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:05 UTC

[34/50] [abbrv] incubator-beam git commit: Put classes in runners-core package into runners.core namespace

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
new file mode 100644
index 0000000..114f5e6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Determine the timing and other properties of a new pane for a given computation, key and window.
+ * Incorporates any previous pane, whether the pane has been produced because of an
+ * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
+ * and the current output watermark.
+ */
+public class PaneInfoTracker {
+  private final TimerInternals timerInternals;
+
+  /**
+   * Creates a tracker that reads the current input and output watermarks from
+   * {@code timerInternals} each time a pane is described.
+   */
+  public PaneInfoTracker(TimerInternals timerInternals) {
+    this.timerInternals = timerInternals;
+  }
+
+  /** State tag under which {@link #storeCurrentPaneInfo} keeps the most recent pane info. */
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
+
+  /** Clears the pane info stored under {@link #PANE_INFO_TAG} in {@code state}. */
+  public void clear(StateAccessor<?> state) {
+    state.access(PANE_INFO_TAG).clear();
+  }
+
+  /**
+   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
+   * info includes the timing for the pane, whose calculation is quite subtle.
+   *
+   * <p>The key and the window's maximum timestamp are captured when this method is called. The
+   * value of the previously stored pane and the watermarks are only read when the returned
+   * state's {@link ReadableState#read} is invoked.
+   *
+   * @param context the context supplying the key, window and state for the pane being described
+   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
+   * no further firings for the key and window of {@code context}
+   * @return a lazily evaluated state yielding the {@link PaneInfo} of the next pane
+   */
+  public ReadableState<PaneInfo> getNextPaneInfo(
+      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
+    final Object key = context.key();
+    final ReadableState<PaneInfo> previousPaneFuture =
+        context.state().access(PaneInfoTracker.PANE_INFO_TAG);
+    final Instant windowMaxTimestamp = context.window().maxTimestamp();
+
+    return new ReadableState<PaneInfo>() {
+      @Override
+      @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+          justification = "prefetch side effect")
+      public ReadableState<PaneInfo> readLater() {
+        // Only the previous pane needs prefetching; everything else is already in memory.
+        previousPaneFuture.readLater();
+        return this;
+      }
+
+      @Override
+      public PaneInfo read() {
+        PaneInfo previousPane = previousPaneFuture.read();
+        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
+      }
+    };
+  }
+
+  /**
+   * Stores {@code currentPane} under {@link #PANE_INFO_TAG} in the state of {@code context}, where
+   * {@link #getNextPaneInfo} reads the previous pane from.
+   */
+  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
+    context.state().access(PANE_INFO_TAG).write(currentPane);
+  }
+
+  /**
+   * Computes the {@link PaneInfo} that follows {@code previousPane}.
+   *
+   * <p>The pane index is one more than the previous pane's index (or 0 for the first pane). The
+   * non-speculative index is likewise incremented, except that an EARLY pane always gets -1.
+   *
+   * @param key the key being processed, used only for tracing
+   * @param windowMaxTimestamp the maximum timestamp of the window being processed
+   * @param previousPane the last pane stored for this window, or {@code null} if there is none
+   * @param isFinal passed through to {@link PaneInfo#createPane} together with the computed values
+   * @throws IllegalStateException if the timing would not follow EARLY* ON_TIME? LATE*, or if
+   *     {@code previousPane} was already marked as the last pane
+   */
+  private PaneInfo describePane(
+      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
+    boolean isFirst = previousPane == null;
+    Timing previousTiming = isFirst ? null : previousPane.getTiming();
+    long index = isFirst ? 0 : previousPane.getIndex() + 1;
+    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // True if it is not possible to assign the element representing this pane a timestamp
+    // which will make an ON_TIME pane for any following computation.
+    // Ie true if the element's latest possible timestamp is before the current output watermark.
+    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
+
+    // True if all emitted panes (if any) were EARLY panes.
+    // Once the ON_TIME pane has fired, all following panes must be considered LATE even
+    // if the output watermark is behind the end of the window.
+    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
+
+    // True if the input watermark hasn't passed the window's max timestamp.
+    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
+
+    Timing timing;
+    if (isLateForOutput || !onlyEarlyPanesSoFar) {
+      // The output watermark has already passed the end of this window, or we have already
+      // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
+      // consider this pane LATE.
+      timing = Timing.LATE;
+    } else if (isEarlyForInput) {
+      // This is an EARLY firing.
+      timing = Timing.EARLY;
+      nonSpeculativeIndex = -1;
+    } else {
+      // This is the unique ON_TIME firing for the window.
+      timing = Timing.ON_TIME;
+    }
+
+    WindowTracing.debug(
+        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
+
+    if (previousPane != null) {
+      // Timing transitions should follow EARLY* ON_TIME? LATE*
+      switch (previousTiming) {
+        case EARLY:
+          checkState(
+              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
+              "EARLY cannot transition to %s", timing);
+          break;
+        case ON_TIME:
+          checkState(
+              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
+          break;
+        case LATE:
+          checkState(timing == Timing.LATE, "LATE cannot transition to %s", timing);
+          break;
+        case UNKNOWN:
+          break;
+      }
+      checkState(!previousPane.isLast(), "Last pane was not last after all.");
+    }
+
+    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
new file mode 100644
index 0000000..fcdff3b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.util.common.Reiterator;
+
+/**
+ * A {@link Reiterator} that supports one-element lookahead during iteration.
+ *
+ * <p>The lookahead element is fetched from the wrapped iterator the first time {@link #hasNext},
+ * {@link #peek} or {@link #next} needs it, and is buffered until {@link #next} consumes it.
+ *
+ * @param <T> the type of elements returned by this iterator
+ */
+public final class PeekingReiterator<T> implements Reiterator<T> {
+  private T nextElement;
+  private boolean nextElementComputed;
+  private final Reiterator<T> iterator;
+
+  /**
+   * Wraps {@code iterator}, which must not be {@code null}. No element is fetched until needed.
+   */
+  public PeekingReiterator(Reiterator<T> iterator) {
+    this.iterator = checkNotNull(iterator);
+  }
+
+  /**
+   * Copy constructor used by {@link #copy}: copies the wrapped iterator and carries over any
+   * element that {@code it} has already buffered.
+   */
+  PeekingReiterator(PeekingReiterator<T> it) {
+    this.iterator = checkNotNull(checkNotNull(it).iterator.copy());
+    this.nextElement = it.nextElement;
+    this.nextElementComputed = it.nextElementComputed;
+  }
+
+  @Override
+  public boolean hasNext() {
+    computeNext();
+    return nextElementComputed;
+  }
+
+  @Override
+  public T next() {
+    T result = peek();
+    // Release the buffered reference so a consumed element is not kept reachable through this
+    // iterator until the next lookahead overwrites it.
+    nextElement = null;
+    nextElementComputed = false;
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>If {@link #peek} is called, {@code remove} is disallowed until
+   * {@link #next} has been subsequently called. The same applies after a call to
+   * {@link #hasNext} that found an element, because it buffers that element too.
+   *
+   * @throws IllegalStateException if a lookahead element is currently buffered
+   */
+  @Override
+  public void remove() {
+    checkState(!nextElementComputed,
+        "After peek(), remove() is disallowed until next() is called");
+    iterator.remove();
+  }
+
+  @Override
+  public PeekingReiterator<T> copy() {
+    return new PeekingReiterator<>(this);
+  }
+
+  /**
+   * Returns the element that would be returned by {@link #next}, without
+   * actually consuming the element.
+   * @throws NoSuchElementException if there is no next element
+   */
+  public T peek() {
+    computeNext();
+    if (!nextElementComputed) {
+      throw new NoSuchElementException();
+    }
+    return nextElement;
+  }
+
+  /** Buffers the next element of the wrapped iterator, if there is one and none is buffered. */
+  private void computeNext() {
+    if (nextElementComputed) {
+      return;
+    }
+    if (!iterator.hasNext()) {
+      return;
+    }
+    nextElement = iterator.next();
+    nextElementComputed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..deeac3c
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them from {@link #processElementInReadyWindows(WindowedValue)}.
+ *
+ * <p>Windows found not ready are remembered in a per-bundle set that is created by
+ * {@link #startBundle()} and discarded by {@link #finishBundle()}.
+ */
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  // Main input windows seen as not ready during the current bundle; null outside a bundle.
+  private Set<BoundedWindow> notReadyWindows;
+
+  /**
+   * Creates a runner that delegates to {@code underlying} and consults {@code sideInputReader}
+   * for the readiness of each of {@code views}.
+   */
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  /** Starts a fresh set of not-ready windows, then starts the underlying runner's bundle. */
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+   * for each window the element is in that is ready.
+   *
+   * <p>When there are no views the element is processed as-is. Otherwise it is exploded into one
+   * value per window, and a window that has been found not ready stays not ready for the rest of
+   * the bundle.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   * that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      if (isWindowReady(mainInputWindow)) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  /**
+   * Returns whether every view's side input window for {@code mainInputWindow} is ready.
+   *
+   * <p>A window already recorded as not ready in this bundle is rejected without querying the
+   * side input reader again, since a recorded window is never treated as ready in this bundle.
+   */
+  private boolean isWindowReady(BoundedWindow mainInputWindow) {
+    if (notReadyWindows.contains(mainInputWindow)) {
+      return false;
+    }
+    for (PCollectionView<?> view : views) {
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal()
+              .getWindowFn()
+              .getSideInputWindow(mainInputWindow);
+      if (!sideInputReader.isReady(view, sideInputWindow)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} directly, without
+   * checking whether any side input is ready.
+   */
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Discard the set of not-ready windows, then call the underlying
+   * {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
new file mode 100644
index 0000000..bb20226
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.joda.time.Instant;
+
+/**
+ * Specification for processing to happen after elements have been grouped by key.
+ *
+ * <p>The nested context classes are inner (non-static) classes, so each context instance is tied
+ * to a particular {@code ReduceFn} instance and shares its type parameters.
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of input values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
+    implements Serializable {
+
+  /** Information accessible to all the processing methods in this {@code ReduceFn}. */
+  public abstract class Context {
+    /** Return the key that is being processed. */
+    public abstract K key();
+
+    /** Return the window that is being processed. */
+    public abstract W window();
+
+    /** Return the current {@link WindowingStrategy}. */
+    public abstract WindowingStrategy<?, W> windowingStrategy();
+
+    /** Return the interface for accessing state. */
+    public abstract StateAccessor<K> state();
+
+    /** Return the interface for accessing timers. */
+    public abstract Timers timers();
+  }
+
+  /** Information accessible within {@link #processValue}. */
+  public abstract class ProcessValueContext extends Context {
+    /** Return the actual value being processed. */
+    public abstract InputT value();
+
+    /** Return the timestamp associated with the value. */
+    public abstract Instant timestamp();
+  }
+
+  /** Information accessible within {@link #onMerge}. */
+  public abstract class OnMergeContext extends Context {
+    /**
+     * Return the interface for accessing state. Narrows the return type of
+     * {@link Context#state} to a {@link MergingStateAccessor}.
+     */
+    @Override
+    public abstract MergingStateAccessor<K, W> state();
+  }
+
+  /** Information accessible within {@link #onTrigger}. */
+  public abstract class OnTriggerContext extends Context {
+    /** Returns the {@link PaneInfo} for the trigger firing being processed. */
+    public abstract PaneInfo paneInfo();
+
+    /** Output the given value in the current window. */
+    public abstract void output(OutputT value);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Called for each value of type {@code InputT} associated with the current key.
+   */
+  public abstract void processValue(ProcessValueContext c) throws Exception;
+
+  /**
+   * Called when windows are merged.
+   */
+  public abstract void onMerge(OnMergeContext context) throws Exception;
+
+  /**
+   * Called when triggers fire.
+   *
+   * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
+   * any results that should be included in the pane produced by this trigger firing.
+   */
+  public abstract void onTrigger(OnTriggerContext context) throws Exception;
+
+  /**
+   * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing.
+   *
+   * @param c State accessor to prefetch from.
+   */
+  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
+
+  /**
+   * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing.
+   *
+   * @param context State accessor to prefetch from.
+   */
+  public void prefetchOnTrigger(StateAccessor<K> context) {}
+
+  /**
+   * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
+   * called when the window is closing and will receive no future interactions.
+   */
+  public abstract void clearState(Context context) throws Exception;
+
+  /**
+   * Returns a ({@link ReadableState} for) whether there is no buffered state, i.e. it reads as
+   * {@code true} if nothing is buffered.
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
new file mode 100644
index 0000000..2043f14
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.joda.time.Instant;
+
+/**
+ * Factory for creating instances of the various {@link ReduceFn} contexts.
+ */
+class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
+  /**
+   * Callbacks handed to {@link #forTrigger} when creating an {@code OnTriggerContext}.
+   *
+   * @param <OutputT> the type of value passed to {@link #output}
+   */
+  public interface OnTriggerCallbacks<OutputT> {
+    /** Receives a value to be output. */
+    void output(OutputT toOutput);
+  }
+
+  private final K key;
+  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateInternals<K> stateInternals;
+  private final ActiveWindowSet<W> activeWindows;
+  private final TimerInternals timerInternals;
+  private final WindowingInternals<?, ?> windowingInternals;
+  private final PipelineOptions options;
+
+  /**
+   * Creates a factory bound to {@code key}. Every argument is stored as given and shared by the
+   * contexts and state accessors this factory creates.
+   */
+  ReduceFnContextFactory(
+      K key,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      ActiveWindowSet<W> activeWindows,
+      TimerInternals timerInternals,
+      WindowingInternals<?, ?> windowingInternals,
+      PipelineOptions options) {
+    // What is being processed.
+    this.key = key;
+    this.reduceFn = reduceFn;
+    this.windowingStrategy = windowingStrategy;
+    // Runner-provided collaborators.
+    this.stateInternals = stateInternals;
+    this.activeWindows = activeWindows;
+    this.timerInternals = timerInternals;
+    this.windowingInternals = windowingInternals;
+    this.options = options;
+  }
+
+  /**
+   * Where should we look for state associated with a given window? Selects how the state
+   * accessors in this file turn a window into the state namespace they read and write.
+   */
+  public static enum StateStyle {
+    /** All state is associated with the window itself. */
+    DIRECT,
+    /** State is associated with the 'state address' windows tracked by the active window set. */
+    RENAMED
+  }
+
+  /**
+   * Builds the state accessor for {@code window}, using the window coder of the windowing
+   * strategy and a state context created from the pipeline options and windowing internals.
+   */
+  private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
+    Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+    StateContext<W> stateContext =
+        StateContexts.createFromComponents(options, windowingInternals, window);
+    return new StateAccessorImpl<K, W>(
+        activeWindows, windowCoder, stateInternals, stateContext, style);
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
+    return new ContextImpl(stateAccessor(window, style));
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
+      W window, InputT value, Instant timestamp, StateStyle style) {
+    return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
+  }
+
+  /**
+   * Creates an {@code OnTriggerContext} for {@code window} from the given pane info state and
+   * output callbacks, using the given state style.
+   */
+  public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(
+      W window,
+      ReadableState<PaneInfo> pane,
+      StateStyle style,
+      OnTriggerCallbacks<OutputT> callbacks) {
+    StateAccessorImpl<K, W> accessor = stateAccessor(window, style);
+    return new OnTriggerContextImpl(accessor, pane, callbacks);
+  }
+
+  /**
+   * Creates an {@code OnMergeContext} for merging {@code activeToBeMerged} into
+   * {@code mergeResult}, backed by a {@link MergingStateAccessorImpl} with the given state style.
+   */
+  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
+      Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
+    MergingStateAccessorImpl<K, W> mergingAccessor =
+        new MergingStateAccessorImpl<K, W>(
+            activeWindows,
+            windowingStrategy.getWindowFn().windowCoder(),
+            stateInternals,
+            style,
+            activeToBeMerged,
+            mergeResult);
+    return new OnMergeContextImpl(mergingAccessor);
+  }
+
+  public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
+    return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
+        activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
+  }
+
+  private class TimersImpl implements Timers {
+    private final StateNamespace namespace;
+
+    public TimersImpl(StateNamespace namespace) {
+      checkArgument(namespace instanceof WindowNamespace);
+      this.namespace = namespace;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    @Override
+    public Instant currentEventTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+  }
+
+  // ======================================================================
+  // StateAccessors
+  // ======================================================================
+  /**
+   * {@link StateAccessor} for a single window. Depending on the {@link StateStyle}, state is
+   * looked up either in the window's own namespace or in the namespace of the window's write
+   * state address as reported by the active window set.
+   */
+  static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
+    protected final ActiveWindowSet<W> activeWindows;
+    protected final StateContext<W> context;
+    protected final StateNamespace windowNamespace;
+    protected final Coder<W> windowCoder;
+    protected final StateInternals<K> stateInternals;
+    protected final StateStyle style;
+
+    /**
+     * @param context the state context, whose window is the window this accessor is for; must
+     *     not be {@code null}
+     */
+    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
+        StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
+      this.activeWindows = activeWindows;
+      // namespaceFor() reads windowCoder, so it has to be assigned before windowNamespace.
+      this.windowCoder = windowCoder;
+      this.stateInternals = stateInternals;
+      this.context = checkNotNull(context);
+      this.windowNamespace = namespaceFor(context.window());
+      this.style = style;
+    }
+
+    /** Returns the window namespace for {@code window}, encoded with this accessor's coder. */
+    protected StateNamespace namespaceFor(W window) {
+      return StateNamespaces.window(windowCoder, window);
+    }
+
+    /** Returns the namespace of the context's window, computed once at construction. */
+    protected StateNamespace windowNamespace() {
+      return windowNamespace;
+    }
+
+    W window() {
+      return context.window();
+    }
+
+    StateNamespace namespace() {
+      return windowNamespace();
+    }
+
+    /**
+     * Returns the state for {@code address}: in the window's own namespace for
+     * {@link StateStyle#DIRECT}, or in the namespace of the window's write state address for
+     * {@link StateStyle#RENAMED}.
+     *
+     * @throws IllegalStateException if the style is neither of the known constants
+     */
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      switch (style) {
+        case DIRECT:
+          return stateInternals.state(windowNamespace(), address, context);
+        case RENAMED:
+          return stateInternals.state(
+              namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
+      }
+      // Cases are exhaustive; only reachable if a new StateStyle is added without a case above.
+      throw new IllegalStateException("Unknown StateStyle: " + style);
+    }
+  }
+
+  /**
+   * {@link MergingStateAccessor} for a merge of several active windows into one result window.
+   * The result window is the window of the inherited state context.
+   */
+  static class MergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    private final Collection<W> activeToBeMerged;
+
+    /**
+     * @param activeToBeMerged the windows being merged
+     * @param mergeResult the window the merge produces; becomes the context's window
+     */
+    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
+        StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
+        W mergeResult) {
+      super(activeWindows, windowCoder, stateInternals,
+          StateContexts.windowOnly(mergeResult), style);
+      this.activeToBeMerged = activeToBeMerged;
+    }
+
+    /**
+     * Returns the state for {@code address} in the merge result window: in its own namespace for
+     * {@link StateStyle#DIRECT}, or in the namespace of its merged write state address for
+     * {@link StateStyle#RENAMED}.
+     *
+     * @throws IllegalStateException if the style is neither of the known constants
+     */
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      switch (style) {
+        case DIRECT:
+          return stateInternals.state(windowNamespace(), address, context);
+        case RENAMED:
+          return stateInternals.state(
+              namespaceFor(activeWindows.mergedWriteStateAddress(
+                  activeToBeMerged, context.window())),
+              address,
+              context);
+      }
+      // Cases are exhaustive; only reachable if a new StateStyle is added without a case above.
+      throw new IllegalStateException("Unknown StateStyle: " + style);
+    }
+
+    /**
+     * Returns the state for {@code address} in each of the windows being merged, keyed by
+     * window.
+     */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
+      for (W mergingWindow : activeToBeMerged) {
+        StateNamespace namespace = mergingNamespaceFor(mergingWindow);
+        builder.put(mergingWindow, stateInternals.state(namespace, address, context));
+      }
+      return builder.build();
+    }
+
+    /**
+     * Returns the namespace holding the state of one window being merged: the window's own
+     * namespace for {@link StateStyle#DIRECT}, or that of its write state address for
+     * {@link StateStyle#RENAMED}.
+     */
+    private StateNamespace mergingNamespaceFor(W mergingWindow) {
+      switch (style) {
+        case DIRECT:
+          return namespaceFor(mergingWindow);
+        case RENAMED:
+          return namespaceFor(activeWindows.writeStateAddress(mergingWindow));
+      }
+      // Cases are exhaustive; only reachable if a new StateStyle is added without a case above.
+      throw new IllegalStateException("Unknown StateStyle: " + style);
+    }
+  }
+
+  /**
+   * A {@link MergingStateAccessor} that always uses {@code RENAMED} style and treats the state
+   * address windows of its context window as the windows being merged.
+   */
+  static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
+        StateInternals<K> stateInternals, W window) {
+      super(activeWindows, windowCoder, stateInternals,
+          StateContexts.windowOnly(window), StateStyle.RENAMED);
+    }
+
+    /** Returns the state address windows that {@code activeWindows} reports for the window. */
+    Collection<W> mergingWindows() {
+      final W premergedWindow = context.window();
+      return activeWindows.readStateAddresses(premergedWindow);
+    }
+
+    /**
+     * Looks up the state for {@code address} in each state address window of the context window.
+     *
+     * @return a map from each state address window to its state for {@code address}
+     */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      final ImmutableMap.Builder<W, StateT> statePerWindow = ImmutableMap.builder();
+      for (W addressWindow : activeWindows.readStateAddresses(context.window())) {
+        statePerWindow.put(
+            addressWindow, stateInternals.state(namespaceFor(addressWindow), address, context));
+      }
+      return statePerWindow.build();
+    }
+  }
+
+  // ======================================================================
+  // Contexts
+  // ======================================================================
+
+  /**
+   * Basic {@code ReduceFn.Context}: exposes the enclosing instance's key and windowing strategy,
+   * plus the supplied state accessor and timers created for that accessor's namespace.
+   */
+  private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
+    private final StateAccessorImpl<K, W> state;
+    private final TimersImpl timers;
+
+    private ContextImpl(StateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return this.state;
+    }
+
+    @Override
+    public Timers timers() {
+      return this.timers;
+    }
+
+    /** The window is whatever the state accessor reports. */
+    @Override
+    public W window() {
+      return this.state.window();
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+
+  /**
+   * {@code ReduceFn.ProcessValueContext} carrying one input value and its timestamp, together
+   * with the supplied state accessor and timers created for that accessor's namespace.
+   */
+  private class ProcessValueContextImpl
+      extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
+    private final InputT value;
+    private final Instant timestamp;
+    private final StateAccessorImpl<K, W> state;
+    private final TimersImpl timers;
+
+    private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
+        InputT value, Instant timestamp) {
+      reduceFn.super();
+      this.value = value;
+      this.timestamp = timestamp;
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    @Override
+    public InputT value() {
+      return this.value;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return this.timestamp;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return this.state;
+    }
+
+    @Override
+    public Timers timers() {
+      return this.timers;
+    }
+
+    /** The window is whatever the state accessor reports. */
+    @Override
+    public W window() {
+      return this.state.window();
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+
+  /**
+   * {@code ReduceFn.OnTriggerContext} that reads its pane info from a {@link ReadableState} on
+   * demand and forwards output to the supplied {@link OnTriggerCallbacks}.
+   */
+  private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
+    private final StateAccessorImpl<K, W> state;
+    private final ReadableState<PaneInfo> pane;
+    private final OnTriggerCallbacks<OutputT> callbacks;
+    private final TimersImpl timers;
+
+    private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+        OnTriggerCallbacks<OutputT> callbacks) {
+      reduceFn.super();
+      this.pane = pane;
+      this.callbacks = callbacks;
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    /** Reads the pane info each time it is requested rather than caching it. */
+    @Override
+    public PaneInfo paneInfo() {
+      return this.pane.read();
+    }
+
+    @Override
+    public void output(OutputT value) {
+      this.callbacks.output(value);
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return this.state;
+    }
+
+    @Override
+    public Timers timers() {
+      return this.timers;
+    }
+
+    /** The window is whatever the state accessor reports. */
+    @Override
+    public W window() {
+      return this.state.window();
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+
+  /**
+   * {@code ReduceFn.OnMergeContext} backed by a {@link MergingStateAccessorImpl}, with timers
+   * created for that accessor's namespace.
+   */
+  private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+    private final MergingStateAccessorImpl<K, W> state;
+    private final TimersImpl timers;
+
+    private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    @Override
+    public MergingStateAccessor<K, W> state() {
+      return this.state;
+    }
+
+    @Override
+    public Timers timers() {
+      return this.timers;
+    }
+
+    /** The window is whatever the state accessor reports. */
+    @Override
+    public W window() {
+      return this.state.window();
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+
+  /**
+   * {@code ReduceFn.OnMergeContext} backed by a {@link PremergingStateAccessorImpl}, with timers
+   * created for that accessor's namespace.
+   */
+  private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+    private final PremergingStateAccessorImpl<K, W> state;
+    private final TimersImpl timers;
+
+    private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    @Override
+    public MergingStateAccessor<K, W> state() {
+      return this.state;
+    }
+
+    @Override
+    public Timers timers() {
+      return this.timers;
+    }
+
+    /** The window is whatever the state accessor reports. */
+    @Override
+    public W window() {
+      return this.state.window();
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
new file mode 100644
index 0000000..96d764a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -0,0 +1,993 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
+import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.MergingActiveWindowSet;
+import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TriggerContextFactory;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
+ * {@link PCollection} by key.
+ *
+ * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
+ * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
+ *
+ * <ul>
+ * <li>Tracking the windows that are active (have buffered data) as elements arrive and
+ * triggers are fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
+ * when the trigger fires.
+ * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
+ * firings, etc, and providing appropriate contexts to the {@link ReduceFn} for actions
+ * such as output.
+ * <li>Scheduling garbage collection of state associated with a specific window, and making that
+ * happen when the appropriate timer fires.
+ * </ul>
+ *
+ * @param <K>       The type of key being processed.
+ * @param <InputT>  The type of values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W>       The type of windows this operates on.
+ */
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+
+  /**
+   * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
+   *
+   * <ul>
+   * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
+   * <li>It merges windows according to the {@link WindowingStrategy}.</li>
+   * <li>It chooses how to track active windows and clear out expired windows
+   * according to the {@link WindowingStrategy}, based on the allowed lateness and
+   * whether windows can merge.</li>
+   * <li>It decides whether to emit empty final panes according to whether the
+   * {@link WindowingStrategy} requires it.</li>
+   * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
+   * </ul>
+   */
+  private final WindowingStrategy<Object, W> windowingStrategy;
+
+  private final OutputWindowedValue<KV<K, OutputT>> outputter;
+
+  private final StateInternals<K> stateInternals;
+
+  private final Aggregator<Long, Long> droppedDueToClosedWindow;
+
+  private final K key;
+
+  /**
+   * Track which windows are still active and the 'state address' windows which hold their state.
+   *
+   * <ul>
+   * <li>State: Global map for all active windows for this computation and key.
+   * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
+   * the active window set until its trigger is closed or the window is garbage collected.
+   * </ul>
+   */
+  private final ActiveWindowSet<W> activeWindows;
+
+  /**
+   * Always a {@link SystemReduceFn}.
+   *
+   * <ul>
+   * <li>State: A bag of accumulated values, or the intermediate result of a combiner.
+   * <li>State style: RENAMED
+   * <li>Merging: Concatenate or otherwise combine the state from each merged window.
+   * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
+   * when trigger is finished or when the window is garbage collected.
+   * </ul>
+   */
+  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
+
+  /**
+   * Manage the setting and firing of timer events.
+   *
+   * <ul>
+   * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
+   * merged away. Timers created by triggers are never garbage collected and are left to
+   * fire and be ignored.
+   * <li>Lifetime: Timers automatically disappear after they fire.
+   * </ul>
+   */
+  private final TimerInternals timerInternals;
+
+  /**
+   * Manage the execution and state for triggers.
+   *
+   * <ul>
+   * <li>State: Tracks which sub-triggers have finished, and any additional state needed to
+   * determine when the trigger should fire.
+   * <li>State style: DIRECT
+   * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
+   * needed.
+   * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
+   * the finished bits are left behind and must be cleared when the window is
+   * garbage collected.
+   * </ul>
+   */
+  private final TriggerRunner<W> triggerRunner;
+
+  /**
+   * Store the output watermark holds for each window.
+   *
+   * <ul>
+   * <li>State: Bag of hold timestamps.
+   * <li>State style: RENAMED
+   * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
+   * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
+   * hold.
+   * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
+   * </ul>
+   */
+  private final WatermarkHold<W> watermarkHold;
+
+  private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
+
+  /**
+   * Store the previously emitted pane (if any) for each window.
+   *
+   * <ul>
+   * <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
+   * method, if any.
+   * <li>State style: DIRECT
+   * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
+   * Cleared when window is merged away.
+   * <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
+   * </ul>
+   */
+  private final PaneInfoTracker paneInfoTracker;
+
+  /**
+   * Store whether we've seen any elements for a window since the last pane was emitted.
+   *
+   * <ul>
+   * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
+   * <li>State style: RENAMED.
+   * <li>Merging: Counts are summed when windows are merged.
+   * <li>Lifetime: Cleared when pane fires or window is garbage collected.
+   * </ul>
+   */
+  private final NonEmptyPanes<K, W> nonEmptyPanes;
+
+  /**
+   * Creates a runner for a single key, wiring up the helpers that manage panes, holds,
+   * triggers and the active window set.
+   *
+   * @param key the key whose values this runner processes
+   * @param windowingStrategy supplies the window function and trigger
+   * @param stateInternals state storage handed to the window set and context factories
+   * @param timerInternals timer and watermark access shared by the helpers
+   * @param windowingInternals wrapped to emit output, and passed to the context factory
+   * @param droppedDueToClosedWindow counter bumped for each element dropped in a closed window
+   * @param reduceFn the function that buffers and combines values
+   * @param options passed through to the context factory
+   */
+  public ReduceFnRunner(
+      K key,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      TimerInternals timerInternals,
+      WindowingInternals<?, KV<K, OutputT>> windowingInternals,
+      Aggregator<Long, Long> droppedDueToClosedWindow,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      PipelineOptions options) {
+    // Plain collaborators first; nothing below may run before these fields are set.
+    this.key = key;
+    this.timerInternals = timerInternals;
+    this.paneInfoTracker = new PaneInfoTracker(timerInternals);
+    this.stateInternals = stateInternals;
+    this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
+    this.droppedDueToClosedWindow = droppedDueToClosedWindow;
+    this.reduceFn = reduceFn;
+
+    // The input type parameter is not used by this class, so view the strategy as Object-typed.
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<Object, W> uncheckedStrategy =
+        (WindowingStrategy<Object, W>) windowingStrategy;
+    this.windowingStrategy = uncheckedStrategy;
+
+    this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
+
+    // Reads this.windowingStrategy and this.stateInternals, so it must come after them.
+    // Note this may incur I/O to load persisted window set data.
+    this.activeWindows = createActiveWindowSet();
+
+    this.contextFactory =
+        new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
+            stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
+
+    this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
+    this.triggerRunner =
+        new TriggerRunner<>(
+            windowingStrategy.getTrigger(),
+            new TriggerContextFactory<>(
+                windowingStrategy.getWindowFn(), stateInternals, activeWindows));
+  }
+
+  /**
+   * Picks the active window set implementation: a non-merging set when the window function
+   * cannot merge, otherwise a merging set backed by {@code stateInternals}.
+   */
+  private ActiveWindowSet<W> createActiveWindowSet() {
+    if (windowingStrategy.getWindowFn().isNonMerging()) {
+      return new NonMergingActiveWindowSet<W>();
+    }
+    return new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
+  }
+
+  /** Reports whether the trigger runner considers {@code window} closed (DIRECT state). */
+  @VisibleForTesting
+  boolean isFinished(W window) {
+    ReduceFn<K, InputT, OutputT, W>.Context directContext =
+        contextFactory.base(window, StateStyle.DIRECT);
+    return triggerRunner.isClosed(directContext.state());
+  }
+
+  @VisibleForTesting
+  boolean hasNoActiveWindows() {
+    return activeWindows.getActiveAndNewWindows().isEmpty();
+  }
+
+  /**
+   * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
+   * triggers, and window merging.
+   *
+   * <p>The general strategy is:
+   * <ol>
+   * <li>Use {@link WindowedValue#getWindows} (itself determined using
+   * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
+   * of those windows will already have state associated with them. The rest are considered
+   * NEW.
+   * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
+   * Each NEW window will become either ACTIVE or be discarded.
+   * (See {@link ActiveWindowSet} for definitions of these terms.)
+   * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
+   * windows before any state is associated with the NEW window. In the common case that
+   * windows for new elements are merged into existing ACTIVE windows then no additional
+   * storage or merging overhead will be incurred.
+   * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
+   * states can be merged on-demand when a pane fires.
+   * <li>Process the element for each of the windows its windows have been merged into according
+   * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
+   * setting holds, and invoking {@link ReduceFn#onTrigger}.
+   * </ol>
+   */
+  public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+    // Merge first: a window introduced by an incoming element is folded into an existing
+    // window eagerly where possible.
+    final Map<W, W> mergeResults = collectAndMergeWindows(values);
+
+    // Feed every element through, remembering each window that actually accepted one.
+    final Set<W> touchedWindows = new HashSet<>();
+    for (WindowedValue<InputT> element : values) {
+      Collection<W> acceptedIn = processElement(mergeResults, element);
+      touchedWindows.addAll(acceptedIn);
+    }
+
+    // Give each touched window the chance to emit a pane if its trigger is ready.
+    for (W candidate : touchedWindows) {
+      ReduceFn<K, InputT, OutputT, W>.Context direct =
+          contextFactory.base(candidate, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.Context renamed =
+          contextFactory.base(candidate, StateStyle.RENAMED);
+      triggerRunner.prefetchShouldFire(candidate, direct.state());
+      emitIfAppropriate(direct, renamed);
+    }
+
+    // Merging and emitting are finished, so the active window state can be compressed.
+    // A window that is still NEW arrived on an element which was then discarded because the
+    // window's trigger was closed, so it is safe to delete.
+    activeWindows.cleanupTemporaryWindows();
+  }
+
+  /** Asks the active window set to persist itself. */
+  public void persist() {
+    this.activeWindows.persist();
+  }
+
+  /**
+   * Extract the windows associated with the values, and invoke merge. Return a map
+   * from windows to the merge result window. If a window is not in the domain of
+   * the result map then it did not get merged into a different window.
+   */
+  private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
+      throws Exception {
+    // A non-merging window function never combines windows, so there is nothing to map.
+    if (windowingStrategy.getWindowFn().isNonMerging()) {
+      return ImmutableMap.of();
+    }
+
+    // Register the windows of every element with the active window set before merging.
+    for (WindowedValue<?> element : values) {
+      for (BoundedWindow rawWindow : element.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W elementWindow = (W) rawWindow;
+
+        // Backwards compatibility with pre-1.4 state only: an ACTIVE window may still have
+        // several state addresses, meaning its state was never eagerly merged. Merge it now
+        // so that this legacy layout does not need handling anywhere else.
+        boolean hasUnmergedLegacyState =
+            activeWindows.isActive(elementWindow)
+                && activeWindows.readStateAddresses(elementWindow).size() > 1;
+        if (hasUnmergedLegacyState) {
+          ReduceFn<K, InputT, OutputT, W>.OnMergeContext legacyMergeContext =
+              contextFactory.forPremerge(elementWindow);
+          reduceFn.onMerge(legacyMergeContext);
+          watermarkHold.onMerge(legacyMergeContext);
+          activeWindows.merged(elementWindow);
+        }
+
+        // A window that is not currently ACTIVE (including one whose trigger was closed
+        // earlier) is added as NEW here and then goes through the usual merging logic.
+        activeWindows.ensureWindowExists(elementWindow);
+      }
+    }
+
+    // Run the merge; the callback records which source window ended up in which result.
+    Map<W, W> mergeResults = new HashMap<>();
+    activeWindows.merge(new OnMergeCallback(mergeResults));
+    return mergeResults;
+  }
+
+  /**
+   * Merge callback handed to {@code activeWindows.merge}. It records which windows were merged
+   * into which result, and merges or clears per-window state as each merge is reported.
+   */
+  private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
+    /** Filled in by {@link #onMerge}: maps each merged window to its merge result. */
+    private final Map<W, W> windowToMergeResult;
+
+    OnMergeCallback(Map<W, W> windowToMergeResult) {
+      this.windowToMergeResult = windowToMergeResult;
+    }
+
+    /**
+     * Filters {@code windows} down to those which are currently ACTIVE. Only ACTIVE windows
+     * have existing state to merge; NEW windows by definition have none.
+     */
+    private List<W> activeWindows(Iterable<W> windows) {
+      List<W> stillActive = new ArrayList<>();
+      for (W candidate : windows) {
+        if (!activeWindows.isActive(candidate)) {
+          continue;
+        }
+        stillActive.add(candidate);
+      }
+      return stillActive;
+    }
+
+    /**
+     * Called from the active window set when {@code toBeMerged} will later be merged into
+     * {@code mergeResult}. Only the ACTIVE subset of {@code toBeMerged} has state, so only
+     * that subset is prefetched.
+     */
+    @Override
+    public void prefetchOnMerge(
+        Collection<W> toBeMerged, W mergeResult) throws Exception {
+      List<W> sources = activeWindows(toBeMerged);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext direct =
+          contextFactory.forMerge(sources, mergeResult, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamed =
+          contextFactory.forMerge(sources, mergeResult, StateStyle.RENAMED);
+
+      // Trigger state uses DIRECT addressing; everything else uses RENAMED.
+      triggerRunner.prefetchForMerge(mergeResult, sources, direct.state());
+      reduceFn.prefetchOnMerge(renamed.state());
+      watermarkHold.prefetchOnMerge(renamed.state());
+      nonEmptyPanes.prefetchOnMerge(renamed.state());
+    }
+
+    /**
+     * Called from the active window set when {@code toBeMerged} is about to be merged into
+     * {@code mergeResult}. Only the ACTIVE subset of {@code toBeMerged} has state to merge.
+     */
+    @Override
+    public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+      // Record the outcome for every source window, ACTIVE or not.
+      for (W source : toBeMerged) {
+        windowToMergeResult.put(source, mergeResult);
+      }
+
+      // activeWindows has NOT yet incorporated the results of the merge at this point.
+      List<W> sources = activeWindows(toBeMerged);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext direct =
+          contextFactory.forMerge(sources, mergeResult, StateStyle.DIRECT);
+      ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamed =
+          contextFactory.forMerge(sources, mergeResult, StateStyle.RENAMED);
+
+      // Let the reduceFn do whatever merging it needs.
+      reduceFn.onMerge(renamed);
+
+      // Then the watermark holds.
+      watermarkHold.onMerge(renamed);
+
+      // Then the non-empty pane state.
+      nonEmptyPanes.onMerge(renamed.state());
+
+      // Finally the trigger merges its own state as needed.
+      triggerRunner.onMerge(direct.window(), direct.timers(), direct.state());
+
+      for (W mergedAway : sources) {
+        if (mergedAway.equals(mergeResult)) {
+          // The result window itself survives the merge.
+          continue;
+        }
+        // Cleanup flavor A: a currently ACTIVE window is about to be merged away.
+        // Clear whatever state the onMerge calls above did not already clear.
+        WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", mergedAway, mergeResult);
+        ReduceFn<K, InputT, OutputT, W>.Context clearContext =
+            contextFactory.base(mergedAway, StateStyle.DIRECT);
+        // The end-of-window and garbage collection timers are no longer needed.
+        // processElement establishes a fresh one for the mergeResult window afterwards; that
+        // window must receive at least one element, since a new element with a new window
+        // is what caused this onMerge.
+        cancelEndOfWindowAndGarbageCollectionTimers(clearContext);
+        // Earlier panes of merged-away windows no longer matter. The merge result window
+        // gets to start fresh if it is new.
+        paneInfoTracker.clear(clearContext.state());
+      }
+    }
+  }
+
+  /**
+   * Process an element.
+   *
+   * @param windowToMergeResult map from each merged window to the window it was merged into
+   * @param value the value being processed
+   * @return the set of windows in which the element was actually processed
+   */
+  private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
+      throws Exception {
+    // Map each of the element's windows to the ACTIVE window it was merged into, if any.
+    // The compressed form (value, {window1, window2, ...}) stands for the distinct elements
+    // (value, window1), (value, window2), ... so when window1 and window2 merge, the merged
+    // window holds both copies of the value.
+    Collection<W> windows = new ArrayList<>();
+    for (BoundedWindow rawWindow : value.getWindows()) {
+      @SuppressWarnings("unchecked")
+      W original = (W) rawWindow;
+      W mergedInto = windowToMergeResult.get(original);
+      windows.add(mergedInto == null ? original : mergedInto);
+    }
+
+    // Prefetch trigger state for every target window before doing any processing.
+    for (W target : windows) {
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext prefetchContext =
+          contextFactory.forValue(
+              target, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      triggerRunner.prefetchForValue(target, prefetchContext.state());
+    }
+
+    // Process the element once per target window whose trigger is not closed.
+    List<W> triggerableWindows = new ArrayList<>(windows.size());
+    for (W target : windows) {
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext =
+          contextFactory.forValue(
+              target, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
+      if (triggerRunner.isClosed(directContext.state())) {
+        // The trigger for this window has closed: count the drop and move on.
+        droppedDueToClosedWindow.addValue(1L);
+        WindowTracing.debug(
+            "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+            + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
+            value.getTimestamp(), key, target, timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        continue;
+      }
+
+      triggerableWindows.add(target);
+      activeWindows.ensureWindowIsActive(target);
+      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext =
+          contextFactory.forValue(
+              target, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);
+
+      nonEmptyPanes.recordContent(renamedContext.state());
+
+      // Ensure the end-of-window or garbage collection timer exists for this window.
+      Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
+
+      // Hold the output watermark back until the pane containing this element has been
+      // processed. If the element is too late for that, the hold goes at the end-of-window
+      // or garbage collection time instead, so that empty panes can contribute elements which
+      // a following computation will not drop as late (assuming that computation uses the
+      // same allowed lateness value...)
+      @Nullable Instant hold = watermarkHold.addHolds(renamedContext);
+
+      if (hold != null) {
+        // Sanity check: the hold and the timer must agree on being inside the window or not.
+        boolean holdInWindow = !hold.isAfter(target.maxTimestamp());
+        boolean timerInWindow = !timer.isAfter(target.maxTimestamp());
+        checkState(
+            holdInWindow == timerInWindow,
+            "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
+            hold,
+            timer,
+            directContext.window());
+      }
+
+      // Hand the value to the reduceFn, which buffers it as appropriate.
+      reduceFn.processValue(renamedContext);
+
+      // Let the trigger update its state for this value.
+      triggerRunner.processValue(
+          directContext.window(),
+          directContext.timestamp(),
+          directContext.timers(),
+          directContext.state());
+
+      // Invariant: if triggerRunner.shouldFire held before processValue, it still holds
+      // afterwards, i.e. adding values cannot take a trigger from firing to non-firing.
+      // (This is not asserted because it is too slow.)
+    }
+
+    return triggerableWindows;
+  }
+
+  /**
+   * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
+   *
+   * <p>The timer must be in a {@link WindowNamespace} (enforced with {@code checkArgument}); the
+   * window it refers to is taken from that namespace.
+   *
+   * <ul>
+   * <li>An event-time timer at or after the window's garbage collection time is treated as a
+   * garbage collection timer: if the window is still active and its trigger is not closed a
+   * final {@code onTrigger} is run, and then all remaining state for the window is cleared.
+   * <li>For any other timer, {@code emitIfAppropriate} is run if the window is still active and
+   * its trigger is not closed. If the timer is an event-time timer exactly at the end of the
+   * window, the garbage collection timer is then scheduled.
+   * </ul>
+   *
+   * @param timer the timer which fired
+   * @throws Exception if any of the calls made while handling the timer throws
+   */
+  public void onTimer(TimerData timer) throws Exception {
+    // Which window is the timer for?
+    checkArgument(timer.getNamespace() instanceof WindowNamespace,
+        "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
+    @SuppressWarnings("unchecked")
+    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
+    W window = windowNamespace.getWindow();
+    ReduceFn<K, InputT, OutputT, W>.Context directContext =
+        contextFactory.base(window, StateStyle.DIRECT);
+    ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+        contextFactory.base(window, StateStyle.RENAMED);
+
+    // Has this window had its trigger finish?
+    // - The trigger may implement isClosed as constant false.
+    // - If the window function does not support windowing then all windows will be considered
+    // active.
+    // So we must take conjunction of activeWindows and triggerRunner state.
+    boolean windowIsActiveAndOpen =
+        activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
+
+    if (!windowIsActiveAndOpen) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
+    }
+
+    // If this is an end-of-window timer then we may need to set a garbage collection timer
+    // if allowed lateness is non-zero.
+    boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+        && timer.getTimestamp().equals(window.maxTimestamp());
+
+    // If this is a garbage collection timer then we should trigger and garbage collect the window.
+    // We'll consider any timer at or after the end-of-window time to be a signal to garbage
+    // collect.
+    Instant cleanupTime = garbageCollectionTime(window);
+    boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
+        && !timer.getTimestamp().isBefore(cleanupTime);
+
+    if (isGarbageCollection) {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+
+      if (windowIsActiveAndOpen) {
+        // We need to call onTrigger to emit the final pane if required.
+        // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
+        // and the watermark has passed the end of the window.
+        @Nullable Instant newHold =
+            onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
+        checkState(newHold == null,
+            "Hold placed at %s despite isFinished being true.", newHold);
+      }
+
+      // Cleanup flavor B: Clear all the remaining state for this window since we'll never
+      // see elements for it again.
+      clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
+    } else {
+      WindowTracing.debug(
+          "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+          + "inputWatermark:{}; outputWatermark:{}",
+          key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
+          timerInternals.currentOutputWatermarkTime());
+      if (windowIsActiveAndOpen) {
+        emitIfAppropriate(directContext, renamedContext);
+      }
+
+      if (isEndOfWindow) {
+        // If the window strategy trigger includes a watermark trigger then at this point
+        // there should be no data holds, either because we'd already cleared them on an
+        // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
+        // We could assert this but it is very expensive.
+
+        // Since we are processing an on-time firing we should schedule the garbage collection
+        // timer. (If getAllowedLateness is zero then the timer event will be considered a
+        // cleanup event and handled by the above).
+        // Note we must do this even if the trigger is finished so that we are sure to cleanup
+        // any final trigger finished bits.
+        checkState(
+            windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+            "Unexpected zero getAllowedLateness");
+        WindowTracing.debug(
+            "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
+            + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
+            timerInternals.currentOutputWatermarkTime());
+        checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+                                 "Cleanup time %s is beyond end-of-time", cleanupTime);
+        directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
+      }
+    }
+  }
+
+  /**
+   * Drop every piece of state still held for {@code directContext}'s window.
+   * Only call this once all future elements for the window are known to be beyond allowed
+   * lateness.
+   *
+   * <p>This goes further than the clearing done by {@link #emitIfAppropriate}:
+   * <ol>
+   * <li>The trigger finished bits are cleared, since nobody will ask again whether the trigger
+   * is closed.
+   * <li>Any remaining garbage collection hold is cleared.
+   * </ol>
+   */
+  private void clearAllState(
+      ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+      boolean windowIsActiveAndOpen)
+      throws Exception {
+    if (!windowIsActiveAndOpen) {
+      // Either the window is not in the active set (1) or its trigger is closed (2).
+      // (1) can only arise for merging windows explicitly removed by emitIfAppropriate, which
+      // implies the trigger fired and closed, i.e. case (2).
+      // In case (2) the firing which closed the trigger already made emitIfAppropriate clear
+      // reduceFn, triggerRunner (bar the finished bits), paneInfoTracker and activeWindows.
+      // nonEmptyPanes was cleared unconditionally by that firing, existing watermark holds were
+      // released by it, and a closed trigger means WatermarkHold.extractAndRelease placed no
+      // new end of window or garbage collection hold.
+      // Hence none of the clearing in the other branch is required here.
+      //
+      // The one exception is backwards compatibility with pipelines updated from an sdk
+      // version <= 1.3: those may have left an end-of-window or garbage collection hold keyed
+      // by the current window (directContext) instead of the state address window
+      // (renamedContext). That requires all of:
+      // - merging windows,
+      // - DISCARDING_FIRED_PANES,
+      // - a pane having fired,
+      // - the trigger not (yet) being closed.
+      boolean discarding = windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
+      if (discarding && !windowingStrategy.getWindowFn().isNonMerging()) {
+        watermarkHold.clearHolds(directContext);
+      }
+    } else {
+      // The window is in the active set and its trigger has not closed, so state may remain.
+      reduceFn.clearState(renamedContext);
+      watermarkHold.clearHolds(renamedContext);
+      nonEmptyPanes.clearPane(renamedContext.state());
+      // The two calls below are harmless for an inactive window, merely redundant.
+      triggerRunner.clearState(
+          directContext.window(), directContext.timers(), directContext.state());
+      paneInfoTracker.clear(directContext.state());
+    }
+
+    // The state address windows no longer need tracking.
+    activeWindows.remove(directContext.window());
+    // Nobody will ask whether the trigger is closed from here on.
+    triggerRunner.clearFinished(directContext.state());
+  }
+
+  /**
+   * Decide whether the reduce function's buffered state should be dropped once a pane has fired.
+   *
+   * @param isFinished whether the firing is the last one for the trigger
+   * @return {@code true} if this is the final firing, or if the windowing strategy discards
+   *     fired panes so nothing is accumulated from one pane to the next
+   */
+  private boolean shouldDiscardAfterFiring(boolean isFinished) {
+    return isFinished
+        || windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
+  }
+
+  /**
+   * Fire the trigger for the window if it is ready to fire, emitting a pane where required, and
+   * then clean up whatever state the firing has made redundant.
+   */
+  private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
+      throws Exception {
+    boolean readyToFire = triggerRunner.shouldFire(
+        directContext.window(), directContext.timers(), directContext.state());
+    if (!readyToFire) {
+      // Nothing to do until the trigger wants to fire.
+      return;
+    }
+
+    // Tell the trigger it has fired, then ask whether that firing closed it.
+    triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
+    boolean triggerClosed = triggerRunner.isClosed(directContext.state());
+
+    // Decide whether the element state can be dropped once the pane is out.
+    boolean discardBufferedState = shouldDiscardAfterFiring(triggerClosed);
+
+    // Produce the pane contents. This also clears all element holds as a side effect, though
+    // end-of-window or garbage collection holds may survive.
+    onTrigger(directContext, renamedContext, triggerClosed, false /*isEndOfWindow*/);
+
+    // Having triggered, the pane no longer has any content.
+    nonEmptyPanes.clearPane(renamedContext.state());
+
+    if (discardBufferedState) {
+      // Cleanup flavor C: The user does not want any buffered data to persist between panes.
+      reduceFn.clearState(renamedContext);
+    }
+
+    if (!triggerClosed) {
+      return;
+    }
+
+    // Cleanup flavor D: A closed trigger means all new incoming elements will be ignored, so
+    // clear what onTrigger and clearPane above left behind. The fact that the trigger is closed
+    // stays recorded until the window is garbage collected.
+    triggerRunner.clearState(
+        directContext.window(), directContext.timers(), directContext.state());
+    paneInfoTracker.clear(directContext.state());
+    activeWindows.remove(directContext.window());
+  }
+
+  /**
+   * Decide whether a pane has to be emitted.
+   *
+   * @param isEmpty whether the pane has no elements
+   * @param isFinished whether this is known to be the final pane
+   * @param timing the timing of the pane
+   * @return {@code true} if the pane has elements, if it is the unique ON_TIME pane, or if it is
+   *     the final pane and the user asked for it even when empty ({@code FIRE_ALWAYS})
+   */
+  private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
+    boolean hasElements = !isEmpty;
+    boolean isUniqueOnTimePane = timing == Timing.ON_TIME;
+    return hasElements
+        || isUniqueOnTimePane
+        || (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS);
+  }
+
+  /**
+   * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
+   *
+   * <p>Extracts and releases the watermark holds for the window, obtains the next
+   * {@link PaneInfo}, sanity checks any newly placed hold and, if {@link #needToEmit} says a pane
+   * is required, invokes the reduce function with a callback which stores the pane info and
+   * outputs the value for the window at the extracted (old) hold timestamp.
+   *
+   * @param directContext context used for the window itself, the pane info and prefetching
+   * @param renamedContext context used for the watermark holds and the non-empty pane state
+   * @param isFinished whether the trigger is finished; no new hold may be placed in that case
+   * @param isEndOfWindow whether this firing is for the end of the window, in which case no new
+   *     end-of-window hold may be placed
+   * @return output watermark hold added, or {@literal null} if none.
+   * @throws Exception if any of the calls made while triggering throws
+   */
+  @Nullable
+  private Instant onTrigger(
+      final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+      boolean isFinished, boolean isEndOfWindow)
+          throws Exception {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // Prefetch necessary states
+    ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
+        watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
+    ReadableState<PaneInfo> paneFuture =
+        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
+    ReadableState<Boolean> isEmptyFuture =
+        nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+
+    reduceFn.prefetchOnTrigger(directContext.state());
+    triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+
+    // Calculate the pane info.
+    final PaneInfo pane = paneFuture.read();
+    // Extract the window hold, and as a side effect clear it.
+
+    WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+    final Instant outputTimestamp = pair.oldHold;
+    @Nullable Instant newHold = pair.newHold;
+
+    if (newHold != null) {
+      // We can't be finished yet.
+      checkState(
+        !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
+      // The hold cannot be behind the input watermark.
+      checkState(
+        !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
+      if (newHold.isAfter(directContext.window().maxTimestamp())) {
+        // The hold must be for garbage collection, which can't have happened yet.
+        checkState(
+          newHold.isEqual(garbageCollectionTime(directContext.window())),
+          "new hold %s should be at garbage collection for window %s plus %s",
+          newHold,
+          directContext.window(),
+          windowingStrategy.getAllowedLateness());
+      } else {
+        // The hold must be for the end-of-window, which can't have happened yet.
+        checkState(
+          newHold.isEqual(directContext.window().maxTimestamp()),
+          "new hold %s should be at end of window %s",
+          newHold,
+          directContext.window());
+        checkState(
+          !isEndOfWindow,
+          "new hold at %s for %s but this is the watermark trigger",
+          newHold,
+          directContext.window());
+      }
+    }
+
+    // Only emit a pane if it has data or empty panes are observable.
+    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+      // Run reduceFn.onTrigger method.
+      final List<W> windows = Collections.singletonList(directContext.window());
+      ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
+          contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+              new OnTriggerCallbacks<OutputT>() {
+                @Override
+                public void output(OutputT toOutput) {
+                  // We're going to output panes, so commit the (now used) PaneInfo.
+                  // TODO: This is unnecessary if the trigger isFinished since the saved
+                  // state will be immediately deleted.
+                  paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
+
+                  // Output the actual value.
+                  outputter.outputWindowedValue(
+                      KV.of(key, toOutput), outputTimestamp, windows, pane);
+                }
+              });
+
+      reduceFn.onTrigger(renamedTriggerContext);
+    }
+
+    return newHold;
+  }
+
+  /**
+   * Ensure a timer will eventually fire which tells us to garbage collect the window state, and
+   * return the time that timer is set for. For efficiency this may take two steps instead of
+   * one.
+   *
+   * <ul>
+   * <li>With zero allowedLateness, garbage collection happens at the end of the window. To keep
+   * things simple we set our own timer for that even though an {@link AfterWatermark} trigger
+   * may have set an end-of-window timer too. ({@code setTimer} is idempotent.)
+   * <li>With non-zero allowedLateness we could always set a timer for the garbage collection
+   * time. But for large windows (eg hourly) with small allowedLateness (eg seconds) that would
+   * nearly double the number of timers in-flight. Instead we set an end-of-window timer and
+   * roll it forward to a garbage collection timer when it fires. The input watermark tells the
+   * two cases apart.
+   * </ul>
+   */
+  private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    Instant inputWatermark = timerInternals.currentInputWatermarkTime();
+    Instant windowEnd = directContext.window().maxTimestamp();
+    // Once the input watermark has passed the end of the window only garbage collection remains.
+    boolean pastEndOfWindow = windowEnd.isBefore(inputWatermark);
+    Instant fireAt = pastEndOfWindow ? garbageCollectionTime(directContext.window()) : windowEnd;
+    String timerKind = pastEndOfWindow ? "garbage collection" : "end-of-window";
+    WindowTracing.trace(
+        "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
+        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        timerKind, fireAt, key, directContext.window(), inputWatermark,
+        timerInternals.currentOutputWatermarkTime());
+    checkState(
+        !fireAt.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "Timer %s is beyond end-of-time",
+        fireAt);
+    directContext.timers().setTimer(fireAt, TimeDomain.EVENT_TIME);
+    return fireAt;
+  }
+
+  /**
+   * Delete the end-of-window timer and, when it is a distinct timer, the garbage collection
+   * timer for {@code directContext}'s window.
+   *
+   * <p>The garbage collection timer is only deleted separately when the garbage collection time
+   * is strictly after the end of the window; otherwise the first deletion already covers it.
+   *
+   * @param directContext context giving access to the window and its timers
+   */
+  private void cancelEndOfWindowAndGarbageCollectionTimers(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    WindowTracing.debug(
+        "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
+        + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    Instant eow = directContext.window().maxTimestamp();
+    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
+    Instant gc = garbageCollectionTime(directContext.window());
+    if (gc.isAfter(eow)) {
+      // Delete the garbage collection timer itself; deleting eow again would leave it behind.
+      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
+    }
+  }
+
+  /**
+   * Compute the time at which {@code window} should be garbage collected: the end of the window
+   * plus the allowed lateness, truncated to the end of the global window if it would fall
+   * beyond that.
+   */
+  private Instant garbageCollectionTime(W window) {
+    Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
+
+    // Compare by subtracting the allowed lateness from the "end of time" rather than adding it
+    // to the end of the window, since that addition might overflow the maximum allowed Instant.
+    Instant latestUntruncatedEnd = endOfTime.minus(windowingStrategy.getAllowedLateness());
+    if (!latestUntruncatedEnd.isBefore(window.maxTimestamp())) {
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+    }
+    // End of window + allowed lateness is beyond the end of the global window, so truncate.
+    return endOfTime;
+  }
+
+  /**
+   * Something which can output a value together with all of its windowing information.
+   * Deliberately a restricted subinterface of {@link WindowingInternals}, to express how it is
+   * used here.
+   */
+  private interface OutputWindowedValue<OutputT> {
+    /** Output {@code output} with the given timestamp, windows and pane. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane);
+  }
+
+  /** An {@link OutputWindowedValue} which forwards every output to a {@link WindowingInternals}. */
+  private static class OutputViaWindowingInternals<OutputT>
+      implements OutputWindowedValue<OutputT> {
+
+    /** Receiver of all forwarded outputs. */
+    private final WindowingInternals<?, OutputT> delegate;
+
+    public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
+      delegate = windowingInternals;
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      delegate.outputWindowedValue(output, timestamp, windows, pane);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
new file mode 100644
index 0000000..df74ed3
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Runs an {@link OldDoFn} by building a process context per element and passing it in.
+ *
+ * @param <InputT> element type of the main input of the {@link OldDoFn}
+ * @param <OutputT> element type of the main output of the {@link OldDoFn}
+ */
+public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT> {
+
+  protected SimpleDoFnRunner(
+      PipelineOptions options, OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader,
+      OutputManager outputManager, TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags, StepContext stepContext,
+      AggregatorFactory aggregatorFactory, WindowingStrategy<?, ?> windowingStrategy) {
+    super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags,
+        stepContext, aggregatorFactory, windowingStrategy);
+  }
+
+  @Override
+  protected void invokeProcessElement(WindowedValue<InputT> elem) {
+    final OldDoFn<InputT, OutputT>.ProcessContext context = createProcessContext(elem);
+    // User code runs below, so any exception it throws is wrapped before being rethrown.
+    try {
+      fn.processElement(context);
+    } catch (Exception userCodeFailure) {
+      throw wrapUserCodeException(userCodeFailure);
+    }
+  }
+}