You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/24 19:52:46 UTC

[08/17] incubator-beam git commit: [BEAM-253] Unify Flink-Streaming Operator Wrappers

[BEAM-253] Unify Flink-Streaming Operator Wrappers

This also replaces the custom Flink StateInternals by proper Flink
Partitioned StateInternals.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1de76b7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1de76b7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1de76b7a

Branch: refs/heads/master
Commit: 1de76b7a5169a46ef9f14406e5a6e1284832f7f9
Parents: d94bffd
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Jun 11 11:42:12 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 24 12:46:24 2016 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  368 +++++--
 .../wrappers/streaming/DoFnOperator.java        |  268 +++++
 .../streaming/FlinkAbstractParDoWrapper.java    |  282 -----
 .../FlinkGroupAlsoByWindowWrapper.java          |  644 -----------
 .../streaming/FlinkGroupByKeyWrapper.java       |   73 --
 .../streaming/FlinkParDoBoundMultiWrapper.java  |   79 --
 .../streaming/FlinkParDoBoundWrapper.java       |  104 --
 .../wrappers/streaming/FlinkStateInternals.java | 1038 ++++++++++++++++++
 .../streaming/SingletonKeyedWorkItem.java       |   54 +
 .../streaming/SingletonKeyedWorkItemCoder.java  |  125 +++
 .../wrappers/streaming/WindowDoFnOperator.java  |  326 ++++++
 .../wrappers/streaming/WorkItemKeySelector.java |   58 +
 .../state/AbstractFlinkTimerInternals.java      |  127 ---
 .../streaming/state/FlinkStateInternals.java    |  733 -------------
 .../streaming/state/StateCheckpointReader.java  |   93 --
 .../streaming/state/StateCheckpointUtils.java   |  155 ---
 .../streaming/state/StateCheckpointWriter.java  |  131 ---
 .../wrappers/streaming/state/StateType.java     |   73 --
 .../beam/runners/flink/PipelineOptionsTest.java |  103 +-
 .../streaming/FlinkStateInternalsTest.java      |  391 +++++++
 .../flink/streaming/GroupAlsoByWindowTest.java  |  523 ---------
 .../flink/streaming/StateSerializationTest.java |  338 ------
 22 files changed, 2572 insertions(+), 3514 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 5b55d42..fff629c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,13 +18,15 @@
 
 package org.apache.beam.runners.flink.translation;
 
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupByKeyWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
@@ -50,10 +52,14 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -61,8 +67,8 @@ import com.google.api.client.util.Maps;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -70,6 +76,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -77,7 +85,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -294,56 +304,66 @@ public class FlinkStreamingTransformTranslators {
 
     @Override
     public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) {
-      PCollection<OUT> output = context.getOutput(transform);
 
-      final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<OUT, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
+      WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<OUT>> typeInfo = context.getTypeInfo(context.getOutput(transform));
 
-      WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(),
-          windowingStrategy.getWindowFn().windowCoder());
-      CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
+      DoFnOperator<IN, OUT, WindowedValue<OUT>> doFnOperator = new DoFnOperator<>(
+          transform.getFn(),
+          new TupleTag<OUT>("main output"),
+          Collections.<TupleTag<?>>emptyList(),
+          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OUT>>(),
+          windowingStrategy,
+          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+          context.getPipelineOptions());
 
-      FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, transform.getFn());
       DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
       SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream
-          .flatMap(doFnWrapper)
-          .name(transform.getName())
-          .returns(outputWindowedValueCoder);
+          .transform(transform.getName(), typeInfo, doFnOperator);
 
       context.setOutputDataStream(context.getOutput(transform), outDataStream);
     }
   }
 
-  public static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+  public static class WindowBoundTranslator<T>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
 
     @Override
-    public void translateNode(Window.Bound<T> transform, FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
-      DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
-
-      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+    public void translateNode(
+        Window.Bound<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<T, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, BoundedWindow>) context.getOutput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<T>> typeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      OldDoFn<T, T> windowAssignerDoFn =
+          createWindowAssigner(windowingStrategy.getWindowFn());
+
+      DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>(
+          windowAssignerDoFn,
+          new TupleTag<T>("main output"),
+          Collections.<TupleTag<?>>emptyList(),
+          new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(),
+          windowingStrategy,
+          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+          context.getPipelineOptions());
+
+      DataStream<WindowedValue<T>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+      SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream
+          .transform(transform.getName(), typeInfo, doFnOperator);
 
-      WindowedValue.WindowedValueCoder<T> outputStreamCoder = WindowedValue.getFullCoder(
-          context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder());
-      CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
-
-      final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new FlinkParDoBoundWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, createWindowAssigner(windowFn));
-
-      SingleOutputStreamOperator<WindowedValue<T>> windowedStream =
-          inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
-      context.setOutputDataStream(context.getOutput(transform), windowedStream);
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
     }
 
-    private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
+    private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(
+        final WindowFn<T, W> windowFn) {
+
       return new OldDoFn<T, T>() {
 
         @Override
@@ -373,45 +393,176 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class GroupByKeyTranslator<K, V> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
+  public static class GroupByKeyTranslator<K, InputT>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
 
     @Override
-    public void translateNode(GroupByKey<K, V> transform, FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
+    public void translateNode(GroupByKey<K, InputT> transform, FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+
+      WindowedValue.ValueOnlyWindowedValueCoder<
+          SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getValueOnlyCoder(workItemCoder);
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+          WindowedValue<
+              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
+          SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+
+      TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DoFnOperator.DefaultOutputManagerFactory<
+            WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+          new DoFnOperator.DefaultOutputManagerFactory<>();
+
+      WindowDoFnOperator<
+            K,
+            InputT,
+            KV<K, Iterable<InputT>>> doFnOperator =
+          new WindowDoFnOperator<>(
+              reduceFn,
+              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+              Collections.<TupleTag<?>>emptyList(),
+              outputManagerFactory,
+              windowingStrategy,
+              new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+              context.getPipelineOptions(),
+              inputKvCoder.getKeyCoder());
+
+      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+      @SuppressWarnings("unchecked")
+      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
+          keyedWorkItemStream
+              .transform(
+                  transform.getName(),
+                  outputTypeInfo,
+                  (OneInputStreamOperator) doFnOperator);
 
-      DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getInputDataStream(input);
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-
-      KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = FlinkGroupByKeyWrapper
-          .groupStreamByKey(inputDataStream, inputKvCoder);
-
-      DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream =
-          FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
-              context.getInput(transform), groupByKStream);
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
 
-      context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
     }
   }
 
-  public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, VIN, VOUT>> {
+  public static class CombinePerKeyTranslator<K, InputT, OutputT>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        Combine.PerKey<K, InputT, OutputT>> {
 
     @Override
-    public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, FlinkStreamingTranslationContext context) {
-      PValue input = context.getInput(transform);
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+
+      WindowedValue.ValueOnlyWindowedValueCoder<
+            SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getValueOnlyCoder(workItemCoder);
 
-      DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = context.getInputDataStream(input);
-      KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) context.getInput(transform).getCoder();
-      KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) context.getOutput(transform).getCoder();
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
 
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = FlinkGroupByKeyWrapper
-          .groupStreamByKey(inputDataStream, inputKvCoder);
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
 
-      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = (Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
-      DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream =
-          FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
-              context.getInput(transform), groupByKStream, combineFn, outputKvCoder);
+      KeyedStream<
+            WindowedValue<
+                SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
 
-      context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
+      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
+          inputKvCoder.getKeyCoder(),
+          AppliedCombineFn.withInputCoder(
+              transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
+
+
+      OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> windowDoFn =
+          GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, reduceFn);
+
+
+      TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      WindowDoFnOperator<K, InputT, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> doFnOperator =
+          new WindowDoFnOperator<>(
+              windowDoFn,
+              new TupleTag<KV<K, OutputT>>("main output"),
+              Collections.<TupleTag<?>>emptyList(),
+              new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+              windowingStrategy,
+              new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+              context.getPipelineOptions(),
+              inputKvCoder.getKeyCoder());
+
+      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+      @SuppressWarnings("unchecked")
+      SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream = keyedWorkItemStream
+          .transform(transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+    }
+
+    private static class ToKeyedWorkItem<K, InputT>
+        extends RichFlatMapFunction<
+          WindowedValue<KV<K, InputT>>,
+          WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+      @Override
+      public void flatMap(
+          WindowedValue<KV<K, InputT>> in,
+          Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+        SingletonKeyedWorkItem<K, InputT> workItem =
+            new SingletonKeyedWorkItem<>(
+                in.getValue().getKey(),
+                in.withValue(in.getValue().getValue()));
+
+        out.collect(WindowedValue.valueInEmptyWindows(workItem));
+      }
     }
   }
 
@@ -429,68 +580,65 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
-
-    private final int MAIN_TAG_INDEX = 0;
+  public static class ParDoBoundMultiStreamingTranslator<InputT, OutputT>
+      implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+        ParDo.BoundMulti<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(
+        ParDo.BoundMulti<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
 
       // we assume that the transformation does not change the windowing strategy.
-      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy = context.getInput(transform).getWindowingStrategy();
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
-      Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels(
-          transform.getMainOutputTag(), outputs.keySet());
 
-      UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values());
-      WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = WindowedValue.getFullCoder(
-          intermUnionCoder, windowingStrategy.getWindowFn().windowCoder());
+      Map<TupleTag<?>, Integer> tagsToLabels =
+          transformTupleTagsToLabels(transform.getMainOutputTag(), outputs.keySet());
 
-      CoderTypeInformation<WindowedValue<RawUnionValue>> intermWindowedValueCoder =
-          new CoderTypeInformation<>(outputStreamCoder);
+      DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>(
+          transform.getFn(),
+          transform.getMainOutputTag(),
+          transform.getSideOutputTags().getAll(),
+          new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+          windowingStrategy,
+          new HashMap<PCollectionView<?>, WindowingStrategy<?, ?>>(),
+          context.getPipelineOptions());
 
-      FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundMultiWrapper<>(
-          context.getPipelineOptions(), windowingStrategy, transform.getFn(),
-          transform.getMainOutputTag(), tagsToLabels);
+      UnionCoder unionCoder = createUnionCoder(outputs.values());
 
-      DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
-      SingleOutputStreamOperator<WindowedValue<RawUnionValue>> intermDataStream =
-          inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
+      CoderTypeInformation<RawUnionValue> unionTypeInformation =
+          new CoderTypeInformation<>(unionCoder);
 
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
-        final int outputTag = tagsToLabels.get(output.getKey());
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
 
-        WindowedValue.WindowedValueCoder<?> coderForTag = WindowedValue.getFullCoder(
-            output.getValue().getCoder(),
-            windowingStrategy.getWindowFn().windowCoder());
+      SingleOutputStreamOperator<RawUnionValue> unionStream = inputDataStream
+          .transform(transform.getName(), unionTypeInformation, doFnOperator);
 
-        CoderTypeInformation<WindowedValue<?>> windowedValueCoder =
-            new CoderTypeInformation(coderForTag);
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+        final int outputTag = tagsToLabels.get(output.getKey());
 
-        context.setOutputDataStream(output.getValue(),
-            intermDataStream.filter(new FilterFunction<WindowedValue<RawUnionValue>>() {
-              @Override
-              public boolean filter(WindowedValue<RawUnionValue> value) throws Exception {
-                return value.getValue().getUnionTag() == outputTag;
-              }
-            }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
-              @Override
-              public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception {
-                collector.collect(WindowedValue.of(
-                    value.getValue().getValue(),
-                    value.getTimestamp(),
-                    value.getWindows(),
-                    value.getPane()));
-              }
-            }).returns(windowedValueCoder));
+        TypeInformation outputTypeInfo =
+            context.getTypeInfo(output.getValue());
+
+        unionStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+          @Override
+          public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+            if (value.getUnionTag() == outputTag) {
+              out.collect(value.getValue());
+            }
+          }
+        }).returns(outputTypeInfo);
       }
     }
 
     private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) {
       Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
-      tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
-      int count = MAIN_TAG_INDEX + 1;
+      int count = 0;
+      tagToLabelMap.put(mainTag, count++);
       for (TupleTag<?> tag : secondaryTags) {
         if (!tagToLabelMap.containsKey(tag)) {
           tagToLabelMap.put(tag, count++);
@@ -499,7 +647,7 @@ public class FlinkStreamingTransformTranslators {
       return tagToLabelMap;
     }
 
-    private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) {
+    private UnionCoder createUnionCoder(Collection<PCollection<?>> taggedCollections) {
       List<Coder<?>> outputCoders = Lists.newArrayList();
       for (PCollection<?> coll : taggedCollections) {
         outputCoders.add(coll.getCoder());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
new file mode 100644
index 0000000..e273132
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnRunner;
+import org.apache.beam.sdk.util.DoFnRunners;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the operator, this can be different from the fn output
+ */
+public class DoFnOperator<InputT, FnOutputT, OutputT>
+    extends AbstractStreamOperator<OutputT>
+    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT> {
+
+  protected OldDoFn<InputT, FnOutputT> doFn;
+  protected final SerializedPipelineOptions serializedOptions;
+
+  protected final TupleTag<FnOutputT> mainOutputTag;
+  protected final List<TupleTag<?>> sideOutputTags;
+
+  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  protected final boolean hasSideInputs;
+
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+
+  protected final OutputManagerFactory<OutputT> outputManagerFactory;
+
+  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
+
+  /**
+   * To keep track of the current watermark so that we can immediately fire if a trigger
+   * registers an event time callback for a timestamp that lies in the past.
+   */
+  protected transient long currentWatermark = Long.MIN_VALUE;
+
+  public DoFnOperator(
+      OldDoFn<InputT, FnOutputT> doFn,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options) {
+    this.doFn = doFn;
+    this.mainOutputTag = mainOutputTag;
+    this.sideOutputTags = sideOutputTags;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+    this.outputManagerFactory = outputManagerFactory;
+
+    this.hasSideInputs = !sideInputs.isEmpty();
+
+    setChainingStrategy(ChainingStrategy.ALWAYS);
+  }
+
+  protected ExecutionContext.StepContext createStepContext() {
+    return new StepContext();
+  }
+
+  // allow overriding this in WindowDoFnOperator because this one dynamically creates
+  // the DoFn
+  protected OldDoFn<InputT, FnOutputT> getDoFn() {
+    return doFn;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    this.doFn = getDoFn();
+
+    Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+      @Override
+      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+          Class<?> fnClass,
+          ExecutionContext.StepContext stepContext,
+          String aggregatorName,
+          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+        SerializableFnAggregatorWrapper<InputT, OutputT> result =
+            new SerializableFnAggregatorWrapper<>(combine);
+
+        getRuntimeContext().addAccumulator(aggregatorName, result);
+        return result;
+      }
+    };
+
+    doFnRunner = DoFnRunners.createDefault(
+        serializedOptions.getPipelineOptions(),
+        doFn,
+        null,
+        outputManagerFactory.create(output),
+        mainOutputTag,
+        sideOutputTags,
+        createStepContext(),
+        aggregatorFactory,
+        windowingStrategy);
+
+    // Beam DoFn lifecycle: setup() must run before startBundle().
+    doFn.setup();
+    doFnRunner.startBundle();
+
+  @Override
+  public void close() throws Exception {
+    // finishBundle() may still emit elements, so it must run before the
+    // operator (and its output) is closed; teardown() ends the fn lifecycle.
+    doFnRunner.finishBundle();
+    doFn.teardown();
+    super.close();
+
+  @Override
+  public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+    doFnRunner.processElement(streamRecord.getValue());
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    output.emitWatermark(mark);
+  }
+
+  /**
+   * Factory for creating an {@link DoFnRunners.OutputManager} from
+   * a Flink {@link Output}.
+   */
+  interface OutputManagerFactory<OutputT> extends Serializable {
+    DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
+  }
+
+  /**
+   * Default implementation of {@link OutputManagerFactory} that creates an
+   * {@link DoFnRunners.OutputManager} that only writes to
+   * a single logical output.
+   */
+  public static class DefaultOutputManagerFactory<OutputT>
+      implements OutputManagerFactory<OutputT> {
+    @Override
+    public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
+      return new DoFnRunners.OutputManager() {
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+          // with side outputs we can't get around this because we don't
+          // know our own output type...
+          @SuppressWarnings("unchecked")
+          OutputT castValue = (OutputT) value;
+          output.collect(new StreamRecord<>(castValue));
+        }
+      };
+    }
+  }
+
+  /**
+   * Implementation of {@link OutputManagerFactory} that creates an
+   * {@link DoFnRunners.OutputManager} that can write to multiple logical
+   * outputs by unioning them in a {@link RawUnionValue}.
+   */
+  public static class MultiOutputOutputManagerFactory
+      implements OutputManagerFactory<RawUnionValue> {
+
+    Map<TupleTag<?>, Integer> mapping;
+
+    public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
+      this.mapping = mapping;
+    }
+
+    @Override
+    public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
+      return new DoFnRunners.OutputManager() {
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+          int intTag = mapping.get(tag);
+          output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
+        }
+      };
+    }
+  }
+
+  /**
+   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
+   * accessing state or timer internals.
+   */
+  protected class StepContext implements ExecutionContext.StepContext {
+
+    @Override
+    public String getStepName() {
+      return null;
+    }
+
+    @Override
+    public String getTransformName() {
+      return null;
+    }
+
+    @Override
+    public void noteOutput(WindowedValue<?> output) {}
+
+    @Override
+    public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data,
+        Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window,
+        Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Writing side-input data is not supported.");
+    }
+
+    @Override
+    public StateInternals<?> stateInternals() {
+      throw new UnsupportedOperationException("Not supported for regular DoFns.");
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+      throw new UnsupportedOperationException("Not supported for regular DoFns.");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
deleted file mode 100644
index a9dd865..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-import org.joda.time.format.PeriodFormat;
-
-import java.util.Collection;
-
-/**
- * An abstract class that encapsulates the common code of the the {@link org.apache.beam.sdk.transforms.ParDo.Bound}
- * and {@link org.apache.beam.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and
- * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations.
- * */
-public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
-
-  private final OldDoFn<IN, OUTDF> doFn;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final SerializedPipelineOptions serializedPipelineOptions;
-
-  private DoFnProcessContext context;
-
-  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUTDF> doFn) {
-    checkNotNull(options);
-    checkNotNull(windowingStrategy);
-    checkNotNull(doFn);
-
-    this.doFn = doFn;
-    this.serializedPipelineOptions = new SerializedPipelineOptions(options);
-    this.windowingStrategy = windowingStrategy;
-  }
-
-  @Override
-  public void open(Configuration parameters) throws Exception {
-    doFn.setup();
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (this.context != null) {
-      // we have initialized the context
-      this.doFn.finishBundle(this.context);
-    }
-    this.doFn.teardown();
-  }
-
-  @Override
-  public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
-    if (this.context == null) {
-      this.context = new DoFnProcessContext(doFn, out);
-      this.doFn.startBundle(this.context);
-    }
-
-    // for each window the element belongs to, create a new copy here.
-    Collection<? extends BoundedWindow> windows = value.getWindows();
-    if (windows.size() <= 1) {
-      processElement(value);
-    } else {
-      for (BoundedWindow window : windows) {
-        processElement(WindowedValue.of(
-            value.getValue(), value.getTimestamp(), window, value.getPane()));
-      }
-    }
-  }
-
-  private void processElement(WindowedValue<IN> value) throws Exception {
-    this.context.setElement(value);
-    doFn.processElement(this.context);
-  }
-
-  private class DoFnProcessContext extends OldDoFn<IN, OUTDF>.ProcessContext {
-
-    private final OldDoFn<IN, OUTDF> fn;
-
-    protected final Collector<WindowedValue<OUTFL>> collector;
-
-    private WindowedValue<IN> element;
-
-    private DoFnProcessContext(OldDoFn<IN, OUTDF> function,
-          Collector<WindowedValue<OUTFL>> outCollector) {
-      function.super();
-      super.setupDelegateAggregators();
-
-      this.fn = function;
-      this.collector = outCollector;
-    }
-
-    public void setElement(WindowedValue<IN> value) {
-      this.element = value;
-    }
-
-    @Override
-    public IN element() {
-      return this.element.getValue();
-    }
-
-    @Override
-    public Instant timestamp() {
-      return this.element.getTimestamp();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      if (!(fn instanceof OldDoFn.RequiresWindowAccess)) {
-        throw new UnsupportedOperationException(
-            "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
-      }
-
-      Collection<? extends BoundedWindow> windows = this.element.getWindows();
-      if (windows.size() != 1) {
-        throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
-            "This belongs to " + windows.size() + ".");
-      }
-      return windows.iterator().next();
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return this.element.getPane();
-    }
-
-    @Override
-    public WindowingInternals<IN, OUTDF> windowingInternals() {
-      return windowingInternalsHelper(element, collector);
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedPipelineOptions.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public void output(OUTDF output) {
-      outputWithTimestamp(output, this.element.getTimestamp());
-    }
-
-    @Override
-    public void outputWithTimestamp(OUTDF output, Instant timestamp) {
-      outputWithTimestampHelper(element, output, timestamp, collector);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Accumulator acc = getRuntimeContext().getAccumulator(name);
-      if (acc != null) {
-        AccumulatorHelper.compareAccumulatorTypes(name,
-            SerializableFnAggregatorWrapper.class, acc.getClass());
-        return (Aggregator<AggInputT, AggOutputT>) acc;
-      }
-
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-          new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, accumulator);
-      return accumulator;
-    }
-  }
-
-  protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
-    if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
-      throw new IllegalArgumentException(String.format(
-          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-              + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-              + "OldDoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
-          timestamp, ref.getTimestamp(),
-          PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
-    }
-  }
-
-  protected <T> WindowedValue<T> makeWindowedValue(
-      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-    final Instant inputTimestamp = timestamp;
-    final WindowFn windowFn = windowingStrategy.getWindowFn();
-
-    if (timestamp == null) {
-      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    if (windows == null) {
-      try {
-        windows = windowFn.assignWindows(windowFn.new AssignContext() {
-          @Override
-          public Object element() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input element when none was available");
-          }
-
-          @Override
-          public Instant timestamp() {
-            if (inputTimestamp == null) {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input timestamp when none was available");
-            }
-            return inputTimestamp;
-          }
-
-          @Override
-          public BoundedWindow window() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input window when none was available");
-          }
-        });
-      } catch (Exception e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-
-    return WindowedValue.of(output, timestamp, windows, pane);
-  }
-
-  ///////////      ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES      /////////////////
-
-  public abstract void outputWithTimestampHelper(
-      WindowedValue<IN> inElement,
-      OUTDF output,
-      Instant timestamp,
-      Collector<WindowedValue<OUTFL>> outCollector);
-
-  public abstract <T> void sideOutputWithTimestampHelper(
-      WindowedValue<IN> inElement,
-      T output,
-      Instant timestamp,
-      Collector<WindowedValue<OUTFL>> outCollector,
-      TupleTag<T> tag);
-
-  public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
-      WindowedValue<IN> inElement,
-      Collector<WindowedValue<OUTFL>> outCollector);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
deleted file mode 100644
index 4fddb53..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.AbstractFlinkTimerInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * This class is the key class implementing all the windowing/triggering logic of Apache Beam.
- * To provide full compatibility and support for all the windowing/triggering combinations offered by
- * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in
- * ({@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn}.
- * <p/>
- * In a nutshell, when the execution arrives to this operator, we expect to have a stream <b>already
- * grouped by key</b>. Each of the elements that enter here, registers a timer
- * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)} in the
- * {@link FlinkGroupAlsoByWindowWrapper#activeTimers}.
- * This is essentially a timestamp indicating when to trigger the computation over the window this
- * element belongs to.
- * <p/>
- * When a watermark arrives, all the registered timers are checked to see which ones are ready to
- * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
- * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
- * list, and are fed into the {@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn}
- * for furhter processing.
- */
-public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
-    extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
-    implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
-
-  private static final long serialVersionUID = 1L;
-
-  private SerializedPipelineOptions serializedOptions;
-
-  private transient CoderRegistry coderRegistry;
-
-  private OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
-
-  private ProcessContext context;
-
-  private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
-
-  private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
-
-  private final KvCoder<K, VIN> inputKvCoder;
-
-  /**
-   * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a
-   * key whose elements are currently waiting to be processed, and its associated state.
-   */
-  private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
-
-  /**
-   * Timers waiting to be processed.
-   */
-  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
-
-  /**
-   * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
-   * This method assumes that <b>elements are already grouped by key</b>.
-   * <p/>
-   * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
-   * is that this method assumes that a combiner function is provided
-   * (see {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn}).
-   * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state.
-   *
-   * @param options            the general job configuration options.
-   * @param input              the input Dataflow {@link org.apache.beam.sdk.values.PCollection}.
-   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-   * @param combiner           the combiner to be used.
-   * @param outputKvCoder      the type of the output values.
-   */
-  public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
-      PipelineOptions options,
-      PCollection input,
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
-      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
-      KvCoder<K, VOUT> outputKvCoder) {
-    checkNotNull(options);
-
-    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
-        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
-
-    Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-        outputKvCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
-        new CoderTypeInformation<>(windowedOutputElemCoder);
-
-    DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
-        .transform("GroupByWindowWithCombiner",
-            new CoderTypeInformation<>(outputKvCoder),
-            windower)
-        .returns(outputTypeInfo);
-
-    return groupedByKeyAndWindow;
-  }
-
-  /**
-   * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
-   * This method assumes that <b>elements are already grouped by key</b>.
-   * <p/>
-   * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
-   * is that this method assumes no combiner function
-   * (see {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn}).
-   *
-   * @param options            the general job configuration options.
-   * @param input              the input Dataflow {@link org.apache.beam.sdk.values.PCollection}.
-   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-   */
-  public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
-      PipelineOptions options,
-      PCollection input,
-      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
-    checkNotNull(options);
-
-    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
-        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
-
-    Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
-    KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
-
-    Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-        outputElemCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
-
-    CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
-        new CoderTypeInformation<>(windowedOutputElemCoder);
-
-    DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
-        .transform("GroupByWindow",
-            new CoderTypeInformation<>(windowedOutputElemCoder),
-            windower)
-        .returns(outputTypeInfo);
-
-    return groupedByKeyAndWindow;
-  }
-
-  public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
-  createForTesting(PipelineOptions options,
-                   CoderRegistry registry,
-                   WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-                   KvCoder<K, VIN> inputCoder,
-                   Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-    checkNotNull(options);
-
-    return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
-  }
-
-  private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
-                                        CoderRegistry registry,
-                                        WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-                                        KvCoder<K, VIN> inputCoder,
-                                        Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-    checkNotNull(options);
-
-    this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options));
-    this.coderRegistry = checkNotNull(registry);
-    this.inputKvCoder = checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
-    this.windowingStrategy = checkNotNull(windowingStrategy);//input.getWindowingStrategy();
-    this.combineFn = combiner;
-    this.operator = createGroupAlsoByWindowOperator();
-    this.chainingStrategy = ChainingStrategy.ALWAYS;
-  }
-
-  @Override
-  public void open() throws Exception {
-    super.open();
-    operator.setup();
-    this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
-    operator.startBundle(context);
-  }
-
-  /**
-   * Create the adequate {@link org.apache.beam.runners.core.GroupAlsoByWindowsDoFn},
-   * <b> if not already created</b>.
-   * If a {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn} was provided, then
-   * a function with that combiner is created, so that elements are combined as they arrive. This is
-   * done for speed and (in most of the cases) for reduction of the per-window state.
-   */
-  private <W extends BoundedWindow> OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
-    if (this.operator == null) {
-
-      StateInternalsFactory<K> stateInternalsFactory = new GroupAlsoByWindowWrapperStateInternalsFactory();
-
-      if (this.combineFn == null) {
-        // Thus VOUT == Iterable<VIN>
-        Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-        this.operator = (OldDoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
-      } else {
-        Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
-
-        AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
-            .withInputCoder(combineFn, coderRegistry, inputKvCoder);
-
-        this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
-            (WindowingStrategy<?, W>) this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
-      }
-    }
-    return this.operator;
-  }
-
-  private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
-    context.setElement(workItem);
-    operator.processElement(context);
-  }
-
-  @Override
-  public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
-    final WindowedValue<KV<K, VIN>> windowedValue = element.getValue();
-    final KV<K, VIN> kv = windowedValue.getValue();
-
-    final WindowedValue<VIN> updatedWindowedValue = WindowedValue.of(kv.getValue(),
-        windowedValue.getTimestamp(),
-        windowedValue.getWindows(),
-        windowedValue.getPane());
-
-    processKeyedWorkItem(
-        KeyedWorkItems.elementsWorkItem(
-            kv.getKey(),
-            Collections.singletonList(updatedWindowedValue)));
-  }
-
-  @Override
-  public void processWatermark(Watermark mark) throws Exception {
-    context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
-
-    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
-    if (!timers.isEmpty()) {
-      for (K key : timers.keySet()) {
-        processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
-      }
-    }
-
-    /**
-     * This is to take into account the different semantics of the Watermark in Flink and
-     * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
-     * watermark holding logic, see the documentation of
-     * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
-     * */
-    long millis = Long.MAX_VALUE;
-    for (FlinkStateInternals state : perKeyStateInternals.values()) {
-      Instant watermarkHold = state.getWatermarkHold();
-      if (watermarkHold != null && watermarkHold.getMillis() < millis) {
-        millis = watermarkHold.getMillis();
-      }
-    }
-
-    if (mark.getTimestamp() < millis) {
-      millis = mark.getTimestamp();
-    }
-
-    context.setCurrentOutputWatermark(new Instant(millis));
-
-    // Don't forget to re-emit the watermark for further operators down the line.
-    // This is critical for jobs with multiple aggregation steps.
-    // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
-    // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
-    // is not re-emitted, the second aggregation would never be triggered, and no result
-    // will be produced.
-    output.emitWatermark(new Watermark(millis));
-  }
-
-  @Override
-  public void close() throws Exception {
-    operator.finishBundle(context);
-    operator.teardown();
-    super.close();
-  }
-
-  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-    if (timersForKey == null) {
-      timersForKey = new HashSet<>();
-    }
-    timersForKey.add(timer);
-    activeTimers.put(key, timersForKey);
-  }
-
-  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-    if (timersForKey != null) {
-      timersForKey.remove(timer);
-      if (timersForKey.isEmpty()) {
-        activeTimers.remove(key);
-      } else {
-        activeTimers.put(key, timersForKey);
-      }
-    }
-  }
-
-  /**
-   * Returns the list of timers that are ready to fire. These are the timers
-   * that are registered to be triggered at a time before the current watermark.
-   * We keep these timers in a Set, so that they are deduplicated, as the same
-   * timer can be registered multiple times.
-   */
-  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
-
-    // we keep the timers to return in a different list and launch them later
-    // because we cannot prevent a trigger from registering another trigger,
-    // which would lead to concurrent modification exception.
-    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-      while (timerIt.hasNext()) {
-        TimerInternals.TimerData timerData = timerIt.next();
-        if (timerData.getTimestamp().isBefore(currentWatermark)) {
-          toFire.put(keyWithTimers.getKey(), timerData);
-          timerIt.remove();
-        }
-      }
-
-      if (keyWithTimers.getValue().isEmpty()) {
-        it.remove();
-      }
-    }
-    return toFire;
-  }
-
-  /**
-   * Gets the state associated with the specified key.
-   *
-   * @param key the key whose state we want.
-   * @return The {@link FlinkStateInternals}
-   * associated with that key.
-   */
-  private FlinkStateInternals<K> getStateInternalsForKey(K key) {
-    FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
-    if (stateInternals == null) {
-      Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
-      stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
-      perKeyStateInternals.put(key, stateInternals);
-    }
-    return stateInternals;
-  }
-
-  private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
-    @Override
-    public void setTimer(TimerData timerKey) {
-      registerActiveTimer(context.element().key(), timerKey);
-    }
-
-    @Override
-    public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
-    }
-  }
-
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
-
-    private final FlinkTimerInternals timerInternals;
-
-    private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
-
-    private KeyedWorkItem<K, VIN> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
-                          TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
-                          FlinkTimerInternals timerInternals) {
-      function.super();
-      super.setupDelegateAggregators();
-
-      this.collector = checkNotNull(outCollector);
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, VIN> element) {
-      this.element = element;
-    }
-
-    public void setCurrentInputWatermark(Instant watermark) {
-      this.timerInternals.setCurrentInputWatermark(watermark);
-    }
-
-    public void setCurrentOutputWatermark(Instant watermark) {
-      this.timerInternals.setCurrentOutputWatermark(watermark);
-    }
-
-    @Override
-    public KeyedWorkItem<K, VIN> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.getPipelineOptions();
-    }
-
-    @Override
-    public void output(KV<K, VOUT> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
-
-        @Override
-        public StateInternals stateInternals() {
-          throw new UnsupportedOperationException("stateInternals() is not available");
-        }
-
-        @Override
-        public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          // TODO: No need to represent timestamp twice.
-          collector.setAbsoluteTimestamp(timestamp.getMillis());
-          collector.collect(WindowedValue.of(output, timestamp, windows, pane));
-
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // ignore the side output, this can happen when a user does not register
-      // side outputs but then outputs using a freshly created TupleTag.
-      throw new RuntimeException("sideOutput() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      Accumulator acc = getRuntimeContext().getAccumulator(name);
-      if (acc != null) {
-        AccumulatorHelper.compareAccumulatorTypes(name,
-            SerializableFnAggregatorWrapper.class, acc.getClass());
-        return (Aggregator<AggInputT, AggOutputT>) acc;
-      }
-
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-          new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, accumulator);
-      return accumulator;
-    }
-  }
-
-  //////////////        Checkpointing implementation        ////////////////
-
-  @Override
-  public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-    AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-    // checkpoint the timers
-    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
-
-    // checkpoint the state
-    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
-
-    // checkpoint the timerInternals
-    context.timerInternals.encodeTimerInternals(context, writer,
-        inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
-
-    taskState.setOperatorState(out.closeAndGetHandle());
-    return taskState;
-  }
-
-  @Override
-  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-    super.restoreState(taskState, recoveryTimestamp);
-
-    final ClassLoader userClassloader = getUserCodeClassloader();
-
-    Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-    @SuppressWarnings("unchecked")
-    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-    DataInputView in = inputState.getState(userClassloader);
-    StateCheckpointReader reader = new StateCheckpointReader(in);
-
-    // restore the timers
-    this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-
-    // restore the state
-    this.perKeyStateInternals = StateCheckpointUtils.decodeState(
-        reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
-
-    // restore the timerInternals.
-    this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
-  }
-
-  private class GroupAlsoByWindowWrapperStateInternalsFactory implements
-      StateInternalsFactory<K>, Serializable {
-
-    @Override
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return getStateInternalsForKey(key);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
deleted file mode 100644
index 6b69d54..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-
-/**
- * This class groups the elements by key. It assumes that already the incoming stream
- * is composed of <code>[Key,Value]</code> pairs.
- * */
-public class FlinkGroupByKeyWrapper {
-
-  /**
-   * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement
-   * multiple interfaces.
-   */
-  private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
-  }
-
-  public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
-    final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
-    final boolean isKeyVoid = keyCoder instanceof VoidCoder;
-
-    return inputDataStream.keyBy(
-        new KeySelectorWithQueryableResultType<K, V>() {
-
-          @Override
-          public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
-            return isKeyVoid ? (K) VoidValue.INSTANCE :
-                value.getValue().getKey();
-          }
-
-          @Override
-          public TypeInformation<K> getProducedType() {
-            return keyTypeInfo;
-          }
-        });
-  }
-
-  // special type to return as key for null key
-  public static class VoidValue {
-    private VoidValue() {}
-
-    public static VoidValue INSTANCE = new VoidValue();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1de76b7a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
deleted file mode 100644
index 0ea0cab..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.util.Map;
-
-/**
- * A wrapper for the {@link org.apache.beam.sdk.transforms.ParDo.BoundMulti} Beam transformation.
- * */
-public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
-
-  private final TupleTag<?> mainTag;
-  private final Map<TupleTag<?>, Integer> outputLabels;
-
-  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, OldDoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
-    super(options, windowingStrategy, doFn);
-    this.mainTag = checkNotNull(mainTag);
-    this.outputLabels = checkNotNull(tagsToLabels);
-  }
-
-  @Override
-  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
-    checkTimestamp(inElement, timestamp);
-    Integer index = outputLabels.get(mainTag);
-    collector.collect(makeWindowedValue(
-        new RawUnionValue(index, output),
-        timestamp,
-        inElement.getWindows(),
-        inElement.getPane()));
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
-    checkTimestamp(inElement, timestamp);
-    Integer index = outputLabels.get(tag);
-    if (index != null) {
-      collector.collect(makeWindowedValue(
-          new RawUnionValue(index, output),
-          timestamp,
-          inElement.getWindows(),
-          inElement.getPane()));
-    }
-  }
-
-  @Override
-  public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
-    throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
-        "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
-        "is not available in this class.");
-  }
-}
\ No newline at end of file