You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/22 02:22:24 UTC

[10/12] incubator-beam git commit: Move some easy stuff into runners/core-java

Move some easy stuff into runners/core-java

This change moves a set of classes with no dependents,
leaving them in the same Java packages for now.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0fef8e63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0fef8e63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0fef8e63

Branch: refs/heads/master
Commit: 0fef8e6349216374ef60ef1d3356bdbdcc6f32ee
Parents: efaad32
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 20 11:54:20 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 21 14:09:49 2016 -0700

----------------------------------------------------------------------
 runners/core-java/pom.xml                       |   30 +
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 +
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 +
 .../beam/sdk/util/BatchTimerInternals.java      |  140 ++
 .../org/apache/beam/sdk/util/DoFnRunner.java    |   62 +
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  558 +++++++
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 ++
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 ++
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  212 +++
 .../sdk/util/LateDataDroppingDoFnRunner.java    |  147 ++
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 ++
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 ++
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 ++
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 ++
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 ++++++
 .../apache/beam/sdk/util/ReduceFnRunner.java    |  985 ++++++++++++
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 +
 .../apache/beam/sdk/util/SystemReduceFn.java    |  135 ++
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 +++
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 +++++++
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 ++
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 ++++++++
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 ++
 .../util/LateDataDroppingDoFnRunnerTest.java    |  117 ++
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 +++
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 1448 ++++++++++++++++++
 .../apache/beam/sdk/util/ReduceFnTester.java    |  784 ++++++++++
 .../beam/sdk/util/SimpleDoFnRunnerTest.java     |   86 ++
 .../beam/runners/direct/DirectGroupByKey.java   |    2 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |    2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |    2 +-
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 -
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   75 -
 .../beam/sdk/util/BatchTimerInternals.java      |  140 --
 .../org/apache/beam/sdk/util/DoFnRunner.java    |   62 -
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  558 -------
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  144 --
 .../apache/beam/sdk/util/GatherAllPanes.java    |    1 -
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   59 -
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 --
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  247 ---
 .../sdk/util/LateDataDroppingDoFnRunner.java    |  147 --
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 --
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  154 --
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  115 --
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  130 --
 .../beam/sdk/util/ReduceFnContextFactory.java   |  497 ------
 .../apache/beam/sdk/util/ReduceFnRunner.java    |  985 ------------
 .../sdk/util/ReifyTimestampsAndWindows.java     |   63 +
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   56 -
 .../apache/beam/sdk/util/SystemReduceFn.java    |  135 --
 .../org/apache/beam/sdk/util/TriggerRunner.java |  234 ---
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 -------
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  118 --
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  619 --------
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  106 --
 .../util/LateDataDroppingDoFnRunnerTest.java    |  117 --
 .../util/PushbackSideInputDoFnRunnerTest.java   |  234 ---
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 1448 ------------------
 .../apache/beam/sdk/util/ReduceFnTester.java    |  784 ----------
 .../beam/sdk/util/SimpleDoFnRunnerTest.java     |   86 --
 62 files changed, 8143 insertions(+), 8086 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 8ede60b..1587a1a 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -197,7 +197,31 @@
 
     <!-- build dependencies -->
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <!-- test dependencies -->
+
+    <!-- Utilities such as WindowMatchers -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
@@ -205,6 +229,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
new file mode 100644
index 0000000..af28052
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows according to the provided {@link WindowFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+    extends PTransform<PCollection<T>, PCollection<T>> {
+
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindows(WindowFn<? super T, W> fn) {
+    this.fn = fn;
+  }
+
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
new file mode 100644
index 0000000..caec40e
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * {@link DoFn} that tags elements of a PCollection with windows, according
+ * to the provided {@link WindowFn}.
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+@SystemDoFnInternal
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
+    this.fn =
+        checkNotNull(
+            fn,
+            "%s provided to %s cannot be null",
+            WindowFn.class.getSimpleName(),
+            AssignWindowsDoFn.class.getSimpleName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(final ProcessContext c) throws Exception {
+    Collection<W> windows =
+        ((WindowFn<T, W>) fn).assignWindows(
+            ((WindowFn<T, W>) fn).new AssignContext() {
+                @Override
+                public T element() {
+                  return c.element();
+                }
+
+                @Override
+                public Instant timestamp() {
+                  return c.timestamp();
+                }
+
+                @Override
+                public Collection<? extends BoundedWindow> windows() {
+                  return c.windowingInternals().windows();
+                }
+              });
+
+    c.windowingInternals()
+        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
new file mode 100644
index 0000000..d0c0b2f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * TimerInternals that uses priority queues to manage the timers that are ready to fire.
+ */
+public class BatchTimerInternals implements TimerInternals {
+  /** Set of scheduled timers, used for deduplicating timers. */
+  private Set<TimerData> existingTimers = new HashSet<>();
+
+  // Keep these queues separate so we can advance over them separately.
+  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  private Instant inputWatermarkTime;
+  private Instant processingTime;
+
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
+  }
+
+  public BatchTimerInternals(Instant processingTime) {
+    this.processingTime = processingTime;
+    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  }
+
+  @Override
+  public void setTimer(TimerData timer) {
+    if (existingTimers.add(timer)) {
+      queue(timer.getDomain()).add(timer);
+    }
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
+   * is already complete.
+   */
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermarkTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    // The output watermark is always undefined in batch mode.
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .toString();
+  }
+
+  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
+      throws Exception {
+    Preconditions.checkState(!newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
+        newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
+  }
+
+  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
+      throws Exception {
+    Preconditions.checkState(!newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
+    processingTime = newProcessingTime;
+    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
+  }
+
+  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
+      throws Exception {
+    PriorityQueue<TimerData> timers = queue(domain);
+    boolean shouldFire = false;
+
+    do {
+      TimerData timer = timers.peek();
+      // Timers fire if the new time is strictly after the timer's timestamp
+      shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
+      if (shouldFire) {
+        // Remove before firing, so that if the trigger adds another identical
+        // timer we don't remove it.
+        timers.remove();
+        runner.onTimer(timer);
+      }
+    } while (shouldFire);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
new file mode 100644
index 0000000..4ec8920
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A wrapper interface that represents the execution of a {@link DoFn}.
+ */
+public interface DoFnRunner<InputT, OutputT> {
+  /**
+   * Prepares and calls {@link DoFn#startBundle}.
+   */
+  public void startBundle();
+
+  /**
+   * Calls {@link DoFn#processElement} with a {@link ProcessContext} containing the current element.
+   */
+  public void processElement(WindowedValue<InputT> elem);
+
+  /**
+   * Calls {@link DoFn#finishBundle} and performs additional tasks, such as
+   * flushing in-memory states.
+   */
+  public void finishBundle();
+
+  /**
+   * An internal interface for signaling that a {@link DoFn} requires late data dropping.
+   */
+  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
+    /**
+     * Gets this object as a {@link DoFn}.
+     *
+     * <p>Most implementors of this interface are expected to be {@link DoFn} instances, and will
+     * return themselves.
+     */
+    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
+
+    /**
+     * Returns an aggregator that tracks elements that are dropped due to being late.
+     */
+    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
new file mode 100644
index 0000000..1ebe72b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A base implementation of {@link DoFnRunner}.
+ *
+ * <p> Sub-classes should override {@link #invokeProcessElement}.
+ */
+public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  /** The DoFn being run. */
+  public final DoFn<InputT, OutputT> fn;
+
+  /** The context used for running the DoFn. */
+  public final DoFnContext<InputT, OutputT> context;
+
+  protected DoFnRunnerBase(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = fn;
+    this.context = new DoFnContext<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
+  }
+
+  /**
+   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
+   * contexts such as the {@link DirectRunner}.
+   */
+  public static class ListOutputManager implements OutputManager {
+
+    private Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+
+      if (outputList == null) {
+        outputList = Lists.newArrayList();
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        List<WindowedValue<?>> untypedList = (List) outputList;
+        outputLists.put(tag, untypedList);
+      }
+
+      outputList.add(output);
+    }
+
+    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+      // Safe cast by design, inexpressible in Java without rawtypes
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
+    }
+  }
+
+  @Override
+  public void startBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.startBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    if (elem.getWindows().size() <= 1
+        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+            && context.sideInputReader.isEmpty())) {
+      invokeProcessElement(elem);
+    } else {
+      // We could modify the windowed value (and the processContext) to
+      // avoid repeated allocations, but this is more straightforward.
+      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+        invokeProcessElement(windowedValue);
+      }
+    }
+  }
+
+  /**
+   * Invokes {@link DoFn#processElement} after certain pre-processing has been done in
+   * {@link DoFnRunnerBase#processElement}.
+   */
+  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
+
+  @Override
+  public void finishBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.finishBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  /**
+   * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}.
+   *
+   * @param <InputT> the type of the DoFn's (main) input elements
+   * @param <OutputT> the type of the DoFn's (main) output elements
+   */
+  private static class DoFnContext<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.Context {
+    private static final int MAX_SIDE_OUTPUTS = 1000;
+
+    final PipelineOptions options;
+    final DoFn<InputT, OutputT> fn;
+    final SideInputReader sideInputReader;
+    final OutputManager outputManager;
+    final TupleTag<OutputT> mainOutputTag;
+    final StepContext stepContext;
+    final CounterSet.AddCounterMutator addCounterMutator;
+    final WindowFn<?, ?> windowFn;
+
+    /**
+     * The set of known output tags, some of which may be undeclared, so we can throw an
+     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+     */
+    private Set<TupleTag<?>> outputTags;
+
+    public DoFnContext(PipelineOptions options,
+                       DoFn<InputT, OutputT> fn,
+                       SideInputReader sideInputReader,
+                       OutputManager outputManager,
+                       TupleTag<OutputT> mainOutputTag,
+                       List<TupleTag<?>> sideOutputTags,
+                       StepContext stepContext,
+                       CounterSet.AddCounterMutator addCounterMutator,
+                       WindowFn<?, ?> windowFn) {
+      fn.super();
+      this.options = options;
+      this.fn = fn;
+      this.sideInputReader = sideInputReader;
+      this.outputManager = outputManager;
+      this.mainOutputTag = mainOutputTag;
+      this.outputTags = Sets.newHashSet();
+
+      outputTags.add(mainOutputTag);
+      for (TupleTag<?> sideOutputTag : sideOutputTags) {
+        outputTags.add(sideOutputTag);
+      }
+
+      this.stepContext = stepContext;
+      this.addCounterMutator = addCounterMutator;
+      this.windowFn = windowFn;
+      super.setupDelegateAggregators();
+    }
+
+    //////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+      final Instant inputTimestamp = timestamp;
+
+      if (timestamp == null) {
+        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      if (windows == null) {
+        try {
+          // The windowFn can never succeed at accessing the element, so its type does not
+          // matter here
+          @SuppressWarnings("unchecked")
+          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
+            @Override
+            public Object element() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input element when none was available");
+            }
+
+            @Override
+            public Instant timestamp() {
+              if (inputTimestamp == null) {
+                throw new UnsupportedOperationException(
+                    "WindowFn attempted to access input timestamp when none was available");
+              }
+              return inputTimestamp;
+            }
+
+            @Override
+            public Collection<? extends BoundedWindow> windows() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input windows when none were available");
+            }
+          });
+        } catch (Exception e) {
+          throw UserCodeException.wrap(e);
+        }
+      }
+
+      return WindowedValue.of(output, timestamp, windows, pane);
+    }
+
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      if (!sideInputReader.contains(view)) {
+        throw new IllegalArgumentException("calling sideInput() with unknown view");
+      }
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      return sideInputReader.get(view, sideInputWindow);
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+      outputManager.output(mainOutputTag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteOutput(windowedElem);
+      }
+    }
+
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+                                               T output,
+                                               Instant timestamp,
+                                               Collection<? extends BoundedWindow> windows,
+                                               PaneInfo pane) {
+      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    /**
+     * Emits an already-windowed element to the side output identified by {@code tag} and,
+     * when a step context is present, reports the side output to it.
+     *
+     * <p>A tag that is not yet in {@code outputTags} is added to it on first use.
+     *
+     * @throws IllegalArgumentException if {@code tag} is new and {@code outputTags} already
+     *     holds {@code MAX_SIDE_OUTPUTS} or more tags
+     */
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+      boolean knownTag = outputTags.contains(tag);
+      if (!knownTag) {
+        // The tag was neither declared nor seen earlier in this execution, so it is a new,
+        // undeclared and unconsumed output. Cap the number of such outputs to catch likely
+        // user errors.
+        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+          throw new IllegalArgumentException(
+              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+        }
+        outputTags.add(tag);
+      }
+
+      outputManager.output(tag, windowedElem);
+      if (stepContext == null) {
+        return;
+      }
+      stepContext.noteSideOutput(tag, windowedElem);
+    }
+
+    // Following implementations of output, outputWithTimestamp, and sideOutput
+    // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by
+    // ProcessContext's versions in DoFn.processElement.
+    /**
+     * Emits {@code output} to the main output with no timestamp and no windows supplied
+     * (both are passed as {@code null}) and with pane {@link PaneInfo#NO_FIRING}.
+     */
+    @Override
+    public void output(OutputT output) {
+      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the main output with the given timestamp, no windows supplied
+     * ({@code null}) and pane {@link PaneInfo#NO_FIRING}.
+     */
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag}, with no timestamp
+     * and no windows supplied (both {@code null}) and pane {@link PaneInfo#NO_FIRING}.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag} with the given
+     * timestamp, no windows supplied ({@code null}) and pane {@link PaneInfo#NO_FIRING}.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Builds the internal aggregator name {@code [user-]<stepName>-<userName>}. The
+     * {@code "user-"} prefix is omitted when the DoFn's class carries
+     * {@link SystemDoFnInternal}.
+     */
+    private String generateInternalAggregatorName(String userName) {
+      // NOTE(review): stepContext is dereferenced unconditionally here, while the output
+      // methods of this class guard against a null stepContext - confirm it cannot be null.
+      String prefix =
+          fn.getClass().isAnnotationPresent(SystemDoFnInternal.class) ? "" : "user-";
+      return prefix + stepContext.getStepName() + "-" + userName;
+    }
+
+    /**
+     * Creates a {@link CounterAggregator} for {@code combiner}, registered under the internal
+     * name derived from {@code name}.
+     *
+     * @throws NullPointerException if {@code combiner} is null
+     */
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      Preconditions.checkNotNull(combiner, "Combiner passed to createAggregator cannot be null");
+      String internalName = generateInternalAggregatorName(name);
+      return new CounterAggregator<>(internalName, combiner, addCounterMutator);
+    }
+  }
+
+  /**
+   * Returns a new {@code DoFn.ProcessContext} for the given element.
+   *
+   * <p>The returned context is a {@link DoFnProcessContext} bound to this runner's
+   * {@code fn} and {@code context}.
+   *
+   * @param elem the windowed input element the context will expose
+   */
+  protected DoFn<InputT, OutputT>.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
+    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+  }
+
+  /**
+   * Passes {@code t} through {@link UserCodeException#wrapIf} with the condition
+   * "this is not a system DoFn" (presumably wrapping only when that condition holds).
+   *
+   * <p>Note that the body <em>throws</em> the resulting exception itself instead of returning
+   * it, so this method never returns normally. The declared return type only allows call
+   * sites to be written as {@code throw wrapUserCodeException(t);}.
+   */
+  protected RuntimeException wrapUserCodeException(Throwable t) {
+    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+  }
+
+  /** Returns whether the class of {@code fn} is annotated with {@link SystemDoFnInternal}. */
+  private boolean isSystemDoFn() {
+    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@code DoFn.ProcessContext} used for
+   * running a {@link DoFn} over a single element.
+   *
+   * @param <InputT> the type of the DoFn's (main) input elements
+   * @param <OutputT> the type of the DoFn's (main) output elements
+   */
+  static class DoFnProcessContext<InputT, OutputT>
+      extends DoFn<InputT, OutputT>.ProcessContext {
+
+    final DoFn<InputT, OutputT> fn;
+    final DoFnContext<InputT, OutputT> context;
+    final WindowedValue<InputT> windowedValue;
+
+    /**
+     * Creates a process context for a single element.
+     *
+     * @param fn the DoFn being run; also the enclosing instance of this inner-class context
+     * @param context the bundle-level context that all outputs and side inputs delegate to
+     * @param windowedValue the input element together with its timestamp, windows and pane
+     */
+    public DoFnProcessContext(DoFn<InputT, OutputT> fn,
+                              DoFnContext<InputT, OutputT> context,
+                              WindowedValue<InputT> windowedValue) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    /**
+     * Reads the side input {@code view} for the single window of the current element.
+     *
+     * @throws NullPointerException if {@code view} is null
+     * @throws IllegalStateException if the element is in more than one window, or is in no
+     *     window and the context's {@code WindowFn} is not {@link GlobalWindows}
+     */
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+      BoundedWindow window;
+      if (!windowIter.hasNext()) {
+        if (context.windowFn instanceof GlobalWindows) {
+          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+          // without windows
+          window = GlobalWindow.INSTANCE;
+        } else {
+          throw new IllegalStateException(
+              "sideInput called when main input element is not in any windows");
+        }
+      } else {
+        window = windowIter.next();
+        if (windowIter.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple windows");
+        }
+      }
+      return context.sideInput(view, window);
+    }
+
+    /**
+     * Returns the single window of the current element.
+     *
+     * @throws UnsupportedOperationException if the DoFn does not implement
+     *     {@link RequiresWindowAccess}
+     */
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof RequiresWindowAccess)) {
+        // The message names the marker interface that is actually checked above.
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a DoFn marked as "
+            + "RequiresWindowAccess.");
+      }
+      return Iterables.getOnlyElement(windows());
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    /** Emits {@code output} with the timestamp, windows and pane of the current element. */
+    @Override
+    public void output(OutputT output) {
+      context.outputWindowedValue(windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} with the given timestamp and the windows and pane of the current
+     * element.
+     *
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the element's
+     *     timestamp minus the DoFn's allowed timestamp skew
+     */
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(output, timestamp,
+          windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    /** Emits a fully specified windowed value to the main output, without a skew check. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    /**
+     * Emits {@code output} to the side output {@code tag} with the timestamp, windows and
+     * pane of the current element.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} to the side output {@code tag} with the given timestamp and the
+     * windows and pane of the current element.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the element's
+     *     timestamp minus the DoFn's allowed timestamp skew
+     */
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    /** Returns all windows of the current element; unlike {@link #window()} this is unguarded. */
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    /**
+     * Rejects output timestamps earlier than the input element's timestamp minus
+     * {@code fn.getAllowedTimestampSkew()}.
+     */
+    private void checkTimestamp(Instant timestamp) {
+      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+        throw new IllegalArgumentException(String.format(
+            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+            + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+            timestamp, windowedValue.getTimestamp(),
+            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+      }
+    }
+
+    /**
+     * Returns a {@link WindowingInternals} view over the current element that delegates
+     * outputs and side inputs to the bundle context, and timers and state to the context's
+     * step context.
+     */
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public void outputWindowedValue(OutputT output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          context.outputWindowedValue(output, timestamp, windows, pane);
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windowedValue.getWindows();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return windowedValue.getPane();
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag,
+            Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+          // NOTE(review): window() below appears to resolve to the enclosing process
+          // context's window(), which throws unless the DoFn is RequiresWindowAccess - confirm.
+          context.stepContext.writePCollectionViewData(
+              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+              window(), windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+        createAggregatorInternal(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
new file mode 100644
index 0000000..648a281
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.common.CounterSet.AddCounterMutator;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.List;
+
+/**
+ * Static utility methods that provide {@link DoFnRunner} implementations.
+ */
+public class DoFnRunners {
+  /**
+   * Information about how to create output receivers and output to them.
+   */
+  public interface OutputManager {
+    /**
+     * Outputs a single element to the receiver indicated by the given {@link TupleTag}.
+     */
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output);
+  }
+
+  /**
+   * Returns a basic implementation of {@link DoFnRunner} that works for most {@link DoFn DoFns}.
+   *
+   * <p>It invokes {@link DoFn#processElement} for each input.
+   *
+   * <p>All arguments are passed unchanged to the {@link SimpleDoFnRunner} constructor.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      CounterSet.AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new SimpleDoFnRunner<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy);
+  }
+
+  /**
+   * Returns an implementation of {@link DoFnRunner} that handles late data dropping.
+   *
+   * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}.
+   *
+   * <p>The result is a {@link LateDataDroppingDoFnRunner} wrapping a {@link #simpleRunner}
+   * built for {@code reduceFnExecutor.asDoFn()}. The wrapper is given the step context's
+   * timer internals and the executor's dropped-due-to-lateness aggregator.
+   */
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
+          PipelineOptions options,
+          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
+          SideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainOutputTag,
+          List<TupleTag<?>> sideOutputTags,
+          StepContext stepContext,
+          CounterSet.AddCounterMutator addCounterMutator,
+          WindowingStrategy<?, W> windowingStrategy) {
+    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner =
+        simpleRunner(
+            options,
+            reduceFnExecutor.asDoFn(),
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            addCounterMutator,
+            windowingStrategy);
+    return new LateDataDroppingDoFnRunner<>(
+        simpleDoFnRunner,
+        windowingStrategy,
+        stepContext.timerInternals(),
+        reduceFnExecutor.getDroppedDueToLatenessAggregator());
+  }
+
+  /**
+   * Returns the {@link DoFnRunner} appropriate for {@code doFn}: a
+   * {@link #lateDataDroppingRunner} when {@code doFn} is a {@link ReduceFnExecutor}, and a
+   * {@link #simpleRunner} otherwise.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      DoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AddCounterMutator addCounterMutator,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    if (doFn instanceof ReduceFnExecutor) {
+      // The key, value and window type parameters of the executor are not known statically
+      // here, so the call goes through raw types and the result is cast back unchecked.
+      @SuppressWarnings("rawtypes")
+      ReduceFnExecutor fn = (ReduceFnExecutor) doFn;
+      @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+      DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+          options,
+          fn,
+          sideInputReader,
+          outputManager,
+          (TupleTag) mainOutputTag,
+          sideOutputTags,
+          stepContext,
+          addCounterMutator,
+          (WindowingStrategy) windowingStrategy);
+      return runner;
+    }
+    return simpleRunner(
+        options,
+        doFn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        addCounterMutator,
+        windowingStrategy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
new file mode 100644
index 0000000..f5de0bc
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * DoFn that merges windows and groups elements in those windows, optionally
+ * combining values.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+@SystemDoFnInternal
+public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  /** Name under which {@link #droppedDueToClosedWindow} is created. */
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  /** Name under which {@link #droppedDueToLateness} is created. */
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+
+  /** Aggregator created with {@link Sum.SumLongFn} for the closed-window counter. */
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+  /** Aggregator created with {@link Sum.SumLongFn} for the lateness counter. */
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+  /**
+   * Create the default {@link GroupAlsoByWindowsDoFn}, which uses window sets to implement the
+   * grouping.
+   *
+   * <p>The returned instance is a {@link GroupAlsoByWindowsViaOutputBufferDoFn} configured
+   * with a buffering {@link SystemReduceFn}.
+   *
+   * @param windowingStrategy The window function and trigger to use for grouping
+   * @param inputCoder the input coder to use
+   */
+  public static <K, V, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W>
+      createDefault(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn<>(
+        windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..d364168
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
+ * implementation is applicable.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+   extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  private final WindowingStrategy<?, W> strategy;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  /**
+   * Creates the DoFn.
+   *
+   * @param windowingStrategy the windowing strategy handed to the {@link ReduceFnRunner}
+   * @param reduceFn the reduce function handed to the {@link ReduceFnRunner}
+   */
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    this.strategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+  }
+
+  /**
+   * Runs a {@link ReduceFnRunner} over all values of one key.
+   *
+   * <p>The values are fed to the runner in chunks of 1000. After each chunk the input
+   * watermark and the processing time of a locally created {@link BatchTimerInternals} are
+   * advanced. Once all chunks are processed, both are advanced to
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and the runner's state is persisted.
+   */
+  @Override
+  public void processElement(
+      DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+          throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
+
+    // It is the responsibility of the user of GroupAlsoByWindowsViaOutputBufferDoFn to only
+    // provide a WindowingInternals instance with the appropriate key type for StateInternals.
+    @SuppressWarnings("unchecked")
+    StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<K, InputT, OutputT, W>(
+            key,
+            strategy,
+            stateInternals,
+            timerInternals,
+            c.windowingInternals(),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), 1000);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      reduceFnRunner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark
+      // to the first element of this chunk, and fire any timers that may have been scheduled.
+      timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    reduceFnRunner.persist();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..9450495
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
+ * <ol>
+ *   <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
+ *       the previously-implicit timestamp and window into the elements themselves, so a
+ *       window-and-timestamp-unaware transform can operate on them.</li>
+ *   <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
+ *       and timestamps. Many window-unaware runners have such a primitive already.</li>
+ *   <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
+ *       output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
+ *   <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
+ *       implemented as a {@link ParDo} that calls reserved internal methods.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ *   <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
+ *       A streaming-style partition, with multiple elements for the same key, will not yield
+ *       correct results.</li>
+ *   <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
+ *       for large iterables.</li>
+ *   <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
+ *       appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+  // The GroupByKey being expanded; consulted in apply() for the output windowing strategy.
+  private GroupByKey<K, V> gbkTransform;
+
+  /**
+   * Creates an expansion of {@code originalTransform}.
+   *
+   * @param originalTransform the {@link GroupByKey} whose
+   *     {@code updateWindowingStrategy} determines the output windowing strategy
+   */
+  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+    this.gbkTransform = originalTransform;
+  }
+
+  /**
+   * Expands the grouping into four steps: reify timestamps and windows, group by key only,
+   * sort each key's values by timestamp, and group also by window.
+   */
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+
+        // Group by just the key.
+        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+        // introduced in here.
+        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+        // Sort each key's values by timestamp. GroupAlsoByWindow requires
+        // its input to be sorted by timestamp.
+        .apply(new SortValuesByTimestamp<K, V>())
+
+        // Group each key's values by window, merging windows as needed.
+        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(
+            gbkTransform.updateWindowingStrategy(windowingStrategy));
+  }
+
+  /**
+   * Runner-specific primitive that groups by key only, ignoring any window assignments. A
+   * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
+   * or evaluate this class.
+   */
+  public static class GroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    /**
+     * Creates a primitive output collection that carries over the input's pipeline,
+     * windowing strategy and boundedness; no grouping logic is implemented here.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    /** Derives the output coder from the input coder via {@code GroupByKey.getOutputKvCoder}. */
+    @Override
+    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+      return GroupByKey.getOutputKvCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that sorts the values associated with each key by timestamp.
+   */
+  private static class SortValuesByTimestamp<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+    /**
+     * Copies each key's values into an in-memory list, sorts the list by timestamp and
+     * re-emits the key with the sorted values. The output reuses the input's coder.
+     */
+    @Override
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+                    @Override
+                    public void processElement(ProcessContext c) {
+                      KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+                      K key = kvs.getKey();
+                      Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+                      // Materialize the iterable so that it can be sorted in place.
+                      List<WindowedValue<V>> sortedValues = new ArrayList<>();
+                      for (WindowedValue<V> value : unsortedValues) {
+                        sortedValues.add(value);
+                      }
+                      Collections.sort(
+                          sortedValues,
+                          new Comparator<WindowedValue<V>>() {
+                            @Override
+                            public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+                              return e1.getTimestamp().compareTo(e2.getTimestamp());
+                            }
+                          });
+                      c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+                    }
+                  }))
+          .setCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that takes a collection of timestamp-ordered
+   * values associated with each key, groups the values by window,
+   * combines windows as needed, and for each window in each key,
+   * outputs a collection of key/value-list pairs implicitly assigned
+   * to the window and with the timestamp derived from that window.
+   */
+  private static class GroupAlsoByWindow<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    /**
+     * Applies a {@link GroupAlsoByWindowsViaOutputBufferDoFn} and sets the output coder.
+     *
+     * <p>The input coder is unwrapped by casting, so it must be a {@link KvCoder} whose
+     * value coder is an {@link IterableCoder} of a {@link WindowedValueCoder}.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<KV<K, Iterable<V>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+      // Peel off the iterable and windowed-value layers to reach the coder for V.
+      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+          (IterableCoder<WindowedValue<V>>) inputValueCoder;
+      Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+      WindowedValueCoder<V> inputIterableWindowedValueCoder =
+          (WindowedValueCoder<V>) inputIterableElementCoder;
+
+      Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+      Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+      return input
+          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+          .setCoder(outputKvCoder);
+    }
+
+    /**
+     * Builds the DoFn with a buffering {@link SystemReduceFn}. Being generic in {@code W},
+     * this method lets the wildcard window type of {@code windowingStrategy} be captured.
+     */
+    private <W extends BoundedWindow>
+        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
new file mode 100644
index 0000000..4815162
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import org.joda.time.Instant;
+
+/**
+ * A customized {@link DoFnRunner} that handles late data dropping for
+ * a {@link KeyedWorkItem} input {@link DoFn}.
+ *
+ * <p>Each element is expanded into one value per window before its lateness is checked.
+ *
+ * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
+    implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+  private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
+  private final LateDataFilter lateDataFilter;
+
+  public LateDataDroppingDoFnRunner(
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TimerInternals timerInternals,
+      Aggregator<Long, Long> droppedDueToLateness) {
+    this.doFnRunner = doFnRunner;
+    this.lateDataFilter =
+        new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
+  }
+
+  @Override
+  public void startBundle() {
+    doFnRunner.startBundle();
+  }
+
+  @Override
+  public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+    Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
+        elem.getValue().key(), elem.getValue().elementsIterable());
+    KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
+        elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
+    doFnRunner.processElement(elem.withValue(keyedWorkItem));
+  }
+
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
+  /** Filters out the elements of a {@link KeyedWorkItem} whose windows have already expired. */
+  @VisibleForTesting
+  static class LateDataFilter {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final TimerInternals timerInternals;
+    private final Aggregator<Long, Long> droppedDueToLateness;
+
+    public LateDataFilter(
+        WindowingStrategy<?, ?> windowingStrategy,
+        TimerInternals timerInternals,
+        Aggregator<Long, Long> droppedDueToLateness) {
+      this.windowingStrategy = windowingStrategy;
+      this.timerInternals = timerInternals;
+      this.droppedDueToLateness = droppedDueToLateness;
+    }
+
+    /**
+     * Returns a lazy {@code Iterable<WindowedValue<InputT>>} of the non-late input elements,
+     * one per window; elements dropped for lateness are counted once, during this call.
+     */
+    public <K, InputT> Iterable<WindowedValue<InputT>> filter(
+        final K key, Iterable<WindowedValue<InputT>> elements) {
+      Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
+          elements,
+          new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
+            @Override
+            public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
+              return Iterables.transform(
+                  input.getWindows(),
+                  new Function<BoundedWindow, WindowedValue<InputT>>() {
+                    @Override
+                    public WindowedValue<InputT> apply(BoundedWindow window) {
+                      return WindowedValue.of(
+                          input.getValue(), input.getTimestamp(), window, input.getPane());
+                    }
+                  });
+            }});
+      Iterable<WindowedValue<InputT>> concatElements = Iterables.concat(windowsExpandedElements);
+
+      // Account for dropped elements in one eager pass. The view returned below is lazy, so
+      // doing this inside its predicate would count an element again on every iteration.
+      for (WindowedValue<InputT> input : concatElements) {
+        BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+        if (canDropDueToExpiredWindow(window)) {
+          droppedDueToLateness.addValue(1L);
+          WindowTracing.debug(
+              "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+              + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+              timerInternals.currentOutputWatermarkTime());
+        }
+      }
+
+      return Iterables.filter(
+          concatElements,
+          new Predicate<WindowedValue<InputT>>() {
+            @Override
+            public boolean apply(WindowedValue<InputT> input) {
+              return !canDropDueToExpiredWindow(Iterables.getOnlyElement(input.getWindows()));
+            }
+          });
+    }
+
+    /** Is {@code window} expired w.r.t. the garbage collection watermark? */
+    private boolean canDropDueToExpiredWindow(BoundedWindow window) {
+      Instant inputWM = timerInternals.currentInputWatermarkTime();
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
new file mode 100644
index 0000000..e809c24
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
+ * their last triggering.
+ *
+ * @param <K> The key type shared with the {@link StateAccessor} and {@link ReduceFn}.
+ * @param <W> The kind of windows being tracked.
+ */
+public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
+
+  /**
+   * Picks the implementation for {@code strategy}: backed by {@code reduceFn} in
+   * {@link AccumulationMode#DISCARDING_FIRED_PANES} mode, and by a counter otherwise.
+   */
+  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
+      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
+    if (strategy.getMode() != AccumulationMode.DISCARDING_FIRED_PANES) {
+      return new GeneralNonEmptyPanes<>();
+    }
+    return new DiscardingModeNonEmptyPanes<>(reduceFn);
+  }
+
+  /**
+   * Record that some content has been added to the window in {@code context}, and therefore the
+   * current pane is not empty.
+   */
+  public abstract void recordContent(StateAccessor<K> context);
+
+  /** Record that the given pane is empty. */
+  public abstract void clearPane(StateAccessor<K> state);
+
+  /**
+   * Return a {@link ReadableState} that reads as true if the current pane for the window in
+   * {@code context} is empty.
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
+
+  /** Prefetch in preparation for merging. */
+  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
+
+  /** Eagerly merge backing state. */
+  public abstract void onMerge(MergingStateAccessor<K, W> context);
+
+  /**
+   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Keeps no
+   * state of its own: emptiness is answered by {@link ReduceFn#isEmpty}.
+   */
+  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    private final ReduceFn<K, ?, ?, W> reduceFn;
+
+    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
+      this.reduceFn = reduceFn;
+    }
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      // No-op: the reduceFn is the one tracking contents.
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      // No-op: the reduceFn is the one tracking contents.
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return reduceFn.isEmpty(state);
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      // No-op: the reduceFn is the one tracking contents.
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> state) {
+      // No-op: the reduceFn is the one tracking contents.
+    }
+  }
+
+  /**
+   * An implementation of {@code NonEmptyPanes} for general use. Keeps a running sum of the
+   * additions recorded since the pane was last cleared.
+   */
+  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+        PANE_ADDITIONS_TAG =
+        StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+            "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).add(1L);
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).clear();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return state.access(PANE_ADDITIONS_TAG).isEmpty();
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> state) {
+      StateMerging.mergeCombiningValues(state, PANE_ADDITIONS_TAG);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
new file mode 100644
index 0000000..5e08031
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Instant;
+
+/**
+ * Determine the timing and other properties of a new pane for a given computation, key and window.
+ * Incorporates any previous pane, whether the pane has been produced because of an
+ * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
+ * and the current output watermark.
+ */
+public class PaneInfoTracker {
+  /** System state tag under which the most recently stored {@link PaneInfo} is kept. */
+  @VisibleForTesting
+  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
+
+  private final TimerInternals timerInternals;
+
+  public PaneInfoTracker(TimerInternals timerInternals) {
+    this.timerInternals = timerInternals;
+  }
+
+  /** Clears the pane info stored in {@code state}. */
+  public void clear(StateAccessor<?> state) {
+    state.access(PANE_INFO_TAG).clear();
+  }
+
+  /**
+   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
+   * info includes the timing for the pane, whose calculation is quite subtle.
+   *
+   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
+   * no further firings for the window in {@code context}
+   */
+  public ReadableState<PaneInfo> getNextPaneInfo(
+      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
+    final Object key = context.key();
+    final ReadableState<PaneInfo> previousPaneFuture = context.state().access(PANE_INFO_TAG);
+    final Instant windowMaxTimestamp = context.window().maxTimestamp();
+
+    return new ReadableState<PaneInfo>() {
+      @Override
+      public ReadableState<PaneInfo> readLater() {
+        previousPaneFuture.readLater();
+        return this;
+      }
+
+      @Override
+      public PaneInfo read() {
+        return describePane(key, windowMaxTimestamp, previousPaneFuture.read(), isFinal);
+      }
+    };
+  }
+
+  /** Stores {@code currentPane} so that the next pane can be derived from it. */
+  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
+    context.state().access(PANE_INFO_TAG).write(currentPane);
+  }
+
+  /** Computes the next pane's index and timing from {@code previousPane} and the watermarks. */
+  private PaneInfo describePane(
+      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
+    boolean isFirst = previousPane == null;
+    Timing previousTiming = isFirst ? null : previousPane.getTiming();
+    long index = isFirst ? 0 : previousPane.getIndex() + 1;
+    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
+    Instant outputWM = timerInternals.currentOutputWatermarkTime();
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // True if it is not possible to assign the element representing this pane a timestamp
+    // which will make an ON_TIME pane for any following computation.
+    // I.e. true if the element's latest possible timestamp is before the current output watermark.
+    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
+
+    // True if all emitted panes (if any) were EARLY panes. Once the ON_TIME pane has fired, all
+    // later panes must be considered LATE even if the output watermark is behind the window end.
+    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
+
+    // True if the input watermark hasn't passed the window's max timestamp.
+    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
+
+    Timing timing;
+    if (isLateForOutput || !onlyEarlyPanesSoFar) {
+      // The output watermark has already passed the end of this window, or we have already emitted
+      // a non-EARLY pane. Irrespective of how this pane was triggered it must be LATE.
+      timing = Timing.LATE;
+    } else if (isEarlyForInput) {
+      // This is an EARLY firing.
+      timing = Timing.EARLY;
+      nonSpeculativeIndex = -1;
+    } else {
+      // This is the unique ON_TIME firing for the window.
+      timing = Timing.ON_TIME;
+    }
+
+    WindowTracing.debug(
+        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
+
+    if (previousPane != null) {
+      // Timing transitions should follow EARLY* ON_TIME? LATE*
+      switch (previousTiming) {
+        case EARLY:
+          Preconditions.checkState(
+              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
+              "EARLY cannot transition to %s", timing);
+          break;
+        case ON_TIME:
+          Preconditions.checkState(
+              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
+          break;
+        case LATE:
+          Preconditions.checkState(timing == Timing.LATE, "LATE cannot transition to %s", timing);
+          break;
+        case UNKNOWN:
+          break;
+      }
+      Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
+    }
+
+    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
+  }
+}