You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:06 UTC

[35/50] [abbrv] incubator-beam git commit: Put classes in runners-core package into runners.core namespace

Put classes in runners-core package into runners.core namespace


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4bf3a3b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4bf3a3b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4bf3a3b3

Branch: refs/heads/gearpump-runner
Commit: 4bf3a3b345d94ecea4c77ebdfaed9dd7ef0f39e5
Parents: 60d8cd9
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Aug 25 14:57:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:12 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/AssignWindows.java |   46 +
 .../beam/runners/core/AssignWindowsDoFn.java    |   79 +
 .../beam/runners/core/BatchTimerInternals.java  |  140 ++
 .../apache/beam/runners/core/DoFnRunner.java    |   65 +
 .../beam/runners/core/DoFnRunnerBase.java       |  559 +++++++
 .../apache/beam/runners/core/DoFnRunners.java   |  148 ++
 .../runners/core/ElementByteSizeObservable.java |   44 +
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |    5 +-
 .../runners/core/GroupAlsoByWindowsDoFn.java    |   66 +
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  100 ++
 .../core/GroupByKeyViaGroupByKeyOnly.java       |  271 ++++
 .../core/LateDataDroppingDoFnRunner.java        |  151 ++
 .../apache/beam/runners/core/NonEmptyPanes.java |  151 ++
 .../beam/runners/core/PaneInfoTracker.java      |  158 ++
 .../beam/runners/core/PeekingReiterator.java    |  100 ++
 .../core/PushbackSideInputDoFnRunner.java       |  116 ++
 .../org/apache/beam/runners/core/ReduceFn.java  |  130 ++
 .../runners/core/ReduceFnContextFactory.java    |  499 ++++++
 .../beam/runners/core/ReduceFnRunner.java       |  993 ++++++++++++
 .../beam/runners/core/SimpleDoFnRunner.java     |   58 +
 .../beam/runners/core/SystemReduceFn.java       |  139 ++
 .../apache/beam/runners/core/TriggerRunner.java |  247 +++
 .../apache/beam/runners/core/WatermarkHold.java |  539 +++++++
 .../org/apache/beam/sdk/util/AssignWindows.java |   46 -
 .../apache/beam/sdk/util/AssignWindowsDoFn.java |   77 -
 .../beam/sdk/util/BatchTimerInternals.java      |  137 --
 .../org/apache/beam/sdk/util/DoFnRunner.java    |   63 -
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  551 -------
 .../org/apache/beam/sdk/util/DoFnRunners.java   |  143 --
 .../beam/sdk/util/GroupAlsoByWindowsDoFn.java   |   63 -
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   97 --
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   |  268 ----
 .../sdk/util/LateDataDroppingDoFnRunner.java    |  145 --
 .../org/apache/beam/sdk/util/NonEmptyPanes.java |  150 --
 .../apache/beam/sdk/util/PaneInfoTracker.java   |  156 --
 .../sdk/util/PushbackSideInputDoFnRunner.java   |  113 --
 .../java/org/apache/beam/sdk/util/ReduceFn.java |  128 --
 .../beam/sdk/util/ReduceFnContextFactory.java   |  493 ------
 .../apache/beam/sdk/util/ReduceFnRunner.java    |  983 ------------
 .../apache/beam/sdk/util/SimpleDoFnRunner.java  |   55 -
 .../apache/beam/sdk/util/SystemReduceFn.java    |  138 --
 .../org/apache/beam/sdk/util/TriggerRunner.java |  241 ---
 .../org/apache/beam/sdk/util/WatermarkHold.java |  536 -------
 .../util/common/ElementByteSizeObservable.java  |   42 -
 .../beam/sdk/util/common/PeekingReiterator.java |   99 --
 .../beam/sdk/util/common/package-info.java      |   20 -
 .../org/apache/beam/sdk/util/package-info.java  |   20 -
 .../runners/core/BatchTimerInternalsTest.java   |  118 ++
 .../core/GroupAlsoByWindowsProperties.java      |  660 ++++++++
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  110 ++
 .../core/LateDataDroppingDoFnRunnerTest.java    |  117 ++
 .../core/PushbackSideInputDoFnRunnerTest.java   |  235 +++
 .../beam/runners/core/ReduceFnRunnerTest.java   | 1446 ++++++++++++++++++
 .../beam/runners/core/ReduceFnTester.java       |  796 ++++++++++
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   88 ++
 .../beam/sdk/util/BatchTimerInternalsTest.java  |  117 --
 .../sdk/util/GroupAlsoByWindowsProperties.java  |  658 --------
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java |  109 --
 .../util/LateDataDroppingDoFnRunnerTest.java    |  114 --
 .../util/PushbackSideInputDoFnRunnerTest.java   |  231 ---
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 1442 -----------------
 .../apache/beam/sdk/util/ReduceFnTester.java    |  784 ----------
 .../beam/sdk/util/SimpleDoFnRunnerTest.java     |   84 -
 .../GroupAlsoByWindowEvaluatorFactory.java      |    8 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |    4 +-
 .../beam/runners/direct/ParDoEvaluator.java     |    8 +-
 .../direct/UncommittedBundleOutputManager.java  |    2 +-
 .../FlinkStreamingTransformTranslators.java     |    2 +-
 .../wrappers/streaming/DoFnOperator.java        |    6 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |    2 +-
 .../apache/beam/runners/spark/SparkRunner.java  |    2 +-
 .../spark/translation/TransformTranslator.java  |   10 +-
 .../streaming/StreamingTransformTranslator.java |    2 +-
 .../src/main/resources/beam/findbugs-filter.xml |    2 +-
 .../org/apache/beam/sdk/util/BitSetCoder.java   |    2 +-
 75 files changed, 8395 insertions(+), 8332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
new file mode 100644
index 0000000..f2387f5
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
+ * {@link PCollection} to windows per the given {@link WindowFn}, via {@link AssignWindowsDoFn}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+public class AssignWindows<T, W extends BoundedWindow>
+    extends PTransform<PCollection<T>, PCollection<T>> {
+
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindows(WindowFn<? super T, W> fn) {
+    this.fn = fn;
+  }
+
+  @Override
+  public PCollection<T> apply(PCollection<T> input) {
+    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
new file mode 100644
index 0000000..0eb1667
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * {@link OldDoFn} that re-emits each element of a {@link PCollection} in the windows chosen by
+ * the provided {@link WindowFn}, keeping its timestamp and using {@link PaneInfo#NO_FIRING}.
+ *
+ * @param <T> Type of elements being windowed
+ * @param <W> Window type
+ */
+@SystemDoFnInternal
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T>
+    implements RequiresWindowAccess {
+  private WindowFn<? super T, W> fn;
+
+  public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
+    this.fn =
+        checkNotNull(
+            fn,
+            "%s provided to %s cannot be null",
+            WindowFn.class.getSimpleName(),
+            AssignWindowsDoFn.class.getSimpleName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(final ProcessContext c) throws Exception {
+    Collection<W> windows =
+        ((WindowFn<T, W>) fn).assignWindows(
+            ((WindowFn<T, W>) fn).new AssignContext() {
+                @Override
+                public T element() {
+                  return c.element();
+                }
+
+                @Override
+                public Instant timestamp() {
+                  return c.timestamp();
+                }
+
+                @Override
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
+                }
+              });
+
+    c.windowingInternals()
+        .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
new file mode 100644
index 0000000..829dbde
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+
+import org.joda.time.Instant;
+
+/**
+ * {@link TimerInternals} that uses priority queues to manage the timers that are ready to fire.
+ */
+public class BatchTimerInternals implements TimerInternals {
+  /** Set of currently scheduled (not yet fired or deleted) timers, used for deduplication. */
+  private Set<TimerData> existingTimers = new HashSet<>();
+
+  // Keep these queues separate so we can advance over them separately.
+  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  private Instant inputWatermarkTime;
+  private Instant processingTime;
+
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers;
+  }
+
+  public BatchTimerInternals(Instant processingTime) {
+    this.processingTime = processingTime;
+    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  }
+
+  @Override
+  public void setTimer(TimerData timer) {
+    if (existingTimers.add(timer)) {
+      queue(timer.getDomain()).add(timer);
+    }
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing
+   * is already complete.
+   */
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermarkTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    // The output watermark is always undefined in batch mode.
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .toString();
+  }
+
+  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark)
+      throws Exception {
+    checkState(!newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime,
+        newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
+  }
+
+  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime)
+      throws Exception {
+    checkState(!newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime);
+    processingTime = newProcessingTime;
+    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
+  }
+
+  /**
+   * Fires, in queue order, every timer in {@code domain} whose timestamp is strictly before
+   * {@code newTime}, passing each one to {@code runner.onTimer}.
+   */
+  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain)
+      throws Exception {
+    PriorityQueue<TimerData> timers = queue(domain);
+    // Timers fire if the new time is ahead of the timer.
+    while (!timers.isEmpty() && newTime.isAfter(timers.peek().getTimestamp())) {
+      // Remove before firing, so that if the trigger adds another identical timer we don't
+      // remove it. The timer must also leave existingTimers; otherwise setTimer would treat
+      // the re-added timer as a duplicate and silently drop it.
+      TimerData timer = timers.remove();
+      existingTimers.remove(timer);
+      runner.onTimer(timer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
new file mode 100644
index 0000000..f4c8eea
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * A wrapper interface that represents the execution of a {@link OldDoFn}.
+ */
+public interface DoFnRunner<InputT, OutputT> {
+  /**
+   * Prepares and calls {@link OldDoFn#startBundle}.
+   */
+  public void startBundle();
+
+  /**
+   * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current
+   * element.
+   */
+  public void processElement(WindowedValue<InputT> elem);
+
+  /**
+   * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as
+   * flushing in-memory states.
+   */
+  public void finishBundle();
+
+  /**
+   * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
+   */
+  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
+    /**
+     * Gets this object as a {@link OldDoFn}.
+     *
+     * <p>Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
+     * return themselves.
+     */
+    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
+
+    /**
+     * Returns an aggregator that tracks elements that are dropped due to being late.
+     */
+    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
new file mode 100644
index 0000000..71472da
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunnerBase.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+/**
+ * A base implementation of {@link DoFnRunner}.
+ *
+ * <p>Sub-classes should override {@link #invokeProcessElement}.
+ */
+public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  /** The {@link OldDoFn} being run. */
+  public final OldDoFn<InputT, OutputT> fn;
+
+  /** The context used for running the {@link OldDoFn}. */
+  public final DoFnContext<InputT, OutputT> context;
+
+  protected DoFnRunnerBase(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.fn = fn;
+    this.context = new DoFnContext<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        aggregatorFactory,
+        windowingStrategy == null ? null : windowingStrategy.getWindowFn());
+  }
+
+  /**
+   * An implementation of {@code OutputManager} using simple lists, for testing and in-memory
+   * contexts such as the {@code DirectRunner}.
+   */
+  public static class ListOutputManager implements OutputManager {
+
+    private Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+
+      if (outputList == null) {
+        outputList = Lists.newArrayList();
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        List<WindowedValue<?>> untypedList = (List) outputList;
+        outputLists.put(tag, untypedList);
+      }
+
+      outputList.add(output);
+    }
+
+    public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
+      // Safe cast by design, inexpressible in Java without rawtypes
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      List<WindowedValue<T>> outputList = (List) outputLists.get(tag);
+      return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList();
+    }
+  }
+
+  @Override
+  public void startBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.startBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    if (elem.getWindows().size() <= 1
+        || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass())
+            && context.sideInputReader.isEmpty())) {
+      invokeProcessElement(elem);
+    } else {
+      // We could modify the windowed value (and the processContext) to
+      // avoid repeated allocations, but this is more straightforward.
+      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+        invokeProcessElement(windowedValue);
+      }
+    }
+  }
+
+  /**
+   * Invokes {@link OldDoFn#processElement} after certain pre-processing has been done in
+   * {@link DoFnRunnerBase#processElement}.
+   */
+  protected abstract void invokeProcessElement(WindowedValue<InputT> elem);
+
+  @Override
+  public void finishBundle() {
+    // This can contain user code. Wrap it in case it throws an exception.
+    try {
+      fn.finishBundle(context);
+    } catch (Throwable t) {
+      // Exception in user code.
+      throw wrapUserCodeException(t);
+    }
+  }
+
+  /**
+   * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}.
+   *
+   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+   */
+  private static class DoFnContext<InputT, OutputT>
+      extends OldDoFn<InputT, OutputT>.Context {
+    private static final int MAX_SIDE_OUTPUTS = 1000;
+
+    final PipelineOptions options;
+    final OldDoFn<InputT, OutputT> fn;
+    final SideInputReader sideInputReader;
+    final OutputManager outputManager;
+    final TupleTag<OutputT> mainOutputTag;
+    final StepContext stepContext;
+    final AggregatorFactory aggregatorFactory;
+    final WindowFn<?, ?> windowFn;
+
+    /**
+     * The set of known output tags, some of which may be undeclared, so we can throw an
+     * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}.
+     */
+    private Set<TupleTag<?>> outputTags;
+
+    public DoFnContext(PipelineOptions options,
+                       OldDoFn<InputT, OutputT> fn,
+                       SideInputReader sideInputReader,
+                       OutputManager outputManager,
+                       TupleTag<OutputT> mainOutputTag,
+                       List<TupleTag<?>> sideOutputTags,
+                       StepContext stepContext,
+                       AggregatorFactory aggregatorFactory,
+                       WindowFn<?, ?> windowFn) {
+      fn.super();
+      this.options = options;
+      this.fn = fn;
+      this.sideInputReader = sideInputReader;
+      this.outputManager = outputManager;
+      this.mainOutputTag = mainOutputTag;
+      this.outputTags = Sets.newHashSet();
+
+      outputTags.add(mainOutputTag);
+      for (TupleTag<?> sideOutputTag : sideOutputTags) {
+        outputTags.add(sideOutputTag);
+      }
+
+      this.stepContext = stepContext;
+      this.aggregatorFactory = aggregatorFactory;
+      this.windowFn = windowFn;
+      super.setupDelegateAggregators();
+    }
+
+    //////////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
+        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
+      final Instant inputTimestamp = timestamp;
+
+      if (timestamp == null) {
+        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      if (windows == null) {
+        try {
+          // The windowFn can never succeed at accessing the element, so its type does not
+          // matter here
+          @SuppressWarnings("unchecked")
+          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
+          windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() {
+            @Override
+            public Object element() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input element when none was available");
+            }
+
+            @Override
+            public Instant timestamp() {
+              if (inputTimestamp == null) {
+                throw new UnsupportedOperationException(
+                    "WindowFn attempted to access input timestamp when none was available");
+              }
+              return inputTimestamp;
+            }
+
+            @Override
+            public W window() {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input windows when none were available");
+            }
+          });
+        } catch (Exception e) {
+          throw UserCodeException.wrap(e);
+        }
+      }
+
+      return WindowedValue.of(output, timestamp, windows, pane);
+    }
+
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+      if (!sideInputReader.contains(view)) {
+        throw new IllegalArgumentException("calling sideInput() with unknown view");
+      }
+      BoundedWindow sideInputWindow =
+          view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+      return sideInputReader.get(view, sideInputWindow);
+    }
+
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
+      outputManager.output(mainOutputTag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteOutput(windowedElem);
+      }
+    }
+
+    /**
+     * Builds a {@link WindowedValue} from the given parts via {@code makeWindowedValue} and
+     * emits it to the side output identified by {@code tag}.
+     */
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag,
+                                               T output,
+                                               Instant timestamp,
+                                               Collection<? extends BoundedWindow> windows,
+                                               PaneInfo pane) {
+      sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane));
+    }
+
+    protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
+      if (!outputTags.contains(tag)) {
+        // This tag wasn't declared nor was it seen before during this execution.
+        // Thus, this must be a new, undeclared and unconsumed output.
+        // To prevent likely user errors, enforce the limit on the number of side
+        // outputs.
+        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
+          throw new IllegalArgumentException(
+              "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
+        }
+        outputTags.add(tag);
+      }
+
+      outputManager.output(tag, windowedElem);
+      if (stepContext != null) {
+        stepContext.noteSideOutput(tag, windowedElem);
+      }
+    }
+
+    // Following implementations of output, outputWithTimestamp, and sideOutput
+    // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by
+    // ProcessContext's versions in OldDoFn.processElement.
+    /**
+     * Emits {@code output} to the main output with pane {@link PaneInfo#NO_FIRING}.
+     */
+    @Override
+    public void output(OutputT output) {
+      // No timestamp or windows are supplied by the caller, so both are passed as null.
+      outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the main output with the given timestamp and pane
+     * {@link PaneInfo#NO_FIRING}.
+     */
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      // No windows are supplied by the caller, so null is passed for them.
+      outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag} with pane
+     * {@link PaneInfo#NO_FIRING}.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
+      // No timestamp or windows are supplied by the caller, so both are passed as null.
+      sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag} with the given
+     * timestamp and pane {@link PaneInfo#NO_FIRING}.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
+      // No windows are supplied by the caller, so null is passed for them.
+      sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
+    }
+
+    /**
+     * Creates an aggregator named {@code name} by delegating to the aggregator factory,
+     * passing it the class of {@code fn} and the step context.
+     *
+     * @throws NullPointerException if {@code combiner} is null
+     */
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+        String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
+      return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
+    }
+  }
+
+  /**
+   * Returns a new {@link OldDoFn.ProcessContext} for the given element.
+   *
+   * <p>The returned context is a {@link DoFnProcessContext} built from this runner's
+   * {@code fn} and {@code context} together with {@code elem}.
+   */
+  protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
+      WindowedValue<InputT> elem) {
+    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem);
+  }
+
+  /**
+   * Throws the exception produced by {@code UserCodeException.wrapIf} for {@code t}; the
+   * wrapping condition passed to it is that {@code fn} is not a system {@link OldDoFn}.
+   *
+   * <p>This method throws rather than returning normally. NOTE(review): the declared return
+   * type presumably exists so call sites can be written as
+   * {@code throw wrapUserCodeException(t)} -- confirm against callers.
+   */
+  protected RuntimeException wrapUserCodeException(Throwable t) {
+    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+  }
+
+  /** Returns whether the class of {@code fn} is annotated with {@link SystemDoFnInternal}. */
+  private boolean isSystemDoFn() {
+    return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
+  }
+
+  /**
+   * A concrete implementation of {@link OldDoFn.ProcessContext} used for
+   * running a {@link OldDoFn} over a single element.
+   *
+   * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+   * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+   */
+  static class DoFnProcessContext<InputT, OutputT>
+      extends OldDoFn<InputT, OutputT>.ProcessContext {
+
+    /** The function being run; consulted for window access and allowed timestamp skew. */
+    final OldDoFn<InputT, OutputT> fn;
+    /** The bundle-level context that all output, side input and aggregator calls go to. */
+    final DoFnContext<InputT, OutputT> context;
+    /** The element being processed, with its timestamp, windows and pane. */
+    final WindowedValue<InputT> windowedValue;
+
+    /**
+     * Creates a process context for {@code windowedValue}, bound to {@code fn} as the
+     * enclosing {@link OldDoFn} instance.
+     */
+    public DoFnProcessContext(OldDoFn<InputT, OutputT> fn,
+                              DoFnContext<InputT, OutputT> context,
+                              WindowedValue<InputT> windowedValue) {
+      fn.super();
+      this.fn = fn;
+      this.context = context;
+      this.windowedValue = windowedValue;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return context.getPipelineOptions();
+    }
+
+    /** Returns the value of the element being processed. */
+    @Override
+    public InputT element() {
+      return windowedValue.getValue();
+    }
+
+    /**
+     * Returns the value of {@code view} for the single window of the current element.
+     *
+     * <p>An element that is in no window is treated as being in {@link GlobalWindow#INSTANCE}
+     * when the context's window function is a {@link GlobalWindows}.
+     *
+     * @throws NullPointerException if {@code view} is null
+     * @throws IllegalStateException if the element is in more than one window, or is in no
+     *     window while the window function is not a {@link GlobalWindows}
+     */
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
+      BoundedWindow window;
+      if (!windowIter.hasNext()) {
+        if (context.windowFn instanceof GlobalWindows) {
+          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
+          // without windows
+          window = GlobalWindow.INSTANCE;
+        } else {
+          throw new IllegalStateException(
+              "sideInput called when main input element is not in any windows");
+        }
+      } else {
+        window = windowIter.next();
+        if (windowIter.hasNext()) {
+          throw new IllegalStateException(
+              "sideInput called when main input element is in multiple windows");
+        }
+      }
+      return context.sideInput(view, window);
+    }
+
+    /**
+     * Returns the only window of the current element.
+     *
+     * @throws UnsupportedOperationException if {@code fn} does not implement
+     *     {@link RequiresWindowAccess}
+     */
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof RequiresWindowAccess)) {
+        // The second literal starts with a space so the two halves do not run together.
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a OldDoFn marked as"
+                + " RequiresWindowAccess.");
+      }
+      return Iterables.getOnlyElement(windows());
+    }
+
+    /** Returns the pane of the element being processed. */
+    @Override
+    public PaneInfo pane() {
+      return windowedValue.getPane();
+    }
+
+    /**
+     * Emits {@code output} to the main output, derived from the current element via
+     * {@code WindowedValue.withValue}.
+     */
+    @Override
+    public void output(OutputT output) {
+      context.outputWindowedValue(windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} to the main output with the given timestamp and the windows and
+     * pane of the current element.
+     *
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the current
+     *     element's timestamp minus the allowed skew of {@code fn}
+     */
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      checkTimestamp(timestamp);
+      context.outputWindowedValue(output, timestamp,
+          windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    /** Emits a fully specified windowed value to the main output; no skew check is applied. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      context.outputWindowedValue(output, timestamp, windows, pane);
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag}, derived from the
+     * current element via {@code WindowedValue.withValue}.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     */
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      checkNotNull(tag, "Tag passed to sideOutput cannot be null");
+      context.sideOutputWindowedValue(tag, windowedValue.withValue(output));
+    }
+
+    /**
+     * Emits {@code output} to the side output identified by {@code tag} with the given
+     * timestamp and the windows and pane of the current element.
+     *
+     * @throws NullPointerException if {@code tag} is null
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the current
+     *     element's timestamp minus the allowed skew of {@code fn}
+     */
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
+      checkTimestamp(timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+
+    /** Returns the timestamp of the element being processed. */
+    @Override
+    public Instant timestamp() {
+      return windowedValue.getTimestamp();
+    }
+
+    /** Returns all windows of the element being processed. */
+    public Collection<? extends BoundedWindow> windows() {
+      return windowedValue.getWindows();
+    }
+
+    /**
+     * Rejects output timestamps that lie before the current element's timestamp minus the
+     * allowed timestamp skew of {@code fn}.
+     *
+     * @throws IllegalArgumentException if {@code timestamp} is too early
+     */
+    private void checkTimestamp(Instant timestamp) {
+      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+        throw new IllegalArgumentException(String.format(
+            "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+            + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+            + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.",
+            timestamp, windowedValue.getTimestamp(),
+            PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+      }
+    }
+
+    /**
+     * Returns a {@link WindowingInternals} view of this context. Output and side input calls
+     * are forwarded to the bundle-level context, and timers, state and view data go to its
+     * step context.
+     */
+    @Override
+    public WindowingInternals<InputT, OutputT> windowingInternals() {
+      return new WindowingInternals<InputT, OutputT>() {
+        @Override
+        public void outputWindowedValue(OutputT output, Instant timestamp,
+            Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          context.outputWindowedValue(output, timestamp, windows, pane);
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return windowedValue.getWindows();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return windowedValue.getPane();
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return context.stepContext.timerInternals();
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(
+            TupleTag<?> tag,
+            Iterable<WindowedValue<T>> data,
+            Coder<T> elemCoder) throws IOException {
+          @SuppressWarnings("unchecked")
+          Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder();
+
+          // NOTE(review): window() is not declared by this anonymous class; it appears to
+          // resolve to the enclosing context's window(), which throws unless fn implements
+          // RequiresWindowAccess -- confirm.
+          context.stepContext.writePCollectionViewData(
+              tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)),
+              window(), windowCoder);
+        }
+
+        @Override
+        public StateInternals<?> stateInternals() {
+          return context.stepContext.stateInternals();
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          return context.sideInput(view, mainInputWindow);
+        }
+      };
+    }
+
+    /** Creates an aggregator by delegating to the bundle-level context. */
+    @Override
+    protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+        createAggregatorInternal(
+            String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
+      return context.createAggregatorInternal(name, combiner);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
new file mode 100644
index 0000000..7726374
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.List;
+
+import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Static utility methods that provide {@link DoFnRunner} implementations.
+ */
+public class DoFnRunners {
+  /**
+   * Describes how output receivers are created and how elements are written to them.
+   */
+  public interface OutputManager {
+    /**
+     * Writes one element to the receiver that the given {@link TupleTag} identifies.
+     */
+    <T> void output(TupleTag<T> tag, WindowedValue<T> output);
+  }
+
+  /**
+   * Returns a basic {@link DoFnRunner} implementation suitable for most
+   * {@link OldDoFn DoFns}.
+   *
+   * <p>For each input it invokes {@link OldDoFn#processElement}.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    return new SimpleDoFnRunner<>(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        aggregatorFactory,
+        windowingStrategy);
+  }
+
+  /**
+   * Returns a {@link DoFnRunner} implementation that takes care of dropping late data.
+   *
+   * <p>Elements from expired windows are dropped before they reach the underlying
+   * {@link OldDoFn}. The result is a {@link LateDataDroppingDoFnRunner} wrapped around a
+   * {@link #simpleRunner simple runner} for {@code reduceFnExecutor.asDoFn()}.
+   */
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner(
+          PipelineOptions options,
+          ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor,
+          SideInputReader sideInputReader,
+          OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainOutputTag,
+          List<TupleTag<?>> sideOutputTags,
+          StepContext stepContext,
+          AggregatorFactory aggregatorFactory,
+          WindowingStrategy<?, W> windowingStrategy) {
+    // The runner that actually executes the function; late elements never reach it.
+    DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> delegate =
+        simpleRunner(
+            options,
+            reduceFnExecutor.asDoFn(),
+            sideInputReader,
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            aggregatorFactory,
+            windowingStrategy);
+    return new LateDataDroppingDoFnRunner<>(
+        delegate,
+        windowingStrategy,
+        stepContext.timerInternals(),
+        reduceFnExecutor.getDroppedDueToLatenessAggregator());
+  }
+
+  /**
+   * Returns the default {@link DoFnRunner} for {@code doFn}: a
+   * {@link #lateDataDroppingRunner late-data-dropping runner} when {@code doFn} is a
+   * {@link ReduceFnExecutor}, and a {@link #simpleRunner simple runner} otherwise.
+   */
+  public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> doFn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    if (!(doFn instanceof ReduceFnExecutor)) {
+      return simpleRunner(
+          options,
+          doFn,
+          sideInputReader,
+          outputManager,
+          mainOutputTag,
+          sideOutputTags,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy);
+    }
+
+    // The key, value and window type parameters of the executor are not known here, so the
+    // call below goes through raw types and an unchecked cast.
+    @SuppressWarnings("rawtypes")
+    ReduceFnExecutor executor = (ReduceFnExecutor) doFn;
+    @SuppressWarnings({"unchecked", "cast", "rawtypes"})
+    DoFnRunner<InputT, OutputT> lateDroppingRunner =
+        (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner(
+            options,
+            executor,
+            sideInputReader,
+            outputManager,
+            (TupleTag) mainOutputTag,
+            sideOutputTags,
+            stepContext,
+            aggregatorFactory,
+            (WindowingStrategy) windowingStrategy);
+    return lateDroppingRunner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
new file mode 100644
index 0000000..2380ba9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementByteSizeObservable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
+/**
+ * Implemented by things that let an {@link ElementByteSizeObserver} observe the size in
+ * bytes of encoded values of type {@code T}.
+ *
+ * @param <T> the type of the values being observed
+ */
+public interface ElementByteSizeObservable<T> {
+  /**
+   * Returns whether {@link #registerByteSizeObserver} is cheap enough to be called for
+   * every element; that is, whether this {@code ElementByteSizeObservable} can calculate
+   * the byte size of the element to be coded in roughly constant time (or lazily).
+   */
+  boolean isRegisterByteSizeObserverCheap(T value);
+
+  /**
+   * Notifies the {@code ElementByteSizeObserver} about the byte size of the encoded value
+   * using this {@code ElementByteSizeObservable}.
+   */
+  void registerByteSizeObserver(T value, ElementByteSizeObserver observer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 7cdab00..b427037 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,16 +17,13 @@
  */
 package org.apache.beam.runners.core;
 
+import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor;
-import org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn;
 import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.ReduceFnRunner;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
new file mode 100644
index 0000000..9851449
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * {@link OldDoFn} that merges windows and groups elements in those windows, optionally
+ * combining values.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+@SystemDoFnInternal
+public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+  /** Name under which the {@link #droppedDueToClosedWindow} aggregator is created. */
+  public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
+  /** Name under which the {@link #droppedDueToLateness} aggregator is created. */
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
+
+  // Both aggregators sum longs. Judging by their names they count dropped elements; this
+  // class itself never updates them -- TODO confirm usage in subclasses.
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
+
+  /**
+   * Create the default {@link GroupAlsoByWindowsDoFn}, which uses window sets to implement the
+   * grouping. The returned instance is a {@link GroupAlsoByWindowsViaOutputBufferDoFn}
+   * configured with a buffering {@code SystemReduceFn}.
+   *
+   * @param windowingStrategy The window function and trigger to use for grouping
+   * @param stateInternalsFactory passed through unchanged to the returned
+   *     {@link GroupAlsoByWindowsViaOutputBufferDoFn}
+   * @param inputCoder the input coder to use
+   */
+  public static <K, V, W extends BoundedWindow>
+      GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault(
+          WindowingStrategy<?, W> windowingStrategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          Coder<V> inputCoder) {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn<>(
+        windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..091ad33
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path"
+ * implementation is applicable.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+   extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  private final WindowingStrategy<?, W> strategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  // Assigned once in the constructor, so it is final like the other two fields.
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  /**
+   * @param windowingStrategy the windowing strategy handed to each {@code ReduceFnRunner}
+   * @param stateInternalsFactory source of the per-key {@link StateInternals}
+   * @param reduceFn the reduce function handed to each {@code ReduceFnRunner}
+   */
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    this.strategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
+  }
+
+  /**
+   * Runs a fresh {@code ReduceFnRunner} over all values of the element's key.
+   *
+   * <p>The values are fed to the runner in chunks of at most 1000. After each chunk the
+   * emulated input watermark and processing time are advanced; after the last chunk both are
+   * advanced to {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and the runner is persisted.
+   */
+  @Override
+  public void processElement(
+      OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c)
+          throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<K, InputT, OutputT, W>(
+            key,
+            strategy,
+            stateInternals,
+            timerInternals,
+            c.windowingInternals(),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), 1000);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      reduceFnRunner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark
+      // to the timestamp of this chunk's first element, and fire any timers that may have
+      // been scheduled.
+      timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    reduceFnRunner.persist();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..b521425
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
+ * <ol>
+ *   <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds
+ *       the previously-implicit timestamp and window into the elements themselves, so a
+ *       window-and-timestamp-unaware transform can operate on them.</li>
+ *   <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
+ *       and timestamps. Many window-unaware runners have such a primitive already.</li>
+ *   <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables
+ *       output by {@link GroupByKeyOnly} are sorted by timestamp.</li>
+ *   <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is
+ *       implemented as a {@link ParDo} that calls reserved internal methods.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ *   <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
+ *       A streaming-style partition, with multiple elements for the same key, will not yield
+ *       correct results.</li>
+ *   <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
+ *       for large iterables.</li>
+ *   <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
+ *       appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+  private final GroupByKey<K, V> gbkTransform;
+
+  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+    this.gbkTransform = originalTransform;
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+
+        // Group by just the key.
+        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+        // introduced in here.
+        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+        // Sort each key's values by timestamp. GroupAlsoByWindow requires
+        // its input to be sorted by timestamp.
+        .apply(new SortValuesByTimestamp<K, V>())
+
+        // Group each key's values by window, merging windows as needed.
+        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(
+            gbkTransform.updateWindowingStrategy(windowingStrategy));
+  }
+
+  /**
+   * Runner-specific primitive that groups elements by key alone, ignoring window assignments.
+   * Runners that use {@link GroupByKeyViaGroupByKeyOnly} should translate or evaluate it natively.
+   */
+  public static class GroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    /** Derives the output coder from the input's coder via {@link GroupByKey}. */
+    @Override
+    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+      return GroupByKey.getOutputKvCoder(input.getCoder());
+    }
+
+    /** Declares a primitive output sharing the input's pipeline, windowing and boundedness. */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+  }
+
+  /**
+   * Helper transform that sorts the values associated with each key by ascending timestamp.
+   * Each key's values are first copied into an in-memory list, which is then sorted and emitted
+   * under the same key. The output collection reuses the input's coder.
+   */
+  private static class SortValuesByTimestamp<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+    @Override
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>> sortFn =
+          new OldDoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+            @Override
+            public void processElement(ProcessContext c) {
+              KV<K, Iterable<WindowedValue<V>>> keyed = c.element();
+              // Materialize the values so that they can be sorted in place.
+              List<WindowedValue<V>> byTimestamp = new ArrayList<>();
+              for (WindowedValue<V> windowedValue : keyed.getValue()) {
+                byTimestamp.add(windowedValue);
+              }
+
+              Comparator<WindowedValue<V>> timestampOrder =
+                  new Comparator<WindowedValue<V>>() {
+                    @Override
+                    public int compare(WindowedValue<V> left, WindowedValue<V> right) {
+                      return left.getTimestamp().compareTo(right.getTimestamp());
+                    }
+                  };
+              // Collections.sort is stable: values with equal timestamps keep their order.
+              Collections.sort(byTimestamp, timestampOrder);
+              c.output(KV.<K, Iterable<WindowedValue<V>>>of(keyed.getKey(), byTimestamp));
+            }
+          };
+
+      return input.apply(ParDo.of(sortFn)).setCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Runner-specific primitive that takes a collection of timestamp-ordered values associated with
+   * each key, groups the values by window, merges windows as needed, and for each window in each
+   * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with
+   * the timestamp derived from that window.
+   */
+  public static class GroupAlsoByWindow<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    /** Creates the primitive; {@code windowingStrategy} is applied to its output collection. */
+    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    /** Returns the windowing strategy supplied at construction. */
+    public WindowingStrategy<?, ?> getWindowingStrategy() {
+      return windowingStrategy;
+    }
+
+    /** Narrows {@code inputCoder} to a {@link KvCoder}, rejecting any other coder type. */
+    private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder(
+        Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      // Coder<KV<...>> --> KvCoder<...>
+      checkArgument(inputCoder instanceof KvCoder,
+          "%s requires a %s<...> but got %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          inputCoder);
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> kvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder;
+      return kvCoder;
+    }
+
+    /** Returns the key coder of {@code inputCoder}, which must be a {@link KvCoder}. */
+    public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      return getKvCoder(inputCoder).getKeyCoder();
+    }
+
+    /** Unwraps the {@code V} coder from a KvCoder of IterableCoder of WindowedValueCoder. */
+    public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) {
+      // Coder<Iterable<...>> --> IterableCoder<...>
+      Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder =
+          getKvCoder(inputCoder).getValueCoder();
+      checkArgument(iterableWindowedValueCoder instanceof IterableCoder,
+          "%s requires a %s<..., %s> but got a %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          IterableCoder.class.getSimpleName(),
+          iterableWindowedValueCoder);
+      IterableCoder<WindowedValue<V>> iterableCoder =
+          (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder;
+
+      // Coder<WindowedValue<...>> --> WindowedValueCoder<...>
+      Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder();
+      checkArgument(iterableElementCoder instanceof WindowedValueCoder,
+          "%s requires a %s<..., %s<%s>> but got a %s",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName(),
+          IterableCoder.class.getSimpleName(),
+          WindowedValueCoder.class.getSimpleName(),
+          iterableElementCoder);
+      WindowedValueCoder<V> windowedValueCoder =
+          (WindowedValueCoder<V>) iterableElementCoder;
+
+      return windowedValueCoder.getValueCoder();
+    }
+
+    /**
+     * Declares the primitive output, deriving its coder from the validated input coder.
+     *
+     * @throws IllegalArgumentException if the input coder lacks the required shape
+     */
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      // Go through the checked accessors rather than casting blindly, so a mis-shaped coder
+      // fails with a descriptive message instead of a bare ClassCastException.
+      Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder = input.getCoder();
+      Coder<K> keyCoder = getKeyCoder(inputCoder);
+      Coder<Iterable<V>> outputValueCoder = IterableCoder.of(getValueCoder(inputCoder));
+      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), windowingStrategy, input.isBounded())
+          .setCoder(outputKvCoder);
+    }
+
+    /** Builds a buffering group-also-by-windows fn; not referenced elsewhere in this class. */
+    private <W extends BoundedWindow>
+        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+            WindowingStrategy<?, W> strategy,
+            StateInternalsFactory<K> stateInternalsFactory,
+            Coder<V> inputIterableElementValueCoder) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+          strategy,
+          stateInternalsFactory,
+          SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
new file mode 100644
index 0000000..63a80d2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * A customized {@link DoFnRunner} that handles late data dropping for
+ * a {@link KeyedWorkItem} input {@link OldDoFn}.
+ *
+ * <p>It expands windows before checking data lateness.
+ *
+ * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows.
+ *
+ * @param <K> key type
+ * @param <InputT> input value element type
+ * @param <OutputT> output value element type
+ * @param <W> window type
+ */
+public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
+    implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
+  private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
+  private final LateDataFilter lateDataFilter;
+
+  /** Wraps {@code doFnRunner}, filtering late elements out of each work item it receives. */
+  public LateDataDroppingDoFnRunner(
+      DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TimerInternals timerInternals,
+      Aggregator<Long, Long> droppedDueToLateness) {
+    this.doFnRunner = doFnRunner;
+    lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
+  }
+
+  @Override
+  public void startBundle() {
+    doFnRunner.startBundle();
+  }
+
+  /** Rebuilds the work item with its timers and only the non-late elements, then delegates. */
+  @Override
+  public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+    Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter(
+        elem.getValue().key(), elem.getValue().elementsIterable());
+    KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem(
+        elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
+    doFnRunner.processElement(elem.withValue(keyedWorkItem));
+  }
+
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
+  /** Filters late data out of the elements of a {@link KeyedWorkItem}. */
+  @VisibleForTesting
+  static class LateDataFilter {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final TimerInternals timerInternals;
+    private final Aggregator<Long, Long> droppedDueToLateness;
+
+    /** Creates a filter that reads watermarks from {@code timerInternals} and counts drops. */
+    public LateDataFilter(
+        WindowingStrategy<?, ?> windowingStrategy,
+        TimerInternals timerInternals,
+        Aggregator<Long, Long> droppedDueToLateness) {
+      this.windowingStrategy = windowingStrategy;
+      this.timerInternals = timerInternals;
+      this.droppedDueToLateness = droppedDueToLateness;
+    }
+
+    /**
+     * Returns a lazy view of {@code elements}, exploded to one value per window, keeping only
+     * non-late values. Dropped values are counted and traced each time the view is traversed.
+     */
+    public <K, InputT> Iterable<WindowedValue<InputT>> filter(
+        final K key, Iterable<WindowedValue<InputT>> elements) {
+      // Explode each element into one single-window value per window it is assigned to.
+      Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform(
+          elements,
+          new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() {
+            @Override
+            public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
+              return Iterables.transform(
+                  input.getWindows(),
+                  new Function<BoundedWindow, WindowedValue<InputT>>() {
+                    @Override
+                    public WindowedValue<InputT> apply(BoundedWindow window) {
+                      return WindowedValue.of(
+                          input.getValue(), input.getTimestamp(), window, input.getPane());
+                    }
+                  });
+            }});
+
+      return Iterables.filter(
+          Iterables.concat(windowsExpandedElements),
+          new Predicate<WindowedValue<InputT>>() {
+            @Override
+            public boolean apply(WindowedValue<InputT> input) {
+              BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+              if (!canDropDueToExpiredWindow(window)) {
+                return true;
+              }
+              // The element is too late for this window.
+              droppedDueToLateness.addValue(1L);
+              WindowTracing.debug(
+                  "LateDataFilter.filter: Dropping element at {} for key:{}; window:{} "
+                  + "since too far behind inputWatermark:{}; outputWatermark:{}",
+                  input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+                  timerInternals.currentOutputWatermarkTime());
+              return false;
+            }
+          });
+    }
+
+    /** Is {@code window} expired w.r.t. the garbage collection watermark? */
+    private boolean canDropDueToExpiredWindow(BoundedWindow window) {
+      Instant inputWM = timerInternals.currentInputWatermarkTime();
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
new file mode 100644
index 0000000..3e51dfb
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateMerging;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+
+/**
+ * Tracks which windows have non-empty panes. Specifically, which windows have new elements since
+ * their last triggering.
+ *
+ * @param <K> The key type of the state accessors handed to this tracker.
+ * @param <W> The kind of windows being tracked.
+ */
+public abstract class NonEmptyPanes<K, W extends BoundedWindow> {
+
+  /**
+   * Picks an implementation for {@code strategy}: in discarding mode emptiness is delegated to
+   * {@code reduceFn}; otherwise additions are counted in dedicated state.
+   */
+  static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create(
+      WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) {
+    if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
+      return new DiscardingModeNonEmptyPanes<>(reduceFn);
+    }
+    return new GeneralNonEmptyPanes<>();
+  }
+
+  /**
+   * Record that some content has been added to the window in {@code context}, and therefore the
+   * current pane is not empty.
+   */
+  public abstract void recordContent(StateAccessor<K> context);
+
+  /** Record that the given pane is empty. */
+  public abstract void clearPane(StateAccessor<K> state);
+
+  /** Return a state reading true if the pane for the window in {@code context} is empty. */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
+
+  /** Prefetch in preparation for merging. */
+  public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state);
+
+  /** Eagerly merge backing state. */
+  public abstract void onMerge(MergingStateAccessor<K, W> context);
+
+  /**
+   * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the
+   * presence of data in the accumulation buffer to record non-empty panes.
+   */
+  private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    /** Source of truth for emptiness; never reassigned after construction. */
+    private final ReduceFn<K, ?, ?, W> reduceFn;
+
+    /** Creates a tracker that defers emptiness checks to {@code reduceFn}. */
+    private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) {
+      this.reduceFn = reduceFn;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return reduceFn.isEmpty(state);
+    }
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> context) {
+      // Nothing to do -- the reduceFn is tracking contents
+    }
+  }
+
+  /**
+   * An implementation of {@code NonEmptyPanes} for general use. Keeps a summed count of
+   * additions in system state; a pane is reported empty while that state is empty.
+   */
+  private static class GeneralNonEmptyPanes<K, W extends BoundedWindow>
+      extends NonEmptyPanes<K, W> {
+
+    // System state holding a running sum that grows by one per recorded addition.
+    private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
+        PANE_ADDITIONS_TAG =
+        StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
+            "count", VarLongCoder.of(), new Sum.SumLongFn()));
+
+    @Override
+    public void recordContent(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).add(1L);
+    }
+
+    @Override
+    public void clearPane(StateAccessor<K> state) {
+      state.access(PANE_ADDITIONS_TAG).clear();
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
+      return state.access(PANE_ADDITIONS_TAG).isEmpty();
+    }
+
+    @Override
+    public void prefetchOnMerge(MergingStateAccessor<K, W> state) {
+      StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG);
+    }
+
+    @Override
+    public void onMerge(MergingStateAccessor<K, W> context) {
+      StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG);
+    }
+  }
+}