Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/18 06:32:15 UTC

[2/7] incubator-beam git commit: Refactor FlinkProcessContext more cleanly into single- and multi-output versions

Refactor FlinkProcessContext more cleanly into single- and multi-output versions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fb1f7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fb1f7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fb1f7be

Branch: refs/heads/master
Commit: 1fb1f7bebeacf54e66f606f373d68c14483a444c
Parents: 24cae56
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 11 16:37:42 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800

----------------------------------------------------------------------
 .../functions/FlinkDoFnFunction.java            |  12 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |  14 +-
 .../FlinkMergingPartialReduceFunction.java      |  14 +-
 .../functions/FlinkMergingReduceFunction.java   |  12 +-
 .../functions/FlinkMultiOutputDoFnFunction.java |  14 +-
 .../FlinkMultiOutputProcessContext.java         |  94 +----
 .../functions/FlinkPartialReduceFunction.java   |  14 +-
 .../functions/FlinkProcessContext.java          | 343 -------------------
 .../functions/FlinkProcessContextBase.java      | 285 +++++++++++++++
 .../functions/FlinkReduceFunction.java          |  14 +-
 .../FlinkSingleOutputProcessContext.java        |  70 ++++
 11 files changed, 421 insertions(+), 465 deletions(-)
----------------------------------------------------------------------
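For readers skimming the diff below: the old FlinkProcessContext is split into an abstract FlinkProcessContextBase that keeps the windowed-value bookkeeping and funnels all emission through one abstract outputWithTimestampAndWindow(value, timestamp, windows, pane) hook, plus two concrete subclasses — FlinkSingleOutputProcessContext wrapping a Collector<WindowedValue<OutputT>>, and FlinkMultiOutputProcessContext wrapping a Collector<WindowedValue<RawUnionValue>> together with a TupleTag-to-index map for side outputs. The minimal sketch here only illustrates that shape; it deliberately uses simplified stand-in types (Emit, Tagged, String tags) instead of the real Beam/Flink classes so it compiles on its own, and is not the actual API of the commit.

import java.util.Map;

// Simplified stand-ins for Flink's Collector and Beam's RawUnionValue.
interface Emit<T> { void collect(T value); }

final class Tagged {
  final int index;      // which logical output this element belongs to
  final Object value;
  Tagged(int index, Object value) { this.index = index; this.value = value; }
}

// Base class: subclasses only decide where an emitted element goes.
abstract class ProcessContextBase<OutT> {
  final void output(OutT value) { emit(value); }   // main-output path shared by all callers
  protected abstract void emit(OutT value);        // analogous to outputWithTimestampAndWindow
  public abstract <T> void sideOutput(String tag, T value);
}

// Single-output version: forwards straight to one collector, no side outputs.
final class SingleOutputContext<OutT> extends ProcessContextBase<OutT> {
  private final Emit<OutT> collector;
  SingleOutputContext(Emit<OutT> collector) { this.collector = collector; }
  @Override protected void emit(OutT value) { collector.collect(value); }
  @Override public <T> void sideOutput(String tag, T value) {
    throw new UnsupportedOperationException("no side outputs in single-output mode");
  }
}

// Multi-output version: wraps every element in a tagged union value.
final class MultiOutputContext<OutT> extends ProcessContextBase<OutT> {
  private final Emit<Tagged> collector;
  private final Map<String, Integer> outputMap;    // tag -> union index; index 0 is the main output
  MultiOutputContext(Emit<Tagged> collector, Map<String, Integer> outputMap) {
    this.collector = collector;
    this.outputMap = outputMap;
  }
  @Override protected void emit(OutT value) { collector.collect(new Tagged(0, value)); }
  @Override public <T> void sideOutput(String tag, T value) {
    Integer index = outputMap.get(tag);
    if (index == null) { throw new IllegalArgumentException("Unknown side output tag: " + tag); }
    collector.collect(new Tagged(index, value));
  }
}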


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 798a23c..dc0ef0f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -64,20 +64,20 @@ public class FlinkDoFnFunction<InputT, OutputT>
       Iterable<WindowedValue<InputT>> values,
       Collector<WindowedValue<OutputT>> out) throws Exception {
 
-    FlinkProcessContext<InputT, OutputT> context = new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>(
         serializedOptions.getPipelineOptions(),
         getRuntimeContext(),
         doFn,
         windowingStrategy,
-        out,
-        sideInputs);
+        sideInputs, out
+    );
 
     this.doFn.startBundle(context);
 
     if (!requiresWindowAccess || hasSideInputs) {
       // we don't need to explode the windows
       for (WindowedValue<InputT> value : values) {
-        context = context.forWindowedValue(value);
+        context.setWindowedValue(value);
         doFn.processElement(context);
       }
     } else {
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
       // is in only one window
       for (WindowedValue<InputT> value : values) {
         for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
-          context = context.forWindowedValue(explodedValue);
+          context.setWindowedValue(explodedValue);
           doFn.processElement(context);
         }
       }
@@ -94,7 +94,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
 
     // set the windowed value to null so that the special logic for outputting
     // in startBundle/finishBundle kicks in
-    context = context.forWindowedValue(null);
+    context.setWindowedValue(null);
     this.doFn.finishBundle(context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index dbaab17..a4284f8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -92,14 +92,14 @@ public class FlinkMergingNonShuffleReduceFunction<
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
-        new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
+        new FlinkSingleOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            sideInputs);
+            sideInputs, out
+        );
 
     PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
         PerKeyCombineFnRunners.create(combineFn);
@@ -141,7 +141,7 @@ public class FlinkMergingNonShuffleReduceFunction<
     IntervalWindow currentWindow =
         (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
-    processContext = processContext.forWindowedValue(currentValue);
+    processContext.setWindowedValue(currentValue);
     AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
     accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
 
@@ -157,7 +157,7 @@ public class FlinkMergingNonShuffleReduceFunction<
         // continue accumulating and merge windows
 
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
 
         windowTimestamp = outputTimeFn.combine(
@@ -175,7 +175,7 @@ public class FlinkMergingNonShuffleReduceFunction<
 
         currentWindow = nextWindow;
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.createAccumulator(key, processContext);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index bc09bdf..30d3326 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -60,14 +60,14 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
-        new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+        new FlinkSingleOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            sideInputs);
+            sideInputs, out
+        );
 
     PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
         PerKeyCombineFnRunners.create(combineFn);
@@ -109,7 +109,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
     IntervalWindow currentWindow =
         (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
-    processContext = processContext.forWindowedValue(currentValue);
+    processContext.setWindowedValue(currentValue);
     AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
     accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
 
@@ -125,7 +125,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
         // continue accumulating and merge windows
 
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
 
         windowTimestamp = outputTimeFn.combine(
@@ -143,7 +143,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
 
         currentWindow = nextWindow;
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.createAccumulator(key, processContext);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 4050f47..29dc1e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -62,14 +62,14 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
-        new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+        new FlinkSingleOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            sideInputs);
+            sideInputs, out
+        );
 
     PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
         PerKeyCombineFnRunners.create(combineFn);
@@ -127,7 +127,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating and merge windows
 
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
 
         accumulator = combineFnRunner.mergeAccumulators(
             key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
@@ -143,7 +143,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
 
         windowTimestamps.clear();
 
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
 
         currentWindow = nextWindow;
         accumulator = nextValue.getValue().getValue();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 810609e..7be4bb4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -74,22 +74,22 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
       Iterable<WindowedValue<InputT>> values,
       Collector<WindowedValue<RawUnionValue>> out) throws Exception {
 
-    FlinkProcessContext<InputT, OutputT> context =
+    FlinkMultiOutputProcessContext<InputT, OutputT> context =
         new FlinkMultiOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            outputMap,
-            sideInputs);
+            sideInputs, out,
+            outputMap
+        );
 
     this.doFn.startBundle(context);
 
     if (!requiresWindowAccess || hasSideInputs) {
       // we don't need to explode the windows
       for (WindowedValue<InputT> value : values) {
-        context = context.forWindowedValue(value);
+        context.setWindowedValue(value);
         doFn.processElement(context);
       }
     } else {
@@ -98,7 +98,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
       // is in only one window
       for (WindowedValue<InputT> value : values) {
         for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
-          context = context.forWindowedValue(value);
+          context.setWindowedValue(value);
           doFn.processElement(context);
         }
       }
@@ -106,7 +106,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
 
     // set the windowed value to null so that the special logic for outputting
     // in startBundle/finishBundle kicks in
-    context = context.forWindowedValue(null);
+    context.setWindowedValue(null);
     this.doFn.finishBundle(context);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index 153a2d7..a3d2b18 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -33,95 +33,35 @@ import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 
 /**
- * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
- * side outputs.
+ * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports side
+ * outputs.
  */
 class FlinkMultiOutputProcessContext<InputT, OutputT>
-    extends FlinkProcessContext<InputT, OutputT> {
+    extends FlinkProcessContextBase<InputT, OutputT> {
 
-  // we need a different Collector from the base class
   private final Collector<WindowedValue<RawUnionValue>> collector;
-
   private final Map<TupleTag<?>, Integer> outputMap;
 
-
   FlinkMultiOutputProcessContext(
       PipelineOptions pipelineOptions,
       RuntimeContext runtimeContext,
       OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       Collector<WindowedValue<RawUnionValue>> collector,
-      Map<TupleTag<?>, Integer> outputMap,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
-    super(
-        pipelineOptions,
-        runtimeContext,
-        doFn,
-        windowingStrategy,
-        new Collector<WindowedValue<OutputT>>() {
-          @Override
-          public void collect(WindowedValue<OutputT> outputTWindowedValue) {
-
-          }
-
-          @Override
-          public void close() {
-
-          }
-        },
-        sideInputs);
-
+      Map<TupleTag<?>, Integer> outputMap) {
+    super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
     this.collector = collector;
     this.outputMap = outputMap;
   }
 
   @Override
-  public FlinkProcessContext<InputT, OutputT> forWindowedValue(
-      WindowedValue<InputT> windowedValue) {
-    this.windowedValue = windowedValue;
-    return this;
-  }
-
-  @Override
-  public void outputWithTimestamp(OutputT value, Instant timestamp) {
-    if (windowedValue == null) {
-      // we are in startBundle() or finishBundle()
-
-      try {
-        Collection windows = windowingStrategy.getWindowFn().assignWindows(
-            new FlinkNoElementAssignContext(
-                windowingStrategy.getWindowFn(),
-                value,
-                timestamp));
-
-        collector.collect(
-            WindowedValue.of(
-                new RawUnionValue(0, value),
-                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
-                windows,
-                PaneInfo.NO_FIRING));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      collector.collect(
-          WindowedValue.of(
-              new RawUnionValue(0, value),
-              windowedValue.getTimestamp(),
-              windowedValue.getWindows(),
-              windowedValue.getPane()));
-    }
-  }
-
-  @Override
   protected void outputWithTimestampAndWindow(
       OutputT value,
       Instant timestamp,
       Collection<? extends BoundedWindow> windows,
       PaneInfo pane) {
-    collector.collect(
-        WindowedValue.of(
-            new RawUnionValue(0, value), timestamp, windows, pane));
+    collector.collect(WindowedValue.of(new RawUnionValue(0, value), timestamp, windows, pane));
   }
 
   @Override
@@ -142,19 +82,24 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
       throw new IllegalArgumentException("Unknown side output tag: " + tag);
     }
 
+    outputUnionValue(value, timestamp, new RawUnionValue(index, value));
+  }
+
+  private <T> void outputUnionValue(T value, Instant timestamp, RawUnionValue unionValue) {
     if (windowedValue == null) {
       // we are in startBundle() or finishBundle()
 
       try {
-        Collection windows = windowingStrategy.getWindowFn().assignWindows(
-            new FlinkNoElementAssignContext(
-                windowingStrategy.getWindowFn(),
-                value,
-                timestamp));
+        Collection<? extends BoundedWindow> windows =
+            windowingStrategy
+                .getWindowFn()
+                .assignWindows(
+                    new FlinkNoElementAssignContext(
+                        windowingStrategy.getWindowFn(), value, timestamp));
 
         collector.collect(
             WindowedValue.of(
-                new RawUnionValue(index, value),
+                unionValue,
                 timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
                 windows,
                 PaneInfo.NO_FIRING));
@@ -164,11 +109,10 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
     } else {
       collector.collect(
           WindowedValue.of(
-              new RawUnionValue(index, value),
+              unionValue,
               windowedValue.getTimestamp(),
               windowedValue.getWindows(),
               windowedValue.getPane()));
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index fa2ce4d..3ea456a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -88,14 +88,14 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
-        new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+        new FlinkSingleOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            sideInputs);
+            sideInputs, out
+        );
 
     PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
         PerKeyCombineFnRunners.create(combineFn);
@@ -132,7 +132,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     K key = currentValue.getValue().getKey();
     BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
     InputT firstValue = currentValue.getValue().getValue();
-    processContext = processContext.forWindowedValue(currentValue);
+    processContext.setWindowedValue(currentValue);
     AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
     accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
 
@@ -147,7 +147,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
 
         windowTimestamp = outputTimeFn.combine(
@@ -165,7 +165,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
         currentWindow = nextWindow;
         InputT value = nextValue.getValue().getValue();
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.createAccumulator(key, processContext);
         accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
deleted file mode 100644
index 1b28a70..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
- */
-class FlinkProcessContext<InputT, OutputT>
-    extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-  private final PipelineOptions pipelineOptions;
-  private final RuntimeContext runtimeContext;
-  private Collector<WindowedValue<OutputT>> collector;
-  private final boolean requiresWindowAccess;
-
-  protected WindowedValue<InputT> windowedValue;
-
-  protected WindowingStrategy<?, ?> windowingStrategy;
-
-  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  FlinkProcessContext(
-      PipelineOptions pipelineOptions,
-      RuntimeContext runtimeContext,
-      OldDoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Collector<WindowedValue<OutputT>> collector,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
-    doFn.super();
-    checkNotNull(pipelineOptions);
-    checkNotNull(runtimeContext);
-    checkNotNull(doFn);
-    checkNotNull(collector);
-
-    this.pipelineOptions = pipelineOptions;
-    this.runtimeContext = runtimeContext;
-    this.collector = collector;
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-
-    super.setupDelegateAggregators();
-  }
-
-  FlinkProcessContext(
-      PipelineOptions pipelineOptions,
-      RuntimeContext runtimeContext,
-      OldDoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
-    doFn.super();
-    checkNotNull(pipelineOptions);
-    checkNotNull(runtimeContext);
-    checkNotNull(doFn);
-
-    this.pipelineOptions = pipelineOptions;
-    this.runtimeContext = runtimeContext;
-    this.collector = null;
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-
-    super.setupDelegateAggregators();
-  }
-
-  public FlinkProcessContext<InputT, OutputT> forOutput(
-      Collector<WindowedValue<OutputT>> collector) {
-    this.collector = collector;
-
-    // for now, returns ourselves, to be easy on the GC
-    return this;
-  }
-
-
-
-  public FlinkProcessContext<InputT, OutputT> forWindowedValue(
-      WindowedValue<InputT> windowedValue) {
-    this.windowedValue = windowedValue;
-
-    // for now, returns ourselves, to be easy on the GC
-    return this;
-  }
-
-  @Override
-  public InputT element() {
-    return this.windowedValue.getValue();
-  }
-
-
-  @Override
-  public Instant timestamp() {
-    return windowedValue.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    if (!requiresWindowAccess) {
-      throw new UnsupportedOperationException(
-          "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
-    }
-    return Iterables.getOnlyElement(windowedValue.getWindows());
-  }
-
-  @Override
-  public PaneInfo pane() {
-    return windowedValue.getPane();
-  }
-
-  @Override
-  public WindowingInternals<InputT, OutputT> windowingInternals() {
-
-    return new WindowingInternals<InputT, OutputT>() {
-
-      @Override
-      public StateInternals stateInternals() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void outputWindowedValue(
-          OutputT value,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        // TODO: Refactor this (get rid of duplication, move things around w.r.t.
-        // FlinkMultiOutputProcessContext)
-        collector.collect(WindowedValue.of(value, timestamp, windows, pane));
-        outputWithTimestampAndWindow(value, timestamp, windows, pane);
-      }
-
-      @Override
-      public <SideOutputT> void sideOutputWindowedValue(
-          TupleTag<SideOutputT> tag,
-          SideOutputT output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        // TODO: Implement this
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return windowedValue.getWindows();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return windowedValue.getPane();
-      }
-
-      @Override
-      public <T> void writePCollectionViewData(TupleTag<?> tag,
-          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public <ViewT> ViewT sideInput(
-          PCollectionView<ViewT> view,
-          BoundedWindow mainInputWindow) {
-
-        checkNotNull(view, "View passed to sideInput cannot be null");
-        checkNotNull(
-            sideInputs.get(view),
-            "Side input for " + view + " not available.");
-
-        // get the side input strategy for mapping the window
-        WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
-        BoundedWindow sideInputWindow =
-            windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
-
-        Map<BoundedWindow, ViewT> sideInputs =
-            runtimeContext.getBroadcastVariableWithInitializer(
-                view.getTagInternal().getId(), new SideInputInitializer<>(view));
-        return sideInputs.get(sideInputWindow);
-      }
-    };
-  }
-
-  @Override
-  public PipelineOptions getPipelineOptions() {
-    return pipelineOptions;
-  }
-
-  @Override
-  public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
-    checkNotNull(view, "View passed to sideInput cannot be null");
-    checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
-    Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
-    BoundedWindow window;
-    if (!windowIter.hasNext()) {
-      throw new IllegalStateException(
-          "sideInput called when main input element is not in any windows");
-    } else {
-      window = windowIter.next();
-      if (windowIter.hasNext()) {
-        throw new IllegalStateException(
-            "sideInput called when main input element is in multiple windows");
-      }
-    }
-
-    // get the side input strategy for mapping the window
-    WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
-    BoundedWindow sideInputWindow =
-        windowingStrategy.getWindowFn().getSideInputWindow(window);
-
-    Map<BoundedWindow, ViewT> sideInputs =
-        runtimeContext.getBroadcastVariableWithInitializer(
-            view.getTagInternal().getId(), new SideInputInitializer<>(view));
-    ViewT result = sideInputs.get(sideInputWindow);
-    if (result == null) {
-      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
-    }
-    return result;
-  }
-
-  @Override
-  public void output(OutputT value) {
-    if (windowedValue != null) {
-      outputWithTimestamp(value, windowedValue.getTimestamp());
-    } else {
-      outputWithTimestamp(value, null);
-    }
-  }
-
-  @Override
-  public void outputWithTimestamp(OutputT value, Instant timestamp) {
-    if (windowedValue == null) {
-      // we are in startBundle() or finishBundle()
-
-      try {
-        Collection windows = windowingStrategy.getWindowFn().assignWindows(
-            new FlinkNoElementAssignContext(
-                windowingStrategy.getWindowFn(),
-                value,
-                timestamp));
-
-        collector.collect(
-            WindowedValue.of(
-                value,
-                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
-                windows,
-                PaneInfo.NO_FIRING));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      collector.collect(
-          WindowedValue.of(
-              value,
-              timestamp,
-              windowedValue.getWindows(),
-              windowedValue.getPane()));
-    }
-  }
-
-  protected void outputWithTimestampAndWindow(
-      OutputT value,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo pane) {
-    collector.collect(
-        WindowedValue.of(
-            value, timestamp, windows, pane));
-  }
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tag, T output) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-    sideOutput(tag, output);
-  }
-
-  @Override
-  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-  createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
-        new SerializableFnAggregatorWrapper<>(combiner);
-    Accumulator<?, ?> existingAccum =
-        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
-    if (existingAccum != null) {
-      return wrapper;
-    } else {
-      runtimeContext.addAccumulator(name, wrapper);
-    }
-    return wrapper;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
new file mode 100644
index 0000000..b814015
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
+ */
+abstract class FlinkProcessContextBase<InputT, OutputT>
+    extends OldDoFn<InputT, OutputT>.ProcessContext {
+
+  private final PipelineOptions pipelineOptions;
+  private final RuntimeContext runtimeContext;
+  private final boolean requiresWindowAccess;
+  protected final WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy;
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  protected WindowedValue<InputT> windowedValue;
+
+  FlinkProcessContextBase(
+      PipelineOptions pipelineOptions,
+      RuntimeContext runtimeContext,
+      OldDoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ? extends BoundedWindow>> sideInputs) {
+    doFn.super();
+    checkNotNull(pipelineOptions);
+    checkNotNull(runtimeContext);
+    checkNotNull(doFn);
+
+    this.pipelineOptions = pipelineOptions;
+    this.runtimeContext = runtimeContext;
+    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+
+    super.setupDelegateAggregators();
+  }
+
+  public void setWindowedValue(WindowedValue<InputT> windowedValue) {
+    this.windowedValue = windowedValue;
+  }
+
+  @Override
+  public InputT element() {
+    return this.windowedValue.getValue();
+  }
+
+
+  @Override
+  public Instant timestamp() {
+    return windowedValue.getTimestamp();
+  }
+
+  @Override
+  public BoundedWindow window() {
+    if (!requiresWindowAccess) {
+      throw new UnsupportedOperationException(
+          "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
+    }
+    return Iterables.getOnlyElement(windowedValue.getWindows());
+  }
+
+  @Override
+  public PaneInfo pane() {
+    return windowedValue.getPane();
+  }
+
+  @Override
+  public WindowingInternals<InputT, OutputT> windowingInternals() {
+
+    return new WindowingInternals<InputT, OutputT>() {
+
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void outputWindowedValue(
+          OutputT value,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputWithTimestampAndWindow(value, timestamp, windows, pane);
+      }
+
+      @Override
+      public <SideOutputT> void sideOutputWindowedValue(
+          TupleTag<SideOutputT> tag,
+          SideOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return windowedValue.getWindows();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return windowedValue.getPane();
+      }
+
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag,
+          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <ViewT> ViewT sideInput(
+          PCollectionView<ViewT> view,
+          BoundedWindow mainInputWindow) {
+
+        checkNotNull(view, "View passed to sideInput cannot be null");
+        checkNotNull(
+            sideInputs.get(view),
+            "Side input for " + view + " not available.");
+
+        // get the side input strategy for mapping the window
+        WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+        BoundedWindow sideInputWindow =
+            windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
+
+        Map<BoundedWindow, ViewT> sideInputs =
+            runtimeContext.getBroadcastVariableWithInitializer(
+                view.getTagInternal().getId(), new SideInputInitializer<>(view));
+        return sideInputs.get(sideInputWindow);
+      }
+    };
+  }
+
+  @Override
+  public PipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  @Override
+  public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
+    checkNotNull(view, "View passed to sideInput cannot be null");
+    checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
+    Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
+    BoundedWindow window;
+    if (!windowIter.hasNext()) {
+      throw new IllegalStateException(
+          "sideInput called when main input element is not in any windows");
+    } else {
+      window = windowIter.next();
+      if (windowIter.hasNext()) {
+        throw new IllegalStateException(
+            "sideInput called when main input element is in multiple windows");
+      }
+    }
+
+    // get the side input strategy for mapping the window
+    WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+    BoundedWindow sideInputWindow =
+        windowingStrategy.getWindowFn().getSideInputWindow(window);
+
+    Map<BoundedWindow, ViewT> sideInputs =
+        runtimeContext.getBroadcastVariableWithInitializer(
+            view.getTagInternal().getId(), new SideInputInitializer<>(view));
+    ViewT result = sideInputs.get(sideInputWindow);
+    if (result == null) {
+      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+    }
+    return result;
+  }
+
+  @Override
+  public void output(OutputT value) {
+    if (windowedValue != null) {
+      outputWithTimestamp(value, windowedValue.getTimestamp());
+    } else {
+      outputWithTimestamp(value, null);
+    }
+  }
+
+  @Override
+  public final void outputWithTimestamp(OutputT value, Instant timestamp) {
+    if (windowedValue == null) {
+      // we are in startBundle() or finishBundle()
+
+      try {
+        Collection windows = windowingStrategy.getWindowFn().assignWindows(
+            new FlinkNoElementAssignContext(
+                windowingStrategy.getWindowFn(),
+                value,
+                timestamp));
+
+        outputWithTimestampAndWindow(
+            value,
+            timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
+            windows,
+            PaneInfo.NO_FIRING);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      outputWithTimestampAndWindow(
+          value, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+    }
+  }
+
+  protected abstract void outputWithTimestampAndWindow(
+      OutputT value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane);
+
+  @Override
+  public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+  @Override
+  public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
+
+  @Override
+  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+  createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
+        new SerializableFnAggregatorWrapper<>(combiner);
+    Accumulator<?, ?> existingAccum =
+        (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+    if (existingAccum != null) {
+      return wrapper;
+    } else {
+      runtimeContext.addAccumulator(name, wrapper);
+    }
+    return wrapper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index c9b24b4..ab0c471 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -92,14 +92,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
-        new FlinkProcessContext<>(
+    FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+        new FlinkSingleOutputProcessContext<>(
             serializedOptions.getPipelineOptions(),
             getRuntimeContext(),
             doFn,
             windowingStrategy,
-            out,
-            sideInputs);
+            sideInputs, out
+        );
 
     PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
         PerKeyCombineFnRunners.create(combineFn);
@@ -150,14 +150,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating
-        processContext = processContext.forWindowedValue(nextValue);
+        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.mergeAccumulators(
             key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
 
         windowTimestamps.add(nextValue.getTimestamp());
       } else {
         // emit the value that we currently have
-        processContext = processContext.forWindowedValue(currentValue);
+        processContext.setWindowedValue(currentValue);
         out.collect(
             WindowedValue.of(
                 KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
@@ -179,7 +179,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
     // if at the end of the iteration we have a change in windows
     // the ProcessContext will not have been updated
-    processContext = processContext.forWindowedValue(currentValue);
+    processContext.setWindowedValue(currentValue);
 
     // emit the final accumulator
     out.collect(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
new file mode 100644
index 0000000..d67f6fd
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/** {@link OldDoFn.ProcessContext} for {@link FlinkDoFnFunction} with a single main output. */
+class FlinkSingleOutputProcessContext<InputT, OutputT>
+    extends FlinkProcessContextBase<InputT, OutputT> {
+
+  private final Collector<WindowedValue<OutputT>> collector;
+
+  FlinkSingleOutputProcessContext(
+      PipelineOptions pipelineOptions,
+      RuntimeContext runtimeContext,
+      OldDoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      Collector<WindowedValue<OutputT>> collector) {
+    super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
+    this.collector = collector;
+  }
+
+  @Override
+  protected void outputWithTimestampAndWindow(
+      OutputT value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    collector.collect(WindowedValue.of(value, timestamp, windows, pane));
+  }
+
+  @Override
+  public <T> void sideOutput(TupleTag<T> tag, T value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
+    throw new UnsupportedOperationException();
+  }
+}