Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:18 UTC

[23/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
new file mode 100644
index 0000000..e2ceae6
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Throwables;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
+
+import java.util.Collection;
+
+/**
+ * An abstract class that encapsulates the common code of the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound}
+ * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and
+ * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations.
+ * */
+public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
+
+  private final DoFn<IN, OUTDF> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private transient PipelineOptions options;
+
+  private DoFnProcessContext context;
+
+  public FlinkAbstractParDoWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(windowingStrategy);
+    Preconditions.checkNotNull(doFn);
+
+    this.doFn = doFn;
+    this.options = options;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+    if (this.context == null) {
+      this.context = new DoFnProcessContext(function, outCollector);
+    }
+  }
+
+  @Override
+  public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
+    this.initContext(doFn, out);
+
+    // for each window the element belongs to, create a new copy here.
+    Collection<? extends BoundedWindow> windows = value.getWindows();
+    if (windows.size() <= 1) {
+      processElement(value);
+    } else {
+      for (BoundedWindow window : windows) {
+        processElement(WindowedValue.of(
+            value.getValue(), value.getTimestamp(), window, value.getPane()));
+      }
+    }
+  }
+
+  private void processElement(WindowedValue<IN> value) throws Exception {
+    this.context.setElement(value);
+    this.doFn.startBundle(context);
+    doFn.processElement(context);
+    this.doFn.finishBundle(context);
+  }
+
+  private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+
+    private final DoFn<IN, OUTDF> fn;
+
+    protected final Collector<WindowedValue<OUTFL>> collector;
+
+    private WindowedValue<IN> element;
+
+    private DoFnProcessContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>> outCollector) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.fn = function;
+      this.collector = outCollector;
+    }
+
+    public void setElement(WindowedValue<IN> value) {
+      this.element = value;
+    }
+
+    @Override
+    public IN element() {
+      return this.element.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return this.element.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a DoFn marked as RequiresWindow.");
+      }
+
+      Collection<? extends BoundedWindow> windows = this.element.getWindows();
+      if (windows.size() != 1) {
+        throw new IllegalArgumentException("Each element is expected to belong to 1 window. " +
+            "This belongs to " + windows.size() + ".");
+      }
+      return windows.iterator().next();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return this.element.getPane();
+    }
+
+    @Override
+    public WindowingInternals<IN, OUTDF> windowingInternals() {
+      return windowingInternalsHelper(element, collector);
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public void output(OUTDF output) {
+      outputWithTimestamp(output, this.element.getTimestamp());
+    }
+
+    @Override
+    public void outputWithTimestamp(OUTDF output, Instant timestamp) {
+      outputWithTimestampHelper(element, output, timestamp, collector);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutputWithTimestampHelper(element, output, timestamp, collector, tag);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
+    if (timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
+              + "timestamp of the current input (%s) minus the allowed skew (%s). See the "
+              + "DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.",
+          timestamp, ref.getTimestamp(),
+          PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
+    }
+  }
+
+  protected <T> WindowedValue<T> makeWindowedValue(
+      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = windowingStrategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
+
+  ///////////      ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES      /////////////////
+
+  public abstract void outputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      OUTDF output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector);
+
+  public abstract <T> void sideOutputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      T output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector,
+      TupleTag<T> tag);
+
+  public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
+      WindowedValue<IN> inElement,
+      Collector<WindowedValue<OUTFL>> outCollector);
+
+}
\ No newline at end of file
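
The flatMap() above splits an element assigned to several windows into one single-window copy per window before handing it to the DoFn. A minimal standalone sketch of that explosion step, with hypothetical class and method names (not part of this commit):

    import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
    import com.google.cloud.dataflow.sdk.util.WindowedValue;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    public class WindowExplosionSketch {
      /** Splits a multi-window value into one single-window copy per window. */
      public static <T> List<WindowedValue<T>> explode(WindowedValue<T> value) {
        List<WindowedValue<T>> copies = new ArrayList<>();
        Collection<? extends BoundedWindow> windows = value.getWindows();
        if (windows.size() <= 1) {
          copies.add(value); // already single-window, no copy needed
        } else {
          for (BoundedWindow window : windows) {
            copies.add(WindowedValue.of(
                value.getValue(), value.getTimestamp(), window, value.getPane()));
          }
        }
        return copies;
      }
    }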

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
new file mode 100644
index 0000000..906a399
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -0,0 +1,629 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*;
+import com.google.cloud.dataflow.sdk.coders.*;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.operators.*;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class is the key class implementing all the windowing/triggering logic of Apache Beam.
+ * To provide full compatibility and support for all the windowing/triggering combinations offered by
+ * Beam, we opted for a strategy that uses the SDK's code for these operations. See the code in
+ * {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}.
+ * <p/>
+ * In a nutshell, when the execution arrives at this operator, we expect to have a stream <b>already
+ * grouped by key</b>. Each element that enters here registers a timer
+ * (see {@link TimerInternals#setTimer(TimerInternals.TimerData)}) in the
+ * {@link FlinkGroupAlsoByWindowWrapper#activeTimers} map.
+ * This is essentially a timestamp indicating when to trigger the computation over the window this
+ * element belongs to.
+ * <p/>
+ * When a watermark arrives, all the registered timers are checked to see which ones are ready to
+ * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from
+ * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers}
+ * map, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}
+ * for further processing.
+ */
+public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
+    extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
+    implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, WindowedValue<KV<K, VOUT>>> {
+
+  private static final long serialVersionUID = 1L;
+
+  private transient PipelineOptions options;
+
+  private transient CoderRegistry coderRegistry;
+
+  private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+
+  private ProcessContext context;
+
+  private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
+
+  private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
+
+  private final KvCoder<K, VIN> inputKvCoder;
+
+  /**
+   * State is kept <b>per-key</b>. This data structure keeps the mapping between an active key, i.e. a
+   * key whose elements are currently waiting to be processed, and its associated state.
+   */
+  private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>();
+
+  /**
+   * Timers waiting to be processed.
+   */
+  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
+
+  /**
+   * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)}
+   * is that this method assumes that a combiner function is provided
+   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   * A combiner helps increase the speed and, in most cases, reduces the per-window state.
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+   * @param combiner           the combiner to be used.
+   * @param outputKvCoder      the coder for the output key-value pairs.
+   */
+  public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> create(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
+      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
+      KvCoder<K, VOUT> outputKvCoder) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper<>(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, combiner);
+
+    Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+        outputKvCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = groupedStreamByKey
+        .transform("GroupByWindowWithCombiner",
+            new CoderTypeInformation<>(outputKvCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  /**
+   * Creates a DataStream where elements are grouped in windows based on the specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)}
+   * is that this method assumes no combiner function
+   * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key.
+   */
+  public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+    FlinkGroupAlsoByWindowWrapper windower = new FlinkGroupAlsoByWindowWrapper(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), inputKvCoder, null);
+
+    Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
+    KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, valueIterCoder);
+
+    Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
+        outputElemCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = groupedStreamByKey
+        .transform("GroupByWindow",
+            new CoderTypeInformation<>(windowedOutputElemCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
+  createForTesting(PipelineOptions options,
+                   CoderRegistry registry,
+                   WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                   KvCoder<K, VIN> inputCoder,
+                   Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    return new FlinkGroupAlsoByWindowWrapper(options, registry, windowingStrategy, inputCoder, combiner);
+  }
+
+  private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
+                                        CoderRegistry registry,
+                                        WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy,
+                                        KvCoder<K, VIN> inputCoder,
+                                        Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    this.options = Preconditions.checkNotNull(options);
+    this.coderRegistry = Preconditions.checkNotNull(registry);
+    this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
+    this.windowingStrategy = Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
+    this.combineFn = combiner;
+    this.operator = createGroupAlsoByWindowOperator();
+    this.chainingStrategy = ChainingStrategy.ALWAYS;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+  }
+
+  /**
+   * Creates the appropriate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
+   * <b>if not already created</b>.
+   * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then
+   * a function with that combiner is created, so that elements are combined as they arrive. This is
+   * done for speed and (in most cases) to reduce the per-window state.
+   */
+  private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> createGroupAlsoByWindowOperator() {
+    if (this.operator == null) {
+      if (this.combineFn == null) {
+        // Thus VOUT == Iterable<VIN>
+        Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+        this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+      } else {
+        Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+
+        AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn
+            .withInputCoder(combineFn, coderRegistry, inputKvCoder);
+
+        this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, appliedCombineFn));
+      }
+    }
+    return this.operator;
+  }
+
+  private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
+    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+
+    // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
+    operator.startBundle(context);
+    operator.processElement(context);
+    operator.finishBundle(context);
+  }
+
+  @Override
+  public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
+    ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
+    elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
+        element.getValue().getWindows(), element.getValue().getPane()));
+    processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
+
+    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+    if (!timers.isEmpty()) {
+      for (K key : timers.keySet()) {
+        processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, timers.get(key)));
+      }
+    }
+
+    /**
+     * This is to take into account the different semantics of the Watermark in Flink and
+     * in Dataflow. To understand the reasoning behind the Dataflow semantics and its
+     * watermark holding logic, see the documentation of
+     * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
+     * */
+    long millis = Long.MAX_VALUE;
+    for (FlinkStateInternals state : perKeyStateInternals.values()) {
+      Instant watermarkHold = state.getWatermarkHold();
+      if (watermarkHold != null && watermarkHold.getMillis() < millis) {
+        millis = watermarkHold.getMillis();
+      }
+    }
+
+    if (mark.getTimestamp() < millis) {
+      millis = mark.getTimestamp();
+    }
+
+    context.setCurrentOutputWatermark(new Instant(millis));
+
+    // Don't forget to re-emit the watermark for further operators down the line.
+    // This is critical for jobs with multiple aggregation steps.
+    // Imagine a job with a groupByKey() on key K1, followed by a map() that changes
+    // the key K1 to K2, and another groupByKey() on K2. In this case, if the watermark
+    // is not re-emitted, the second aggregation would never be triggered, and no result
+    // will be produced.
+    output.emitWatermark(new Watermark(millis));
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+  }
+
+  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+    }
+    timersForKey.add(timer);
+    activeTimers.put(key, timersForKey);
+  }
+
+  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey != null) {
+      timersForKey.remove(timer);
+      if (timersForKey.isEmpty()) {
+        activeTimers.remove(key);
+      } else {
+        activeTimers.put(key, timersForKey);
+      }
+    }
+  }
+
+  /**
+   * Returns the timers that are ready to fire, i.e. those registered to be
+   * triggered at a time before the current watermark.
+   * We keep these timers in a Set so that they are deduplicated, as the same
+   * timer can be registered multiple times.
+   */
+  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+
+    // we keep the timers to return in a different list and launch them later
+    // because we cannot prevent a trigger from registering another trigger,
+    // which would lead to a ConcurrentModificationException.
+    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        TimerInternals.TimerData timerData = timerIt.next();
+        if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          toFire.put(keyWithTimers.getKey(), timerData);
+          timerIt.remove();
+        }
+      }
+
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  /**
+   * Gets the state associated with the specified key.
+   *
+   * @param key the key whose state we want.
+   * @return The {@link FlinkStateInternals}
+   * associated with that key.
+   */
+  private FlinkStateInternals<K> getStateInternalsForKey(K key) {
+    FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
+    if (stateInternals == null) {
+      Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+      OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn();
+      stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
+      perKeyStateInternals.put(key, stateInternals);
+    }
+    return stateInternals;
+  }
+
+  private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, VIN> {
+    @Override
+    public void setTimer(TimerData timerKey) {
+      registerActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      unregisterActiveTimer(context.element().key(), timerKey);
+    }
+  }
+
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
+
+    private final FlinkTimerInternals timerInternals;
+
+    private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
+
+    private FlinkStateInternals<K> stateInternals;
+
+    private KeyedWorkItem<K, VIN> element;
+
+    public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+                          TimestampedCollector<WindowedValue<KV<K, VOUT>>> outCollector,
+                          FlinkTimerInternals timerInternals) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.collector = Preconditions.checkNotNull(outCollector);
+      this.timerInternals = Preconditions.checkNotNull(timerInternals);
+    }
+
+    public void setElement(KeyedWorkItem<K, VIN> element,
+                           FlinkStateInternals<K> stateForKey) {
+      this.element = element;
+      this.stateInternals = stateForKey;
+    }
+
+    public void setCurrentInputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentInputWatermark(watermark);
+    }
+
+    public void setCurrentOutputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentOutputWatermark(watermark);
+    }
+
+    @Override
+    public KeyedWorkItem<K, VIN> element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      // TODO: PipelineOptions need to be available on the workers.
+      // Ideally they are captured as part of the pipeline.
+      // For now, construct empty options so that StateContexts.createFromComponents
+      // will yield a valid StateContext, which is needed to support the StateContext.window().
+      if (options == null) {
+        options = new PipelineOptions() {
+          @Override
+          public <T extends PipelineOptions> T as(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public Class<? extends PipelineRunner<?>> getRunner() {
+            return null;
+          }
+
+          @Override
+          public void setRunner(Class<? extends PipelineRunner<?>> kls) {
+
+          }
+
+          @Override
+          public CheckEnabled getStableUniqueNames() {
+            return null;
+          }
+
+          @Override
+          public void setStableUniqueNames(CheckEnabled enabled) {
+          }
+        };
+      }
+      return options;
+    }
+
+    @Override
+    public void output(KV<K, VOUT> output) {
+      throw new UnsupportedOperationException(
+          "output() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
+      throw new UnsupportedOperationException(
+          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "window() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> windowingInternals() {
+      return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
+
+        @Override
+        public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
+          return stateInternals;
+        }
+
+        @Override
+        public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          // TODO: No need to represent timestamp twice.
+          collector.setAbsoluteTimestamp(timestamp.getMillis());
+          collector.collect(WindowedValue.of(output, timestamp, windows, pane));
+
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return timerInternals;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
+        }
+
+        @Override
+        public PaneInfo pane() {
+          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+          throw new RuntimeException("writePCollectionViewData() not available in Streaming mode.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+          throw new RuntimeException("sideInput() is not available in Streaming mode.");
+        }
+      };
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // Side outputs are not supported here. A user can still reach this path by
+      // outputting to a freshly created TupleTag that was never registered.
+      throw new RuntimeException("sideOutput() is not available when grouping by window.");
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  //////////////        Checkpointing implementation        ////////////////
+
+  @Override
+  public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+    AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
+
+    // checkpoint the timerInternals
+    context.timerInternals.encodeTimerInternals(context, writer,
+        inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
+
+    taskState.setOperatorState(out.closeAndGetHandle());
+    return taskState;
+  }
+
+  @Override
+  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
+    super.restoreState(taskState, recoveryTimestamp);
+
+    final ClassLoader userClassloader = getUserCodeClassloader();
+
+    Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    @SuppressWarnings("unchecked")
+    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
+    DataInputView in = inputState.getState(userClassloader);
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+
+    // restore the timers
+    this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+
+    // restore the state
+    this.perKeyStateInternals = StateCheckpointUtils.decodeState(
+        reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, userClassloader);
+
+    // restore the timerInternals.
+    this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
+  }
+}
\ No newline at end of file
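
As a usage sketch, the hypothetical helper below shows how this operator is typically wired during translation: a windowed key-value stream is first keyed via FlinkGroupByKeyWrapper and then handed to createForIterable(). All names are illustrative, and the cast mirrors the one the factory performs internally:

    import com.google.cloud.dataflow.sdk.coders.KvCoder;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.util.WindowedValue;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class GroupByWindowWiringSketch {
      /** Groups an already-windowed KV stream by key and window, without a combiner. */
      public static <K, V> DataStream<WindowedValue<KV<K, Iterable<V>>>> groupByKeyAndWindow(
          PipelineOptions options,
          PCollection<KV<K, V>> input,
          DataStream<WindowedValue<KV<K, V>>> windowedKvStream) {
        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
        KeyedStream<WindowedValue<KV<K, V>>, K> keyedStream =
            FlinkGroupByKeyWrapper.groupStreamByKey(windowedKvStream, kvCoder);
        return FlinkGroupAlsoByWindowWrapper.createForIterable(options, input, keyedStream);
      }
    }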

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
new file mode 100644
index 0000000..61953a6
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+/**
+ * This class groups the elements by key. It assumes that the incoming stream
+ * is already composed of <code>[Key,Value]</code> pairs.
+ * */
+public class FlinkGroupByKeyWrapper {
+
+  /**
+   * Just an auxiliary interface to bypass the fact that Java anonymous classes cannot implement
+   * multiple interfaces.
+   */
+  private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+  }
+
+  public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
+    final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+    final boolean isKeyVoid = keyCoder instanceof VoidCoder;
+
+    return inputDataStream.keyBy(
+        new KeySelectorWithQueryableResultType<K, V>() {
+
+          @Override
+          public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+            return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+                value.getValue().getKey();
+          }
+
+          @Override
+          public TypeInformation<K> getProducedType() {
+            return keyTypeInfo;
+          }
+        });
+  }
+}
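
The Void-key branch in the selector above matters because a Beam KV<Void, V> carries null keys, which Flink cannot use for partitioning. A hedged sketch of that key extraction in isolation (class name hypothetical, not part of this commit):

    import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
    import com.google.cloud.dataflow.sdk.util.WindowedValue;
    import com.google.cloud.dataflow.sdk.values.KV;

    public class VoidKeySketch {
      /** Mirrors the key extraction above: substitutes a singleton for null Void keys. */
      public static <K, V> K extractKey(WindowedValue<KV<K, V>> value, boolean isKeyVoid) {
        @SuppressWarnings("unchecked")
        K voidStandIn = (K) VoidCoderTypeSerializer.VoidValue.INSTANCE; // non-null stand-in
        return isKeyVoid ? voidStandIn : value.getValue().getKey();
      }
    }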

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
new file mode 100644
index 0000000..cdf23f6
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Map;
+
+/**
+ * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation.
+ * */
+public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
+
+  private final TupleTag<?> mainTag;
+  private final Map<TupleTag<?>, Integer> outputLabels;
+
+  public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+    super(options, windowingStrategy, doFn);
+    this.mainTag = Preconditions.checkNotNull(mainTag);
+    this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+  }
+
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(mainTag);
+    collector.collect(makeWindowedValue(
+        new RawUnionValue(index, output),
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(tag);
+    if (index != null) {
+      collector.collect(makeWindowedValue(
+          new RawUnionValue(index, output),
+          timestamp,
+          inElement.getWindows(),
+          inElement.getPane()));
+    }
+  }
+
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
+    throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
+        "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
+        "is not available in this class.");
+  }
+}
\ No newline at end of file
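
The wrapper depends on a tags-to-labels map supplied by the translation layer, which is not part of this diff. The hypothetical helper below shows one plausible way such a map could be assembled, purely for illustration: the main output takes the first union index and each side output gets its own. Downstream, the integer stored in each RawUnionValue routes elements back to their TupleTag.

    import com.google.cloud.dataflow.sdk.values.TupleTag;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UnionLabelsSketch {
      /** Assigns consecutive union indices: main output first, then each side output. */
      public static Map<TupleTag<?>, Integer> buildUnionLabels(
          TupleTag<?> mainTag, List<TupleTag<?>> sideTags) {
        Map<TupleTag<?>, Integer> labels = new HashMap<>();
        int nextIndex = 0;
        labels.put(mainTag, nextIndex++);
        for (TupleTag<?> sideTag : sideTags) {
          labels.put(sideTag, nextIndex++);
        }
        return labels;
      }
    }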

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
new file mode 100644
index 0000000..3357cd5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation.
+ * */
+public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
+
+  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+    super(options, windowingStrategy, doFn);
+  }
+
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+    checkTimestamp(inElement, timestamp);
+    collector.collect(makeWindowedValue(
+        output,
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
+    // Side outputs are not supported in ParDo.Bound(). A user can still reach this
+    // path by outputting to a freshly created TupleTag that was never registered.
+    throw new RuntimeException("sideOutput() is not available in ParDo.Bound().");
+  }
+
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+    return new WindowingInternals<IN, OUT>() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new NullPointerException("StateInternals are not available for ParDo.Bound().");
+      }
+
+      @Override
+      public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new NullPointerException("TimeInternals are not available for ParDo.Bound().");
+      }
+
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return inElement.getWindows();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return inElement.getPane();
+      }
+
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
+      }
+
+      @Override
+      public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
+        throw new RuntimeException("sideInput() not implemented.");
+      }
+    };
+  }
+}
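
A hedged sketch of how this wrapper might be applied to a Flink stream during translation (helper and parameter names are illustrative, not taken from this commit):

    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.util.WindowedValue;
    import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class ParDoWiringSketch {
      /** Applies a Beam DoFn to a Flink stream via the wrapper defined above. */
      public static <IN, OUT> DataStream<WindowedValue<OUT>> applyParDo(
          DataStream<WindowedValue<IN>> input,
          PipelineOptions options,
          WindowingStrategy<?, ?> windowingStrategy,
          DoFn<IN, OUT> doFn,
          TypeInformation<WindowedValue<OUT>> outputType) {
        return input
            .flatMap(new FlinkParDoBoundWrapper<>(options, windowingStrategy, doFn))
            .returns(outputType);
      }
    }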

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
new file mode 100644
index 0000000..2599e88
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+/**
+ * This flat map function decodes the serialized collection elements and turns them
+ * into WindowedValues (as required by the Flink runner).
+ */
+public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> {
+
+  private final List<byte[]> elements;
+  private final Coder<OUT> coder;
+
+  public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
+    this.elements = elements;
+    this.coder = coder;
+  }
+
+  @Override
+  public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
+
+    @SuppressWarnings("unchecked")
+    OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
+    for (byte[] element : elements) {
+      ByteArrayInputStream bai = new ByteArrayInputStream(element);
+      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+
+      if (outValue == null) {
+        out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      } else {
+        out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      }
+    }
+
+    out.close();
+  }
+}
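
This function assumes each element was serialized upstream with the same coder and the OUTER context. A sketch of that encoding side, under those assumptions (class and method names hypothetical):

    import com.google.cloud.dataflow.sdk.coders.Coder;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class CreateEncodingSketch {
      /** Serializes Create elements the way flatMap() above expects to decode them. */
      public static <T> List<byte[]> encodeElements(List<T> elements, Coder<T> coder)
          throws Exception {
        List<byte[]> encoded = new ArrayList<>();
        for (T element : elements) {
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          coder.encode(element, baos, Coder.Context.OUTER); // mirrors decode(..., OUTER)
          encoded.add(baos.toByteArray());
        }
        return encoded;
      }
    }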

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
new file mode 100644
index 0000000..ddbc993
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * A wrapper that translates Flink sources implementing the {@link RichParallelSourceFunction} interface
+ * into unbounded Beam sources (see {@link UnboundedSource}).
+ * */
+public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+
+  private final PipelineOptions options;
+  private final RichParallelSourceFunction<T> flinkSource;
+
+  public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
+    if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    }
+    options = Preconditions.checkNotNull(pipelineOptions);
+    flinkSource = Preconditions.checkNotNull(source);
+    validate();
+  }
+
+  public RichParallelSourceFunction<T> getFlinkSource() {
+    return this.flinkSource;
+  }
+
+  // All UnboundedSource methods below throw: this wrapper is never read through
+  // the Beam source API, since the Flink translator unwraps it via getFlinkSource().
+  @Override
+  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
+
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
+
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
+
+  @Override
+  public void validate() {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(flinkSource);
+    if (!options.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    }
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+  }
+}
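
A hypothetical wiring sketch for the wrapper above; the anonymous source and
the pipeline `p` are placeholders. The options must select the
FlinkPipelineRunner, or validate() rejects the wrapper.

    // A trivial stand-in Flink source that emits one record and finishes.
    RichParallelSourceFunction<String> flinkSource = new RichParallelSourceFunction<String>() {
      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("hello");
      }
      @Override
      public void cancel() {}
    };

    UnboundedFlinkSource<String, UnboundedSource.CheckpointMark> wrapped =
        new UnboundedFlinkSource<>(options, flinkSource);
    PCollection<String> input = p.apply(Read.from(wrapped));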

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
new file mode 100644
index 0000000..a24964a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging.
+ * */
+public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+
+  private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+
+  private static final long serialVersionUID = 1L;
+
+  private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+  private static final int CONNECTION_TIMEOUT_TIME = 0; // 0 means no connect timeout
+
+  private final String hostname;
+  private final int port;
+  private final char delimiter;
+  private final long maxNumRetries;
+  private final long delayBetweenRetries;
+
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+    this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+  }
+
+  public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+    this.hostname = hostname;
+    this.port = port;
+    this.delimiter = delimiter;
+    this.maxNumRetries = maxNumRetries;
+    this.delayBetweenRetries = delayBetweenRetries;
+  }
+
+  public String getHostname() {
+    return this.hostname;
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public char getDelimiter() {
+    return this.delimiter;
+  }
+
+  public long getMaxNumRetries() {
+    return this.maxNumRetries;
+  }
+
+  public long getDelayBetweenRetries() {
+    return this.delayBetweenRetries;
+  }
+
+  @Override
+  public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+    return Collections.<UnboundedSource<String, C>>singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+    return new UnboundedSocketReader(this);
+  }
+
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    // Flink and Dataflow have different checkpointing mechanisms.
+    // In our case we do not need a coder.
+    return null;
+  }
+
+  @Override
+  public void validate() {
+    checkArgument(port > 0 && port < 65536, "port must be in [1, 65535], but was %s", port);
+    checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+    checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return DEFAULT_SOCKET_CODER;
+  }
+
+  public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+    private static final long serialVersionUID = 7526472295622776147L;
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+    private final UnboundedSocketSource<?> source;
+
+    private Socket socket;
+    private BufferedReader reader;
+
+    private boolean isRunning;
+
+    private String currentRecord;
+
+    public UnboundedSocketReader(UnboundedSocketSource<?> source) {
+      this.source = source;
+    }
+
+    private void openConnection() throws IOException {
+      this.socket = new Socket();
+      this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+      this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+      this.isRunning = true;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      int attempt = 0;
+      while (!isRunning) {
+        try {
+          openConnection();
+          LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort());
+
+          return advance();
+        } catch (IOException e) {
+          LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+
+          if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
+            try {
+              Thread.sleep(this.source.getDelayBetweenRetries());
+            } catch (InterruptedException e1) {
+              // Restore the interrupt flag and stop retrying instead of swallowing the exception.
+              Thread.currentThread().interrupt();
+              break;
+            }
+          } else {
+            this.isRunning = false;
+            break;
+          }
+        }
+      }
+      LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort());
+      return false;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      final StringBuilder buffer = new StringBuilder();
+      int data;
+      while (isRunning && (data = reader.read()) != -1) {
+        // check if the string is complete
+        if (data != this.source.getDelimiter()) {
+          buffer.append((char) data);
+        } else {
+          if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+            buffer.setLength(buffer.length() - 1);
+          }
+          this.currentRecord = buffer.toString();
+          buffer.setLength(0);
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+      // This source provides no stable record ids, so deduplication is not supported.
+      return new byte[0];
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return this.currentRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      // The socket stream carries no event timestamps, so we fall back to processing time.
+      return Instant.now();
+    }
+
+    @Override
+    public void close() throws IOException {
+      this.reader.close();
+      this.socket.close();
+      this.isRunning = false;
+      LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + ".");
+    }
+
+    @Override
+    public Instant getWatermark() {
+      // With processing-time timestamps the watermark simply tracks the wall clock.
+      return Instant.now();
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return null;
+    }
+
+    @Override
+    public UnboundedSource<String, ?> getCurrentSource() {
+      return this.source;
+    }
+  }
+}
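
A minimal (hypothetical) use of the source above: read newline-delimited text
from a local socket, retrying the connection up to three times. Host, port,
and the pipeline `p` are placeholders.

    PCollection<String> lines = p.apply(
        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)));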

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
new file mode 100644
index 0000000..7c1ccdf
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.joda.time.Instant;
+
+/**
+ * A wrapper for Beam's unbounded sources. This class wraps the source of a
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} transform and executes
+ * its reader as a Flink source function.
+ *
+ * <p>For now, only non-parallel, non-checkpointed sources are supported.
+ * */
+public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
+
+  private final String name;
+  private final UnboundedSource.UnboundedReader<T> reader;
+
+  private StreamingRuntimeContext runtime = null;
+  private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+
+  private volatile boolean isRunning = false;
+
+  public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+    this.name = transform.getName();
+    this.reader = transform.getSource().createReader(options, null);
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+  }
+
+  @Override
+  public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+    if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+      throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+          "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+    }
+
+    context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
+    runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+    this.isRunning = true;
+    boolean inputAvailable = reader.start();
+
+    setNextWatermarkTimer(this.runtime);
+
+    while (isRunning) {
+
+      while (!inputAvailable && isRunning) {
+        // wait a bit until we retry to pull more records
+        Thread.sleep(50);
+        inputAvailable = reader.advance();
+      }
+
+      if (inputAvailable) {
+
+        // get it and its timestamp from the source
+        T item = reader.getCurrent();
+        Instant timestamp = reader.getCurrentTimestamp();
+
+        // write it to the output collector
+        synchronized (ctx.getCheckpointLock()) {
+          WindowedValue<T> windowedValue = makeWindowedValue(item, timestamp);
+          // use the (possibly defaulted) timestamp of the windowed value, so a
+          // null reader timestamp cannot cause an NPE here
+          context.collectWithTimestamp(windowedValue, windowedValue.getTimestamp().getMillis());
+        }
+
+        inputAvailable = reader.advance();
+      }
+
+    }
+  }
+
+  @Override
+  public void cancel() {
+    isRunning = false;
+  }
+
+  @Override
+  public void trigger(long timestamp) throws Exception {
+    if (this.isRunning) {
+      synchronized (context.getCheckpointLock()) {
+        long watermarkMillis = this.reader.getWatermark().getMillis();
+        context.emitWatermark(new Watermark(watermarkMillis));
+      }
+      setNextWatermarkTimer(this.runtime);
+    }
+  }
+
+  private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+    if (this.isRunning) {
+      long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
+      long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
+      runtime.registerTimer(timeToNextWatermark, this);
+    }
+  }
+
+  private long getTimeToNextWatermark(long watermarkInterval) {
+    return System.currentTimeMillis() + watermarkInterval;
+  }
+}
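
The emission period of these watermarks is driven by Flink's auto-watermark
interval: trigger() emits a watermark and then re-registers the timer, so the
interval configured on the execution environment controls the cadence. A
configuration sketch (the 200 ms value is an arbitrary example):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(200);  // emit a watermark roughly every 200 ms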

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
new file mode 100644
index 0000000..7accf09
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An implementation of Beam's {@link TimerInternals} that also provides serialization
+ * functionality, used when snapshots of the current state are taken for fault tolerance.
+ * */
+public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
+  private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  public void setCurrentInputWatermark(Instant watermark) {
+    checkIfValidInputWatermark(watermark);
+    this.currentInputWatermark = watermark;
+  }
+
+  public void setCurrentOutputWatermark(Instant watermark) {
+    checkIfValidOutputWatermark(watermark);
+    this.currentOutputWatermark = watermark;
+  }
+
+  private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
+    if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      throw new RuntimeException("Explicitly setting the input watermark is only allowed on " +
+          "initialization after recovery from a node failure. Apparently this is not " +
+          "the case here as the watermark is already set.");
+    }
+    this.currentInputWatermark = watermark;
+  }
+
+  private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
+    if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      throw new RuntimeException("Explicitly setting the output watermark is only allowed on " +
+        "initialization after recovery from a node failure. Apparently this is not " +
+        "the case here as the watermark is already set.");
+    }
+    this.currentOutputWatermark = watermark;
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return Instant.now();
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return currentInputWatermark;
+  }
+
+  @Nullable
+  @Override
+  public Instant currentSynchronizedProcessingTime() {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public Instant currentOutputWatermarkTime() {
+    return currentOutputWatermark;
+  }
+
+  private void checkIfValidInputWatermark(Instant newWatermark) {
+    if (currentInputWatermark.isAfter(newWatermark)) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot set current input watermark to %s. Newer watermarks " +
+              "must be no earlier than the current one (%s).",
+          newWatermark, currentInputWatermark));
+    }
+  }
+
+  private void checkIfValidOutputWatermark(Instant newWatermark) {
+    if (currentOutputWatermark.isAfter(newWatermark)) {
+      throw new IllegalArgumentException(String.format(
+        "Cannot set current output watermark to %s. Newer watermarks " +
+          "must be no earlier than the current one (%s).",
+        newWatermark, currentOutputWatermark));
+    }
+  }
+
+  public void encodeTimerInternals(DoFn.ProcessContext context,
+                                   StateCheckpointWriter writer,
+                                   KvCoder<K, VIN> kvCoder,
+                                   Coder<? extends BoundedWindow> windowCoder) throws IOException {
+    if (context == null) {
+      throw new RuntimeException("The Context has not been initialized.");
+    }
+
+    writer.setTimestamp(currentInputWatermark);
+    writer.setTimestamp(currentOutputWatermark);
+  }
+
+  public void restoreTimerInternals(StateCheckpointReader reader,
+                                    KvCoder<K, VIN> kvCoder,
+                                    Coder<? extends BoundedWindow> windowCoder) throws IOException {
+    setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
+    setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
+  }
+}
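
The watermark setters above enforce monotonicity. As a small illustration,
assuming `timers` is an instance of a concrete subclass:

    timers.setCurrentInputWatermark(new Instant(1000L));
    timers.setCurrentInputWatermark(new Instant(2000L));  // fine: the watermark advances
    timers.setCurrentInputWatermark(new Instant(1500L));  // throws IllegalArgumentException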