You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:38 UTC

[43/50] [abbrv] incubator-beam git commit: [flink] convert tabs to 2 spaces

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index eabc307..7dae0b0 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -44,221 +44,221 @@ import java.util.Collection;
  * */
 public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
 
-	private final DoFn<IN, OUTDF> doFn;
-	private final WindowingStrategy<?, ?> windowingStrategy;
-	private transient PipelineOptions options;
-
-	private DoFnProcessContext context;
-
-	public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
-		Preconditions.checkNotNull(options);
-		Preconditions.checkNotNull(windowingStrategy);
-		Preconditions.checkNotNull(doFn);
-
-		this.doFn = doFn;
-		this.options = options;
-		this.windowingStrategy = windowingStrategy;
-	}
-
-	private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
-		if (this.context == null) {
-			this.context = new DoFnProcessContext(function, outCollector);
-		}
-	}
-
-	@Override
-	public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
-		this.initContext(doFn, out);
-
-		// for each window the element belongs to, create a new copy here.
-		Collection<? extends BoundedWindow> windows = value.getWindows();
-		if (windows.size() <= 1) {
-			processElement(value);
-		} else {
-			for (BoundedWindow window : windows) {
-				processElement(WindowedValue.of(
-						value.getValue(), value.getTimestamp(), window, value.getPane()));
-			}
-		}
-	}
-
-	private void processElement(WindowedValue<IN> value) throws Exception {
-		this.context.setElement(value);
-		this.doFn.startBundle(context);
-		doFn.processElement(context);
-		this.doFn.finishBundle(context);
-	}
-
-	private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
-
-		private final DoFn<IN, OUTDF> fn;
-
-		protected final Collector<WindowedValue<OUTFL>> collector;
-
-		private WindowedValue<IN> element;
-
-		private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
-			function.super();
-			super.setupDelegateAggregators();
-
-			this.fn = function;
-			this.collector = outCollector;
-		}
-
-		public void setElement(WindowedValue<IN> value) {
-			this.element = value;
-		}
-
-		@Override
-		public IN element() {
-			return this.element.getValue();
-		}
-
-		@Override
-		public Instant timestamp() {
-			return this.element.getTimestamp();
-		}
-
-		@Override
-		public BoundedWindow window() {
-			if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-				throw new UnsupportedOperationException(
-						"window() is only available in the context of a DoFn marked as RequiresWindow.");
-			}
-
-			Collection<? extends BoundedWindow> windows = this.element.getWindows();
-			if (windows.size() != 1) {
-				throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
-						"This belongs to " + windows.size() + ".");
-			}
-			return windows.iterator().next();
-		}
-
-		@Override
-		public PaneInfo pane() {
-			return this.element.getPane();
-		}
-
-		@Override
-		public WindowingInternals<IN, OUTDF> windowingInternals() {
-			return windowingInternalsHelper(element, collector);
-		}
-
-		@Override
-		public PipelineOptions getPipelineOptions() {
-			return options;
-		}
-
-		@Override
-		public <T> T sideInput(PCollectionView<T> view) {
-			throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-		}
-
-		@Override
-		public void output(OUTDF output) {
-			outputWithTimestamp(output, this.element.getTimestamp());
-		}
-
-		@Override
-		public void outputWithTimestamp(OUTDF output, Instant timestamp) {
-			outputWithTimestampHelper(element, output, timestamp, collector);
-		}
-
-		@Override
-		public <T> void sideOutput(TupleTag<T> tag, T output) {
-			sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
-		}
-
-		@Override
-		public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-			sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
-		}
-
-		@Override
-		protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-			Accumulator acc = getRuntimeContext().getAccumulator(name);
-			if (acc != null) {
-				AccumulatorHelper.compareAccumulatorTypes(name,
-						SerializableFnAggregatorWrapper.class, acc.getClass());
-				return (Aggregator<AggInputT, AggOutputT>) acc;
-			}
-
-			SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-					new SerializableFnAggregatorWrapper<>(combiner);
-			getRuntimeContext().addAccumulator(name, accumulator);
-			return accumulator;
-		}
-	}
-
-	protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
-		if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
-			throw new IllegalArgumentException(String.format(
-					"Cannot output with timestamp %s. Output timestamps must be no earlier than the "
-							+ "timestamp of the current input (%s) minus the allowed skew (%s). See the "
-							+ "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
-					timestamp, ref.getTimestamp(),
-					PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
-		}
-	}
-
-	protected <T> WindowedValue<T> makeWindowedValue(
-			T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-		final Instant inputTimestamp = timestamp;
-		final WindowFn windowFn = windowingStrategy.getWindowFn();
-
-		if (timestamp == null) {
-			timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-		}
-
-		if (windows == null) {
-			try {
-				windows = windowFn.assignWindows(windowFn.new AssignContext() {
-					@Override
-					public Object element() {
-						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input element when none was available");
-					}
-
-					@Override
-					public Instant timestamp() {
-						if (inputTimestamp == null) {
-							throw new UnsupportedOperationException(
-									"WindowFn attempted to access input timestamp when none was available");
-						}
-						return inputTimestamp;
-					}
-
-					@Override
-					public Collection<? extends BoundedWindow> windows() {
-						throw new UnsupportedOperationException(
-								"WindowFn attempted to access input windows when none were available");
-					}
-				});
-			} catch (Exception e) {
-				throw UserCodeException.wrap(e);
-			}
-		}
-
-		return WindowedValue.of(output, timestamp, windows, pane);
-	}
-
-	///////////			ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES			/////////////////
-
-	public abstract void outputWithTimestampHelper(
-			WindowedValue<IN> inElement,
-			OUTDF output,
-			Instant timestamp,
-			Collector<WindowedValue<OUTFL>> outCollector);
-
-	public abstract <T> void sideOutputWithTimestampHelper(
-			WindowedValue<IN> inElement,
-			T output,
-			Instant timestamp,
-			Collector<WindowedValue<OUTFL>> outCollector,
-			TupleTag<T> tag);
-
-	public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
-			WindowedValue<IN> inElement,
-			Collector<WindowedValue<OUTFL>> outCollector);
+  private final DoFn<IN, OUTDF> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private transient PipelineOptions options;
+
+  private DoFnProcessContext context;
+
+  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(windowingStrategy);
+    Preconditions.checkNotNull(doFn);
+
+    this.doFn = doFn;
+    this.options = options;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+    if (this.context == null) {
+      this.context = new DoFnProcessContext(function, outCollector);
+    }
+  }
+
+  @Override
+  public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
+    this.initContext(doFn, out);
+
+    // for each window the element belongs to, create a new copy here.
+    Collection<? extends BoundedWindow> windows = value.getWindows();
+    if (windows.size() <= 1) {
+      processElement(value);
+    } else {
+      for (BoundedWindow window : windows) {
+        processElement(WindowedValue.of(
+            value.getValue(), value.getTimestamp(), window, value.getPane()));
+      }
+    }
+  }
+
+  private void processElement(WindowedValue<IN> value) throws Exception {
+    this.context.setElement(value);
+    this.doFn.startBundle(context);
+    doFn.processElement(context);
+    this.doFn.finishBundle(context);
+  }
+
+  private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+
+    private final DoFn<IN, OUTDF> fn;
+
+    protected final Collector<WindowedValue<OUTFL>> collector;
+
+    private WindowedValue<IN> element;
+
+    private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.fn = function;
+      this.collector = outCollector;
+    }
+
+    public void setElement(WindowedValue<IN> value) {
+      this.element = value;
+    }
+
+    @Override
+    public IN element() {
+      return this.element.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return this.element.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a DoFn marked as RequiresWindow.");
+      }
+
+      Collection<? extends BoundedWindow> windows = this.element.getWindows();
+      if (windows.size() != 1) {
+        throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
+            "This belongs to " + windows.size() + ".");
+      }
+      return windows.iterator().next();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return this.element.getPane();
+    }
+
+    @Override
+    public WindowingInternals<IN, OUTDF> windowingInternals() {
+      return windowingInternalsHelper(element, collector);
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public void output(OUTDF output) {
+      outputWithTimestamp(output, this.element.getTimestamp());
+    }
+
+    @Override
+    public void outputWithTimestamp(OUTDF output, Instant timestamp) {
+      outputWithTimestampHelper(element, output, timestamp, collector);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
+    if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+              + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+              + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
+          timestamp, ref.getTimestamp(),
+          PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
+    }
+  }
+
+  protected <T> WindowedValue<T> makeWindowedValue(
+      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = windowingStrategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
+
+  ///////////      ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES      /////////////////
+
+  public abstract void outputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      OUTDF output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector);
+
+  public abstract <T> void sideOutputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      T output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector,
+      TupleTag<T> tag);
+
+  public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
+      WindowedValue<IN> inElement,
+      Collector<WindowedValue<OUTFL>> outCollector);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index fb3d329..55235c9 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -68,562 +68,562 @@ import java.util.*;
  * for furhter processing.
  */
 public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
-		extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
-		implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient PipelineOptions options;
-
-	private transient CoderRegistry coderRegistry;
-
-	private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
-
-	private ProcessContext context;
-
-	private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
-
-	private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
-
-	private final KvCoder<K, VIN> inputKvCoder;
-
-	/**
-	 * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a
-	 * key whose elements are currently waiting to be processed, and its associated state.
-	 */
-	private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
-
-	/**
-	 * Timers waiting to be processed.
-	 */
-	private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-	private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
-
-	/**
-	 * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
-	 * This method assumes that <b>elements are already grouped by key</b>.
-	 * <p/>
-	 * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
-	 * is that this method assumes that a combiner function is provided
-	 * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-	 * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state.
-	 *
-	 * @param options            the general job configuration options.
-	 * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
-	 * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-	 * @param combiner           the combiner to be used.
-	 * @param outputKvCoder      the type of the output values.
-	 */
-	public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
-			PipelineOptions options,
-			PCollection input,
-			KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
-			Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
-			KvCoder<K, VOUT> outputKvCoder) {
-		Preconditions.checkNotNull(options);
-
-		KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-		FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
-				input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
-
-		Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-				outputKvCoder,
-				input.getWindowingStrategy().getWindowFn().windowCoder());
-
-		CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
-				new CoderTypeInformation<>(windowedOutputElemCoder);
-
-		DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
-				.transform("GroupByWindowWithCombiner",
-						new CoderTypeInformation<>(outputKvCoder),
-						windower)
-				.returns(outputTypeInfo);
-
-		return groupedByKeyAndWindow;
-	}
-
-	/**
-	 * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
-	 * This method assumes that <b>elements are already grouped by key</b>.
-	 * <p/>
-	 * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
-	 * is that this method assumes no combiner function
-	 * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-	 *
-	 * @param options            the general job configuration options.
-	 * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
-	 * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
-	 */
-	public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
-			PipelineOptions options,
-			PCollection input,
-			KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
-		Preconditions.checkNotNull(options);
-
-		KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
-		Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-		Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-		FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
-				input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
-
-		Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
-		KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
-
-		Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-				outputElemCoder,
-				input.getWindowingStrategy().getWindowFn().windowCoder());
-
-		CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
-				new CoderTypeInformation<>(windowedOutputElemCoder);
-
-		DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
-				.transform("GroupByWindow",
-						new CoderTypeInformation<>(windowedOutputElemCoder),
-						windower)
-				.returns(outputTypeInfo);
-
-		return groupedByKeyAndWindow;
-	}
-
-	public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
-	createForTesting(PipelineOptions options,
-	                 CoderRegistry registry,
-	                 WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-	                 KvCoder<K, VIN> inputCoder,
-	                 Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-		Preconditions.checkNotNull(options);
-
-		return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
-	}
-
-	private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
-	                                      CoderRegistry registry,
-	                                      WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
-	                                      KvCoder<K, VIN> inputCoder,
-	                                      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-		Preconditions.checkNotNull(options);
-
-		this.options = Preconditions.checkNotNull(options);
-		this.coderRegistry = Preconditions.checkNotNull(registry);
-		this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
-		this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
-		this.combineFn = combiner;
-		this.operator = createGroupAlsoByWindowOperator();
-		this.chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
-	}
-
-	/**
-	 * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
-	 * <b> if not already created</b>.
-	 * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
-	 * a function with that combiner is created, so that elements are combined as they arrive. This is
-	 * done for speed and (in most of the cases) for reduction of the per-window state.
-	 */
-	private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
-		if (this.operator == null) {
-			if (this.combineFn == null) {
-				// Thus VOUT == Iterable<VIN>
-				Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-				this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
-						(WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
-			} else {
-				Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
-
-				AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
-						.withInputCoder(combineFn, coderRegistry, inputKvCoder);
-
-				this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
-						(WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
-			}
-		}
-		return this.operator;
-	}
-
-	private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
-		context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
-		// TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
-		operator.startBundle(context);
-		operator.processElement(context);
-		operator.finishBundle(context);
-	}
-
-	@Override
-	public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
-		ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
-		elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
-				element.getValue().getWindows(), element.getValue().getPane()));
-		processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
-
-		Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
-		if (!timers.isEmpty()) {
-			for (K key : timers.keySet()) {
-				processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
-			}
-		}
-
-		/**
-		 * This is to take into account the different semantics of the Watermark in Flink and
-		 * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
-		 * watermark holding logic, see the documentation of
-		 * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
-		 * */
-		long millis = Long.MAX_VALUE;
-		for (FlinkStateInternals state : perKeyStateInternals.values()) {
-			Instant watermarkHold = state.getWatermarkHold();
-			if (watermarkHold != null && watermarkHold.getMillis() < millis) {
-				millis = watermarkHold.getMillis();
-			}
-		}
-
-		if (mark.getTimestamp() < millis) {
-			millis = mark.getTimestamp();
-		}
-
-		context.setCurrentOutputWatermark(new Instant(millis));
-
-		// Don't forget to re-emit the watermark for further operators down the line.
-		// This is critical for jobs with multiple aggregation steps.
-		// Imagine a job with a groupByKey() on key K1, followed by a map() that changes
-		// the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
-		// is not re-emitted, the second aggregation would never be triggered, and no result
-		// will be produced.
-		output.emitWatermark(new Watermark(millis));
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-	}
-
-	private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-		Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-		if (timersForKey == null) {
-			timersForKey = new HashSet<>();
-		}
-		timersForKey.add(timer);
-		activeTimers.put(key, timersForKey);
-	}
-
-	private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-		Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
-		if (timersForKey != null) {
-			timersForKey.remove(timer);
-			if (timersForKey.isEmpty()) {
-				activeTimers.remove(key);
-			} else {
-				activeTimers.put(key, timersForKey);
-			}
-		}
-	}
-
-	/**
-	 * Returns the list of timers that are ready to fire. These are the timers
-	 * that are registered to be triggered at a time before the current watermark.
-	 * We keep these timers in a Set, so that they are deduplicated, as the same
-	 * timer can be registered multiple times.
-	 */
-	private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
-
-		// we keep the timers to return in a different list and launch them later
-		// because we cannot prevent a trigger from registering another trigger,
-		// which would lead to concurrent modification exception.
-		Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
-
-		Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
-		while (it.hasNext()) {
-			Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
-
-			Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
-			while (timerIt.hasNext()) {
-				TimerInternals.TimerData timerData = timerIt.next();
-				if (timerData.getTimestamp().isBefore(currentWatermark)) {
-					toFire.put(keyWithTimers.getKey(), timerData);
-					timerIt.remove();
-				}
-			}
-
-			if (keyWithTimers.getValue().isEmpty()) {
-				it.remove();
-			}
-		}
-		return toFire;
-	}
-
-	/**
-	 * Gets the state associated with the specified key.
-	 *
-	 * @param key the key whose state we want.
-	 * @return The {@link FlinkStateInternals}
-	 * associated with that key.
-	 */
-	private FlinkStateInternals<K> getStateInternalsForKey(K key) {
-		FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
-		if (stateInternals == null) {
-			Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-			OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
-			stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
-			perKeyStateInternals.put(key, stateInternals);
-		}
-		return stateInternals;
-	}
-
-	private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
-		@Override
-		public void setTimer(TimerData timerKey) {
-			registerActiveTimer(context.element().key(), timerKey);
-		}
-
-		@Override
-		public void deleteTimer(TimerData timerKey) {
-			unregisterActiveTimer(context.element().key(), timerKey);
-		}
-	}
-
-	private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
-
-		private final FlinkTimerInternals timerInternals;
-
-		private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
-
-		private FlinkStateInternals<K> stateInternals;
-
-		private KeyedWorkItem<K, VIN> element;
-
-		public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
-		                      TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
-		                      FlinkTimerInternals timerInternals) {
-			function.super();
-			super.setupDelegateAggregators();
-
-			this.collector = Preconditions.checkNotNull(outCollector);
-			this.timerInternals = Preconditions.checkNotNull(timerInternals);
-		}
-
-		public void setElement(KeyedWorkItem<K, VIN> element,
-		                       FlinkStateInternals<K> stateForKey) {
-			this.element = element;
-			this.stateInternals = stateForKey;
-		}
-
-		public void setCurrentInputWatermark(Instant watermark) {
-			this.timerInternals.setCurrentInputWatermark(watermark);
-		}
-
-		public void setCurrentOutputWatermark(Instant watermark) {
-			this.timerInternals.setCurrentOutputWatermark(watermark);
-		}
-
-		@Override
-		public KeyedWorkItem<K, VIN> element() {
-			return this.element;
-		}
-
-		@Override
-		public Instant timestamp() {
-			throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
-		}
-
-		@Override
-		public PipelineOptions getPipelineOptions() {
-			// TODO: PipelineOptions need to be available on the workers.
-			// Ideally they are captured as part of the pipeline.
-			// For now, construct empty options so that StateContexts.createFromComponents
-			// will yield a valid StateContext, which is needed to support the StateContext.window().
-			if (options == null) {
-				options = new PipelineOptions() {
-					@Override
-					public <T extends PipelineOptions> T as(Class<T> kls) {
-						return null;
-					}
-
-					@Override
-					public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
-						return null;
-					}
-
-					@Override
-					public Class<? extends PipelineRunner<?>> getRunner() {
-						return null;
-					}
-
-					@Override
-					public void setRunner(Class<? extends PipelineRunner<?>> kls) {
-
-					}
-
-					@Override
-					public CheckEnabled getStableUniqueNames() {
-						return null;
-					}
-
-					@Override
-					public void setStableUniqueNames(CheckEnabled enabled) {
-					}
-				};
-			}
-			return options;
-		}
-
-		@Override
-		public void output(KV<K, VOUT> output) {
-			throw new UnsupportedOperationException(
-					"output() is not available when processing KeyedWorkItems.");
-		}
-
-		@Override
-		public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
-			throw new UnsupportedOperationException(
-					"outputWithTimestamp() is not available when processing KeyedWorkItems.");
-		}
-
-		@Override
-		public PaneInfo pane() {
-			throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
-		}
-
-		@Override
-		public BoundedWindow window() {
-			throw new UnsupportedOperationException(
-					"window() is not available when processing KeyedWorkItems.");
-		}
-
-		@Override
-		public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
-			return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
-
-				@Override
-				public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
-					return stateInternals;
-				}
-
-				@Override
-				public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-					// TODO: No need to represent timestamp twice.
-					collector.setAbsoluteTimestamp(timestamp.getMillis());
-					collector.collect(WindowedValue.of(output, timestamp, windows, pane));
-
-				}
-
-				@Override
-				public TimerInternals timerInternals() {
-					return timerInternals;
-				}
-
-				@Override
-				public Collection<? extends BoundedWindow> windows() {
-					throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-				}
-
-				@Override
-				public PaneInfo pane() {
-					throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-				}
-
-				@Override
-				public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-					throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-				}
-
-				@Override
-				public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-					throw new RuntimeException("sideInput() is not available in Streaming mode.");
-				}
-			};
-		}
-
-		@Override
-		public <T> T sideInput(PCollectionView<T> view) {
-			throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-		}
-
-		@Override
-		public <T> void sideOutput(TupleTag<T> tag, T output) {
-			// ignore the side output, this can happen when a user does not register
-			// side outputs but then outputs using a freshly created TupleTag.
-			throw new RuntimeException("sideOutput() is not available when grouping by window.");
-		}
-
-		@Override
-		public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-			sideOutput(tag, output);
-		}
-
-		@Override
-		protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-			Accumulator acc = getRuntimeContext().getAccumulator(name);
-			if (acc != null) {
-				AccumulatorHelper.compareAccumulatorTypes(name,
-						SerializableFnAggregatorWrapper.class, acc.getClass());
-				return (Aggregator<AggInputT, AggOutputT>) acc;
-			}
-
-			SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
-					new SerializableFnAggregatorWrapper<>(combiner);
-			getRuntimeContext().addAccumulator(name, accumulator);
-			return accumulator;
-		}
-	}
-
-	//////////////				Checkpointing implementation				////////////////
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-		AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-		StateCheckpointWriter writer = StateCheckpointWriter.create(out);
-		Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-		// checkpoint the timers
-		StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
-
-		// checkpoint the state
-		StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
-
-		// checkpoint the timerInternals
-		context.timerInternals.encodeTimerInternals(context, writer,
-				inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
-
-		taskState.setOperatorState(out.closeAndGetHandle());
-		return taskState;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-		super.restoreState(taskState, recoveryTimestamp);
-
-		final ClassLoader userClassloader = getUserCodeClassloader();
-
-		Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
-		Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-		@SuppressWarnings("unchecked")
-		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-		DataInputView in = inputState.getState(userClassloader);
-		StateCheckpointReader reader = new StateCheckpointReader(in);
-
-		// restore the timers
-		this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-
-		// restore the state
-		this.perKeyStateInternals = StateCheckpointUtils.decodeState(
-				reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
-
-		// restore the timerInternals.
-		this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
-	}
+    extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
+    implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
+
+  private static final long serialVersionUID = 1L;
+
+  private transient PipelineOptions options;
+
+  private transient CoderRegistry coderRegistry;
+
+  private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+
+  private ProcessContext context;
+
+  private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
+
+  private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
+
+  private final KvCoder<K, VIN> inputKvCoder;
+
+  /**
+   * State is kept <b>per-key</b>. This data structure keeps this mapping between an active key, i.e. a
+   * key whose elements are currently waiting to be processed, and its associated state.
+   */
+  private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
+
+  /**
+   * Timers waiting to be processed.
+   */
+  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
+
+  /**
+   * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
+   * is that this method assumes that a combiner function is provided
+   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state.
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+   * @param combiner           the combiner to be used.
+   * @param outputKvCoder      the type of the output values.
+   */
+  public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
+      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
+      KvCoder<K, VOUT> outputKvCoder) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
+
+    Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+        outputKvCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
+        .transform("GroupByWindowWithCombiner",
+            new CoderTypeInformation<>(outputKvCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  /**
+   * Creates an DataStream where elements are grouped in windows based on the specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
+   * is that this method assumes no combiner function
+   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+   */
+  public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
+
+    Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
+    KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
+
+    Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+        outputElemCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
+        .transform("GroupByWindow",
+            new CoderTypeInformation<>(windowedOutputElemCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
+  createForTesting(PipelineOptions options,
+                   CoderRegistry registry,
+                   WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                   KvCoder<K, VIN> inputCoder,
+                   Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
+  }
+
+  private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
+                                        CoderRegistry registry,
+                                        WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                                        KvCoder<K, VIN> inputCoder,
+                                        Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    this.options = Preconditions.checkNotNull(options);
+    this.coderRegistry = Preconditions.checkNotNull(registry);
+    this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
+    this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
+    this.combineFn = combiner;
+    this.operator = createGroupAlsoByWindowOperator();
+    this.chainingStrategy = ChainingStrategy.ALWAYS;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+  }
+
+  /**
+   * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
+   * <b> if not already created</b>.
+   * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
+   * a function with that combiner is created, so that elements are combined as they arrive. This is
+   * done for speed and (in most of the cases) for reduction of the per-window state.
+   */
+  private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+    if (this.operator == null) {
+      if (this.combineFn == null) {
+        // Thus VOUT == Iterable<VIN>
+        Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+        this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+      } else {
+        Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+
+        AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
+            .withInputCoder(combineFn, coderRegistry, inputKvCoder);
+
+        this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+      }
+    }
+    return this.operator;
+  }
+
+  private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
+    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+
+    // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
+    operator.startBundle(context);
+    operator.processElement(context);
+    operator.finishBundle(context);
+  }
+
+  @Override
+  public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
+    ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
+    elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
+        element.getValue().getWindows(), element.getValue().getPane()));
+    processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
+
+    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+    if (!timers.isEmpty()) {
+      for (K key : timers.keySet()) {
+        processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
+      }
+    }
+
+    /**
+     * This is to take into account the different semantics of the Watermark in Flink and
+     * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
+     * watermark holding logic, see the documentation of
+     * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
+     * */
+    long millis = Long.MAX_VALUE;
+    for (FlinkStateInternals state : perKeyStateInternals.values()) {
+      Instant watermarkHold = state.getWatermarkHold();
+      if (watermarkHold != null && watermarkHold.getMillis() < millis) {
+        millis = watermarkHold.getMillis();
+      }
+    }
+
+    if (mark.getTimestamp() < millis) {
+      millis = mark.getTimestamp();
+    }
+
+    context.setCurrentOutputWatermark(new Instant(millis));
+
+    // Don't forget to re-emit the watermark for further operators down the line.
+    // This is critical for jobs with multiple aggregation steps.
+    // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
+    // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
+    // is not re-emitted, the second aggregation would never be triggered, and no result
+    // will be produced.
+    output.emitWatermark(new Watermark(millis));
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+  }
+
+  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+    }
+    timersForKey.add(timer);
+    activeTimers.put(key, timersForKey);
+  }
+
+  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey != null) {
+      timersForKey.remove(timer);
+      if (timersForKey.isEmpty()) {
+        activeTimers.remove(key);
+      } else {
+        activeTimers.put(key, timersForKey);
+      }
+    }
+  }
+
+  /**
+   * Returns the list of timers that are ready to fire. These are the timers
+   * that are registered to be triggered at a time before the current watermark.
+   * We keep these timers in a Set, so that they are deduplicated, as the same
+   * timer can be registered multiple times.
+   */
+  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+
+    // we keep the timers to return in a different list and launch them later
+    // because we cannot prevent a trigger from registering another trigger,
+    // which would lead to concurrent modification exception.
+    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        TimerInternals.TimerData timerData = timerIt.next();
+        if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          toFire.put(keyWithTimers.getKey(), timerData);
+          timerIt.remove();
+        }
+      }
+
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  /**
+   * Gets the state associated with the specified key.
+   *
+   * @param key the key whose state we want.
+   * @return The {@link FlinkStateInternals}
+   * associated with that key.
+   */
+  private FlinkStateInternals<K> getStateInternalsForKey(K key) {
+    FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
+    if (stateInternals == null) {
+      Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+      stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
+      perKeyStateInternals.put(key, stateInternals);
+    }
+    return stateInternals;
+  }
+
+  private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
+    @Override
+    public void setTimer(TimerData timerKey) {
+      registerActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      unregisterActiveTimer(context.element().key(), timerKey);
+    }
+  }
+
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
+
+    private final FlinkTimerInternals timerInternals;
+
+    private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
+
+    private FlinkStateInternals<K> stateInternals;
+
+    private KeyedWorkItem<K, VIN> element;
+
+    public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+                          TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
+                          FlinkTimerInternals timerInternals) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.collector = Preconditions.checkNotNull(outCollector);
+      this.timerInternals = Preconditions.checkNotNull(timerInternals);
+    }
+
+    public void setElement(KeyedWorkItem<K, VIN> element,
+                           FlinkStateInternals<K> stateForKey) {
+      this.element = element;
+      this.stateInternals = stateForKey;
+    }
+
+    public void setCurrentInputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentInputWatermark(watermark);
+    }
+
+    public void setCurrentOutputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentOutputWatermark(watermark);
+    }
+
+    @Override
+    public KeyedWorkItem<K, VIN> element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      // TODO: PipelineOptions need to be available on the workers.
+      // Ideally they are captured as part of the pipeline.
+      // For now, construct empty options so that StateContexts.createFromComponents
+      // will yield a valid StateContext, which is needed to support the StateContext.window().
+      if (options == null) {
+        options = new PipelineOptions() {
+          @Override
+          public <T extends PipelineOptions> T as(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public Class<? extends PipelineRunner<?>> getRunner() {
+            return null;
+          }
+
+          @Override
+          public void setRunner(Class<? extends PipelineRunner<?>> kls) {
+
+          }
+
+          @Override
+          public CheckEnabled getStableUniqueNames() {
+            return null;
+          }
+
+          @Override
+          public void setStableUniqueNames(CheckEnabled enabled) {
+          }
+        };
+      }
+      return options;
+    }
+
+    @Override
+    public void output(KV<K, VOUT> output) {
+      throw new UnsupportedOperationException(
+          "output() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
+      throw new UnsupportedOperationException(
+          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "window() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
+      return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
+
+        @Override
+        public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
+          return stateInternals;
+        }
+
+        @Override
+        public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          // TODO: No need to represent timestamp twice.
+          collector.setAbsoluteTimestamp(timestamp.getMillis());
+          collector.collect(WindowedValue.of(output, timestamp, windows, pane));
+
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return timerInternals;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+        }
+
+        @Override
+        public PaneInfo pane() {
+          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          throw new RuntimeException("sideInput() is not available in Streaming mode.");
+        }
+      };
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // ignore the side output, this can happen when a user does not register
+      // side outputs but then outputs using a freshly created TupleTag.
+      throw new RuntimeException("sideOutput() is not available when grouping by window.");
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  //////////////        Checkpointing implementation        ////////////////
+
+  @Override
+  public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+    AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
+
+    // checkpoint the timerInternals
+    context.timerInternals.encodeTimerInternals(context, writer,
+        inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
+
+    taskState.setOperatorState(out.closeAndGetHandle());
+    return taskState;
+  }
+
+  @Override
+  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
+    super.restoreState(taskState, recoveryTimestamp);
+
+    final ClassLoader userClassloader = getUserCodeClassloader();
+
+    Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    @SuppressWarnings("unchecked")
+    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+    DataInputView in = inputState.getState(userClassloader);
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+
+    // restore the timers
+    this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+
+    // restore the state
+    this.perKeyStateInternals = StateCheckpointUtils.decodeState(
+        reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
+
+    // restore the timerInternals.
+    this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index 24f6d40..d01cf81 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -34,31 +34,31 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
  * */
 public class FlinkGroupByKeyWrapper {
 
-	/**
-	 * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement
-	 * multiple interfaces.
-	 */
-	private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
-	}
+  /**
+   * Just an auxiliary interface to bypass the fact that java anonymous classes cannot implement
+   * multiple interfaces.
+   */
+  private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+  }
 
-	public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
-		final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-		final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
-		final boolean isKeyVoid = keyCoder instanceof VoidCoder;
+  public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
+    final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+    final boolean isKeyVoid = keyCoder instanceof VoidCoder;
 
-		return inputDataStream.keyBy(
-				new KeySelectorWithQueryableResultType<K, V>() {
+    return inputDataStream.keyBy(
+        new KeySelectorWithQueryableResultType<K, V>() {
 
-					@Override
-					public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
-						return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
-								value.getValue().getKey();
-					}
+          @Override
+          public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+            return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+                value.getValue().getKey();
+          }
 
-					@Override
-					public TypeInformation<K> getProducedType() {
-						return keyTypeInfo;
-					}
-				});
-	}
+          @Override
+          public TypeInformation<K> getProducedType() {
+            return keyTypeInfo;
+          }
+        });
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
index d65cbc3..066a55c 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -33,43 +33,43 @@ import java.util.Map;
  * */
 public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
 
-	private final TupleTag<?> mainTag;
-	private final Map<TupleTag<?>, Integer> outputLabels;
+  private final TupleTag<?> mainTag;
+  private final Map<TupleTag<?>, Integer> outputLabels;
 
-	public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
-		super(options, windowingStrategy, doFn);
-		this.mainTag = Preconditions.checkNotNull(mainTag);
-		this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
-	}
+  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+    super(options, windowingStrategy, doFn);
+    this.mainTag = Preconditions.checkNotNull(mainTag);
+    this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+  }
 
-	@Override
-	public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
-		checkTimestamp(inElement, timestamp);
-		Integer index = outputLabels.get(mainTag);
-		collector.collect(makeWindowedValue(
-				new RawUnionValue(index, output),
-				timestamp,
-				inElement.getWindows(),
-				inElement.getPane()));
-	}
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(mainTag);
+    collector.collect(makeWindowedValue(
+        new RawUnionValue(index, output),
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
 
-	@Override
-	public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
-		checkTimestamp(inElement, timestamp);
-		Integer index = outputLabels.get(tag);
-		if (index != null) {
-			collector.collect(makeWindowedValue(
-					new RawUnionValue(index, output),
-					timestamp,
-					inElement.getWindows(),
-					inElement.getPane()));
-		}
-	}
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(tag);
+    if (index != null) {
+      collector.collect(makeWindowedValue(
+          new RawUnionValue(index, output),
+          timestamp,
+          inElement.getWindows(),
+          inElement.getPane()));
+    }
+  }
 
-	@Override
-	public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
-		throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
-				"an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
-				"is not available in this class.");
-	}
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
+    throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
+        "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
+        "is not available in this class.");
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
index b0d8a76..b3a7090 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -35,64 +35,64 @@ import java.util.*;
  * */
 public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
 
-	public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
-		super(options, windowingStrategy, doFn);
-	}
+  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+    super(options, windowingStrategy, doFn);
+  }
 
-	@Override
-	public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
-		checkTimestamp(inElement, timestamp);
-		collector.collect(makeWindowedValue(
-				output,
-				timestamp,
-				inElement.getWindows(),
-				inElement.getPane()));
-	}
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+    checkTimestamp(inElement, timestamp);
+    collector.collect(makeWindowedValue(
+        output,
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
 
-	@Override
-	public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
-		// ignore the side output, this can happen when a user does not register
-		// side outputs but then outputs using a freshly created TupleTag.
-		throw new RuntimeException("sideOutput() not not available in ParDo.Bound().");
-	}
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
+    // ignore the side output, this can happen when a user does not register
+    // side outputs but then outputs using a freshly created TupleTag.
+    throw new RuntimeException("sideOutput() not not available in ParDo.Bound().");
+  }
 
-	@Override
-	public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
-		return new WindowingInternals<IN, OUT>() {
-			@Override
-			public StateInternals stateInternals() {
-				throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
-			}
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+    return new WindowingInternals<IN, OUT>() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
+      }
 
-			@Override
-			public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-				collector.collect(makeWindowedValue(output, timestamp, windows, pane));
-			}
+      @Override
+      public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+      }
 
-			@Override
-			public TimerInternals timerInternals() {
-				throw new NullPointerException("TimeInternals are not available for ParDo.Bound().");
-			}
+      @Override
+      public TimerInternals timerInternals() {
+        throw new NullPointerException("TimeInternals are not available for ParDo.Bound().");
+      }
 
-			@Override
-			public Collection<? extends BoundedWindow> windows() {
-				return inElement.getWindows();
-			}
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return inElement.getWindows();
+      }
 
-			@Override
-			public PaneInfo pane() {
-				return inElement.getPane();
-			}
+      @Override
+      public PaneInfo pane() {
+        return inElement.getPane();
+      }
 
-			@Override
-			public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-				throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
-			}
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
+      }
 
-			@Override
-			public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-				throw new RuntimeException("sideInput() not implemented.");
-			}
-		};
-	}
+      @Override
+      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+        throw new RuntimeException("sideInput() not implemented.");
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index dc8e05a..39770c9 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -34,30 +34,30 @@ import java.util.List;
  */
 public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
 
-	private final List<byte[]> elements;
-	private final Coder<OUT> coder;
-
-	public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-		this.elements = elements;
-		this.coder = coder;
-	}
-
-	@Override
-	public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
-
-		@SuppressWarnings("unchecked")
-		OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
-		for (byte[] element : elements) {
-			ByteArrayInputStream bai = new ByteArrayInputStream(element);
-			OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-
-			if (outValue == null) {
-				out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-			} else {
-				out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-			}
-		}
-
-		out.close();
-	}
+  private final List<byte[]> elements;
+  private final Coder<OUT> coder;
+
+  public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
+    this.elements = elements;
+    this.coder = coder;
+  }
+
+  @Override
+  public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
+
+    @SuppressWarnings("unchecked")
+    OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
+    for (byte[] element : elements) {
+      ByteArrayInputStream bai = new ByteArrayInputStream(element);
+      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+
+      if (outValue == null) {
+        out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      } else {
+        out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      }
+    }
+
+    out.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 699d256..4d6f4e2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -31,50 +31,50 @@ import java.util.List;
  * */
 public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
 
-	private final PipelineOptions options;
-	private final RichParallelSourceFunction<T> flinkSource;
+  private final PipelineOptions options;
+  private final RichParallelSourceFunction<T> flinkSource;
 
-	public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
-		if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
-			throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-		}
-		options = Preconditions.checkNotNull(pipelineOptions);
-		flinkSource = Preconditions.checkNotNull(source);
-		validate();
-	}
+  public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
+    if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    }
+    options = Preconditions.checkNotNull(pipelineOptions);
+    flinkSource = Preconditions.checkNotNull(source);
+    validate();
+  }
 
-	public RichParallelSourceFunction<T> getFlinkSource() {
-		return this.flinkSource;
-	}
+  public RichParallelSourceFunction<T> getFlinkSource() {
+    return this.flinkSource;
+  }
 
-	@Override
-	public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-		throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-	}
+  @Override
+  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
 
-	@Override
-	public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
-		throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-	}
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
 
-	@Nullable
-	@Override
-	public Coder<C> getCheckpointMarkCoder() {
-		throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-	}
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
 
 
-	@Override
-	public void validate() {
-		Preconditions.checkNotNull(options);
-		Preconditions.checkNotNull(flinkSource);
-		if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
-			throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-		}
-	}
+  @Override
+  public void validate() {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(flinkSource);
+    if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    }
+  }
 
-	@Override
-	public Coder<T> getDefaultOutputCoder() {
-		throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-	}
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
 }