You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:14 UTC

[05/18] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
new file mode 100644
index 0000000..e24bf31
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueTypeInformation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink {@link TypeInformation} for Beam values that a {@link Coder} has already encoded
+ * to a {@code byte[]}. It is an atomic key type that reports an arity of zero.
+ */
+public class EncodedValueTypeInformation
+    extends TypeInformation<byte[]>
+    implements AtomicType<byte[]> {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 0;
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 0;
+  }
+
+  @Override
+  public Class<byte[]> getTypeClass() {
+    return byte[].class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  public TypeSerializer<byte[]> createSerializer(ExecutionConfig executionConfig) {
+    return new EncodedValueSerializer();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof EncodedValueTypeInformation;
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getClass().hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof EncodedValueTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "EncodedValueTypeInformation";
+  }
+
+  @Override
+  public TypeComparator<byte[]> createComparator(
+      boolean sortOrderAscending,
+      ExecutionConfig executionConfig) {
+    return new EncodedValueComparator(sortOrderAscending);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
new file mode 100644
index 0000000..36b5ba3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Version of {@link java.io.ByteArrayOutputStream} that allows retrieving the internal
+ * byte[] buffer without incurring an array copy.
+ */
+public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
+
+  /**
+   * Get the underlying byte array; only the first {@code size()} bytes are valid.
+   */
+  public byte[] getBuffer() {
+    return buf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
new file mode 100644
index 0000000..9df6836
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvKeySelector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that extracts the key from a windowed {@link KV} and returns
+ * it encoded by the key {@link Coder} as a {@code byte} array.
+ */
+public class KvKeySelector<InputT, K>
+    implements KeySelector<WindowedValue<KV<K, InputT>>, byte[]>, ResultTypeQueryable<byte[]> {
+
+  private final Coder<K> keyCoder;
+
+  public KvKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  @Override
+  public byte[] getKey(WindowedValue<KV<K, InputT>> value) throws Exception {
+    return CoderUtils.encodeToByteArray(keyCoder, value.getValue().getKey());
+  }
+
+  @Override
+  public TypeInformation<byte[]> getProducedType() {
+    return new EncodedValueTypeInformation();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
new file mode 100644
index 0000000..6fb3182
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.types;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 0000000..2256bb1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.IOChannelUtils;
+
+/**
+ * Holds the PipelineOptions as Jackson-serialized bytes so they can be shipped to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private final byte[] serializedOptions;
+
+  /** Deserialized options, created lazily by getPipelineOptions(); transient. */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+
+        IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
new file mode 100644
index 0000000..5dedd53
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.utils;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
new file mode 100644
index 0000000..82a2c4e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.flink.core.memory.DataInputView;
+
+/**
+ * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
+ * {@link org.apache.flink.core.memory.DataInputView} while
+ * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
+ * {@link java.io.InputStream}.
+ */
+public class DataInputViewWrapper extends InputStream {
+
+  private DataInputView inputView;
+
+  public DataInputViewWrapper(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  public void setInputView(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  @Override
+  public int read() throws IOException {
+    try {
+      return inputView.readUnsignedByte();
+    } catch (EOFException e) {
+      // Translate between the two EOF conventions: DataInput signals end of
+      // input with an exception, whereas InputStream signals it by returning -1.
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return inputView.read(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
new file mode 100644
index 0000000..f2d9db2
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
+ * Flink writes data using a {@link org.apache.flink.core.memory.DataOutputView} while
+ * Beam {@link org.apache.beam.sdk.coders.Coder}s expect an
+ * {@link java.io.OutputStream}.
+ */
+public class DataOutputViewWrapper extends OutputStream {
+
+  private DataOutputView outputView;
+
+  public DataOutputViewWrapper(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  public void setOutputView(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputView.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputView.write(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
new file mode 100644
index 0000000..70d97e3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+/**
+ * Adapts a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} to a Flink
+ * {@link org.apache.flink.api.common.accumulators.Accumulator} so that the function
+ * can be used as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
+ * operation.
+ */
+public class SerializableFnAggregatorWrapper<InputT, OutputT>
+    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
+
+  private OutputT aa;
+  private Combine.CombineFn<InputT, ?, OutputT> combiner;
+
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
+    this.combiner = combiner;
+    resetLocal();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void add(InputT value) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
+  }
+
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) aa;
+  }
+
+  @Override
+  public void resetLocal() {
+    this.aa = combiner.apply(ImmutableList.<InputT>of());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<InputT, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
+  }
+
+  @Override
+  public void addValue(InputT value) {
+    add(value);
+  }
+
+  @Override
+  public String getName() {
+    return "Aggregator :" + combiner.toString();
+  }
+
+  @Override
+  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
+    return combiner;
+  }
+
+  @Override
+  public Accumulator<InputT, Serializable> clone() {
+    try {
+      super.clone();
+    } catch (CloneNotSupportedException e) {
+      // Accumulator.clone() does not declare CloneNotSupportedException, so rethrow unchecked.
+      throw new RuntimeException(e);
+    }
+
+    // copy the current value by applying the combiner to it alone
+    OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
+    SerializableFnAggregatorWrapper<InputT, OutputT> result = new
+        SerializableFnAggregatorWrapper<>(combiner);
+
+    result.aa = resultCopy;
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
new file mode 100644
index 0000000..a87472b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Wrapper for executing a Beam {@link BoundedSource} as a Flink {@link InputFormat}.
+ */
+public class SourceInputFormat<T>
+    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
+
+  private final BoundedSource<T> initialSource;
+
+  private transient PipelineOptions options;
+  private final SerializedPipelineOptions serializedOptions;
+
+  private transient BoundedSource.BoundedReader<T> reader;
+  private boolean inputAvailable = false;
+
+  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+    this.initialSource = initialSource;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+  }
+
+  @Override
+  public void configure(Configuration configuration) {
+    options = serializedOptions.getPipelineOptions();
+  }
+
+  @Override
+  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
+    inputAvailable = reader.start();
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
+    try {
+      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+
+      return new BaseStatistics() {
+        @Override
+        public long getTotalInputSize() {
+          return estimatedSize;
+        }
+
+        @Override
+        public long getNumberOfRecords() {
+          return BaseStatistics.NUM_RECORDS_UNKNOWN;
+        }
+
+        @Override
+        public float getAverageRecordWidth() {
+          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
+        }
+      };
+    } catch (Exception e) {
+      LOG.warn("Could not read Source statistics.", e);
+    }
+
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
+    try {
+      long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+      List<? extends Source<T>> shards =
+          initialSource.split(desiredSizeBytes, options);
+      int numShards = shards.size();
+      SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
+      for (int i = 0; i < numShards; i++) {
+        sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
+      }
+      return sourceInputSplits;
+    } catch (Exception e) {
+      throw new IOException("Could not create input splits from Source.", e);
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
+    return new DefaultInputSplitAssigner(sourceInputSplits);
+  }
+
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return !inputAvailable;
+  }
+
+  @Override
+  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
+    if (inputAvailable) {
+      final T current = reader.getCurrent();
+      final Instant timestamp = reader.getCurrentTimestamp();
+      // advance now so that reachedEnd() reflects whether another record exists
+      inputAvailable = reader.advance();
+      return WindowedValue.of(
+          current,
+          timestamp,
+          GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+    }
+
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO null check can be removed once FLINK-3796 is fixed
+    if (reader != null) {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
new file mode 100644
index 0000000..e4a7386
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.beam.sdk.io.Source;
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * {@link org.apache.flink.core.io.InputSplit} for
+ * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass
+ * the sharded Source around in the input split because Sources simply split up into several
+ * Sources for sharding. This is different from how Flink creates a separate InputSplit from
+ * an InputFormat.
+ */
+public class SourceInputSplit<T> implements InputSplit {
+
+  private Source<T> source;
+  private int splitNumber;
+
+  public SourceInputSplit() {
+  }
+
+  public SourceInputSplit(Source<T> source, int splitNumber) {
+    this.source = source;
+    this.splitNumber = splitNumber;
+  }
+
+  @Override
+  public int getSplitNumber() {
+    return splitNumber;
+  }
+
+  public Source<T> getSource() {
+    return source;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
new file mode 100644
index 0000000..72f7deb
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
new file mode 100644
index 0000000..8a09286
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing {@link DoFn DoFns}.
+ *
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <FnOutputT> the output type of the {@link DoFn}
+ * @param <OutputT> the output type of the operator, this can be different from the fn output
+ *                 type when we have additional tagged outputs
+ */
+public class DoFnOperator<InputT, FnOutputT, OutputT>
+    extends AbstractStreamOperator<OutputT>
+    implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
+      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
+    KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
+
+  // The user function. Not final: open() re-assigns it from getDoFn().
+  protected DoFn<InputT, FnOutputT> doFn;
+
+  // Wrapper around the PipelineOptions handed to the constructor.
+  protected final SerializedPipelineOptions serializedOptions;
+
+  // Output tags handed to the DoFnRunner that is created in open().
+  protected final TupleTag<FnOutputT> mainOutputTag;
+  protected final List<TupleTag<?>> additionalOutputTags;
+
+  // Side inputs of the DoFn, and the mapping from the union tag of a record arriving on the
+  // second input to the view it belongs to (see processElement2).
+  protected final Collection<PCollectionView<?>> sideInputs;
+  protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
+
+  protected final WindowingStrategy<?, ?> windowingStrategy;
+
+  // Creates the outputManager from the Flink output in open().
+  protected final OutputManagerFactory<OutputT> outputManagerFactory;
+
+  // Runners created in open(); pushbackDoFnRunner wraps doFnRunner.
+  protected transient DoFnRunner<InputT, FnOutputT> doFnRunner;
+  protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
+
+  // Only created in open() when there are side inputs.
+  protected transient SideInputHandler sideInputHandler;
+
+  // Either a NullSideInputReader or the sideInputHandler, see open().
+  protected transient SideInputReader sideInputReader;
+
+  protected transient DoFnRunners.OutputManager outputManager;
+
+  // Invoker for doFn; used for setup in open() and teardown in close().
+  private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+
+  // Timestamp of the last watermark seen on the main input; Long.MIN_VALUE after open().
+  protected transient long currentInputWatermark;
+
+  // Timestamp of the last watermark that was emitted; Long.MIN_VALUE after open().
+  protected transient long currentOutputWatermark;
+
+  // Tag of the bag state that buffers pushed-back elements; only set with side inputs.
+  private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+
+  // Only created in open() when keyCoder is non-null.
+  protected transient FlinkStateInternals<?> stateInternals;
+
+  // Coder of the main input; used for the pushed-back elements state.
+  private Coder<WindowedValue<InputT>> inputCoder;
+
+  // Null when the operator is not keyed; non-null enables state and timers in open().
+  private final Coder<?> keyCoder;
+
+  // Coder for timer data, built from the window coder of the windowing strategy.
+  private final TimerInternals.TimerDataCoder timerCoder;
+
+  // Only created when keyCoder is non-null, in open() or already in initializeState().
+  protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
+
+  // Only created in open() when keyCoder is non-null.
+  protected transient FlinkTimerInternals timerInternals;
+
+  // State that holds the pushed-back elements; created in open() or restoreKeyGroupState().
+  private transient StateInternals<?> pushbackStateInternals;
+
+  // Minimum timestamp of the pushed-back elements; absent until it is computed lazily in
+  // checkInitPushedBackWatermark().
+  private transient Optional<Long> pushedBackWatermark;
+
+  /**
+   * Creates the operator. Only serializable configuration is stored here; runners, state and
+   * timers are created in {@link #open()}.
+   *
+   * @param doFn the function to execute
+   * @param inputCoder coder for the elements of the main input
+   * @param mainOutputTag tag of the main output of the function
+   * @param additionalOutputTags tags of the additional outputs of the function
+   * @param outputManagerFactory creates the output manager from the Flink output
+   * @param windowingStrategy windowing strategy; its window coder is used for the timer coder
+   * @param sideInputTagMapping mapping from union tag on the second input to side input view
+   * @param sideInputs the side inputs of the function
+   * @param options pipeline options, stored in serialized form
+   * @param keyCoder coder for the key, or {@code null} if the operator is not keyed
+   */
+  public DoFnOperator(
+      DoFn<InputT, FnOutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      Coder<?> keyCoder) {
+    // the function and the coders of its input and (optional) key
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.keyCoder = keyCoder;
+
+    // outputs
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.outputManagerFactory = outputManagerFactory;
+
+    // side inputs
+    this.sideInputs = sideInputs;
+    this.sideInputTagMapping = sideInputTagMapping;
+
+    this.windowingStrategy = windowingStrategy;
+
+    this.serializedOptions = new SerializedPipelineOptions(options);
+
+    setChainingStrategy(ChainingStrategy.ALWAYS);
+
+    this.timerCoder =
+        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+  }
+
+  /** Creates the {@link StepContext} that is handed to the runners built in {@link #open()}. */
+  private ExecutionContext.StepContext createStepContext() {
+    ExecutionContext.StepContext context = new StepContext();
+    return context;
+  }
+
+  /**
+   * Returns the {@link DoFn} to execute. This is a hook so that WindowDoFnOperator, which
+   * creates its DoFn dynamically, can override it.
+   */
+  protected DoFn<InputT, FnOutputT> getDoFn() {
+    return this.doFn;
+  }
+
+  /**
+   * Initializes all transient runtime state of the operator: watermarks, side-input handling,
+   * the output manager, state and timer internals (only when a key coder is present) and the
+   * chain of {@link DoFnRunner DoFnRunners}. Also invokes the setup method of the
+   * {@link DoFn}.
+   */
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    currentInputWatermark = Long.MIN_VALUE;
+    currentOutputWatermark = Long.MIN_VALUE;
+
+    // Aggregators are backed by accumulators of the Flink runtime context: an accumulator
+    // that is already registered under the aggregator name is re-used, otherwise a new one
+    // is created and registered.
+    AggregatorFactory aggregatorFactory = new AggregatorFactory() {
+      @Override
+      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+          Class<?> fnClass,
+          ExecutionContext.StepContext stepContext,
+          String aggregatorName,
+          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+
+        @SuppressWarnings("unchecked")
+        SerializableFnAggregatorWrapper<InputT, OutputT> result =
+            (SerializableFnAggregatorWrapper<InputT, OutputT>)
+                getRuntimeContext().getAccumulator(aggregatorName);
+
+        if (result == null) {
+          result = new SerializableFnAggregatorWrapper<>(combine);
+          getRuntimeContext().addAccumulator(aggregatorName, result);
+        }
+        return result;
+      }
+    };
+
+    // default reader; replaced by the SideInputHandler below when there are side inputs
+    sideInputReader = NullSideInputReader.of(sideInputs);
+
+    if (!sideInputs.isEmpty()) {
+
+      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+
+      FlinkBroadcastStateInternals sideInputStateInternals =
+          new FlinkBroadcastStateInternals<>(
+              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
+
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+
+      // may already have been created by restoreKeyGroupState(), which is called from
+      // initializeState()
+      if (pushbackStateInternals == null) {
+        if (keyCoder != null) {
+          pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+              getKeyedStateBackend());
+        } else {
+          pushbackStateInternals =
+              new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
+        }
+      }
+
+      // computed lazily from the pushed-back state, see checkInitPushedBackWatermark()
+      pushedBackWatermark = Optional.absent();
+
+    }
+
+    outputManager = outputManagerFactory.create(output);
+
+    // StatefulPardo or WindowDoFn: only keyed operators get state and timers
+    if (keyCoder != null) {
+      stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
+          keyCoder);
+
+      timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
+          getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
+
+      timerInternals = new FlinkTimerInternals();
+
+    }
+
+    // WindowDoFnOperator needs state and timers to create its DoFn,
+    // so we must wait until StateInternals and TimerInternals are ready.
+    this.doFn = getDoFn();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+
+    doFnInvoker.invokeSetup();
+
+    ExecutionContext.StepContext stepContext = createStepContext();
+
+    // the innermost runner; it may be wrapped below
+    doFnRunner = DoFnRunners.simpleRunner(
+        serializedOptions.getPipelineOptions(),
+        doFn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        additionalOutputTags,
+        stepContext,
+        aggregatorFactory,
+        windowingStrategy);
+
+    if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
+      // When the doFn is this, we know it came from WindowDoFnOperator and
+      //   InputT = KeyedWorkItem<K, V>
+      //   OutputT = KV<K, V>
+      //
+      // for some K, V
+
+
+      doFnRunner = DoFnRunners.lateDataDroppingRunner(
+          (DoFnRunner) doFnRunner,
+          stepContext,
+          windowingStrategy,
+          ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator());
+    } else if (keyCoder != null) {
+      // It is a stateful DoFn
+
+      StatefulDoFnRunner.CleanupTimer cleanupTimer =
+          new StatefulDoFnRunner.TimeInternalsCleanupTimer(
+              stepContext.timerInternals(), windowingStrategy);
+
+      // we don't know the window type
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      StatefulDoFnRunner.StateCleaner<?> stateCleaner =
+          new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+              doFn, stepContext.stateInternals(), windowCoder);
+
+      doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
+          doFn,
+          doFnRunner,
+          stepContext,
+          aggregatorFactory,
+          windowingStrategy,
+          cleanupTimer,
+          stateCleaner);
+    }
+
+    // sideInputHandler is null here when there are no side inputs
+    pushbackDoFnRunner =
+        SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+  }
+
+  /**
+   * Closes the operator and tears down the {@link DoFn}.
+   *
+   * <p>The teardown is invoked even when closing the super class fails, so that resources
+   * acquired by the DoFn in its setup are released. It is skipped when {@link #open()}
+   * failed before the invoker was created.
+   *
+   * @throws Exception if closing the super class or tearing down the DoFn fails
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      super.close();
+    } finally {
+      // doFnInvoker is only assigned in open(); it is still null if open() failed early
+      if (doFnInvoker != null) {
+        doFnInvoker.invokeTeardown();
+      }
+    }
+  }
+
+  /**
+   * Returns the watermark hold caused by elements that are pushed back because they wait for
+   * side inputs, in milliseconds. Without side inputs this is the maximum timestamp, i.e.
+   * there is no hold.
+   *
+   * @throws RuntimeException if the pushed-back watermark cannot be retrieved
+   */
+  protected final long getPushbackWatermarkHold() {
+    if (!sideInputs.isEmpty()) {
+      try {
+        checkInitPushedBackWatermark();
+        return pushedBackWatermark.get();
+      } catch (Exception e) {
+        throw new RuntimeException("Error retrieving pushed back watermark state.", e);
+      }
+    }
+
+    // if we don't have side inputs we never hold the watermark
+    return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+  }
+
+  /**
+   * Lazily initializes the pushed-back watermark as the minimum timestamp of all elements in
+   * the pushed-back state. Does nothing if the watermark is already present.
+   *
+   * <p>This is not done in initializeState, because OperatorState is not ready there.
+   */
+  private void checkInitPushedBackWatermark() {
+    if (pushedBackWatermark.isPresent()) {
+      return;
+    }
+
+    BagState<WindowedValue<InputT>> bufferedElements =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    long earliest = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+    for (WindowedValue<InputT> buffered : bufferedElements.read()) {
+      long timestamp = buffered.getTimestamp().getMillis();
+      earliest = Math.min(earliest, timestamp);
+    }
+    setPushedBackWatermark(earliest);
+  }
+
+  /**
+   * Processes an element of the single input: the element is run through the
+   * {@link DoFnRunner} in a bundle of its own.
+   */
+  @Override
+  public final void processElement(
+      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+    WindowedValue<InputT> element = streamRecord.getValue();
+
+    doFnRunner.startBundle();
+    doFnRunner.processElement(element);
+    doFnRunner.finishBundle();
+  }
+
+  /** Stores the given timestamp (in milliseconds) as the current pushed-back watermark. */
+  private void setPushedBackWatermark(long watermark) {
+    // a boxed primitive is never null, so the value is always present
+    this.pushedBackWatermark = Optional.of(watermark);
+  }
+
+  /**
+   * Processes an element of the main input when the operator has two inputs. The element is
+   * processed in the windows whose side inputs are ready; what remains is added to the
+   * pushed-back state and lowers the pushed-back watermark accordingly.
+   */
+  @Override
+  public final void processElement1(
+      StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
+    pushbackDoFnRunner.startBundle();
+    Iterable<WindowedValue<InputT>> notReady =
+        pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
+
+    BagState<WindowedValue<InputT>> bufferedElements =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    checkInitPushedBackWatermark();
+
+    long earliest = pushedBackWatermark.get();
+    for (WindowedValue<InputT> blocked : notReady) {
+      long timestamp = blocked.getTimestamp().getMillis();
+      earliest = Math.min(earliest, timestamp);
+      bufferedElements.add(blocked);
+    }
+    setPushedBackWatermark(earliest);
+    pushbackDoFnRunner.finishBundle();
+  }
+
+  /**
+   * Processes a record of the side-input input. The value is added to the side input that
+   * is mapped to the union tag of the record. Afterwards all pushed-back elements are
+   * re-processed; those that still cannot be processed are written back to the pushed-back
+   * state. Finally the current input watermark is processed again, because the pushed-back
+   * watermark hold may have changed.
+   */
+  @Override
+  public final void processElement2(
+      StreamRecord<RawUnionValue> streamRecord) throws Exception {
+    pushbackDoFnRunner.startBundle();
+
+    @SuppressWarnings("unchecked")
+    WindowedValue<Iterable<?>> sideInputValue =
+        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+    PCollectionView<?> view = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+    sideInputHandler.addSideInputValue(view, sideInputValue);
+
+    BagState<WindowedValue<InputT>> bufferedElements =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    List<WindowedValue<InputT>> stillBlocked = new ArrayList<>();
+
+    Iterable<WindowedValue<InputT>> previouslyBlocked = bufferedElements.read();
+    if (previouslyBlocked != null) {
+      for (WindowedValue<InputT> buffered : previouslyBlocked) {
+
+        // we need to set the correct key in case the operator is
+        // a (keyed) window operator
+        setKeyContextElement1(new StreamRecord<>(buffered));
+
+        Iterables.addAll(
+            stillBlocked, pushbackDoFnRunner.processElementInReadyWindows(buffered));
+      }
+    }
+
+    // replace the state contents by the elements that are still waiting
+    bufferedElements.clear();
+    long earliest = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+    for (WindowedValue<InputT> blocked : stillBlocked) {
+      long timestamp = blocked.getTimestamp().getMillis();
+      earliest = Math.min(earliest, timestamp);
+      bufferedElements.add(blocked);
+    }
+    setPushedBackWatermark(earliest);
+
+    pushbackDoFnRunner.finishBundle();
+
+    // maybe output a new watermark
+    processWatermark1(new Watermark(currentInputWatermark));
+  }
+
+  /** Handles a watermark of the single input exactly like one of the main input. */
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    this.processWatermark1(mark);
+  }
+
+  /**
+   * Processes a watermark of the main input. For a keyed operator the timer service is
+   * advanced (which may fire timers) inside a bundle, and the output watermark is held back
+   * by both the state watermark hold and the pushed-back elements. For a non-keyed operator
+   * the output watermark is only held back by the pushed-back elements.
+   */
+  @Override
+  public void processWatermark1(Watermark mark) throws Exception {
+    if (keyCoder != null) {
+      // fireTimers, so we need startBundle.
+      pushbackDoFnRunner.startBundle();
+
+      this.currentInputWatermark = mark.getTimestamp();
+
+      // hold back by the pushed back values waiting for side inputs
+      long heldInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+
+      timerService.advanceWatermark(heldInputWatermark);
+
+      Instant stateHold = stateInternals.watermarkHold();
+
+      long combinedHold = Math.min(stateHold.getMillis(), getPushbackWatermarkHold());
+
+      emitWatermarkIfAdvanced(Math.min(currentInputWatermark, combinedHold));
+
+      pushbackDoFnRunner.finishBundle();
+    } else {
+      this.currentInputWatermark = mark.getTimestamp();
+      emitWatermarkIfAdvanced(Math.min(getPushbackWatermarkHold(), currentInputWatermark));
+    }
+  }
+
+  /** Emits the candidate as new output watermark if it is beyond the current one. */
+  private void emitWatermarkIfAdvanced(long candidate) {
+    if (candidate > currentOutputWatermark) {
+      currentOutputWatermark = candidate;
+      output.emitWatermark(new Watermark(currentOutputWatermark));
+    }
+  }
+
+  /** Does nothing: watermarks from the side-input input are ignored. */
+  @Override
+  public void processWatermark2(Watermark mark) throws Exception {
+    // intentionally empty, only the main input drives the watermark
+  }
+
+  /**
+   * Writes the key-group state of this operator and, for keyed operators, the timers of its
+   * timer service to the raw keyed state stream, key group by key group. Does nothing when
+   * there is no keyed state backend. The logic is copied from AbstractStreamOperator.
+   */
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    if (getKeyedStateBackend() == null) {
+      return;
+    }
+
+    KeyedStateCheckpointOutputStream rawKeyedStream;
+
+    try {
+      rawKeyedStream = context.getRawKeyedOperatorStateOutput();
+    } catch (Exception exception) {
+      throw new Exception("Could not open raw keyed operator state stream for "
+          + getOperatorName() + '.', exception);
+    }
+
+    try {
+      for (int keyGroupIdx : rawKeyedStream.getKeyGroupList()) {
+        rawKeyedStream.startNewKeyGroup(keyGroupIdx);
+
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(rawKeyedStream);
+
+        // if (this instanceof KeyGroupCheckpointedOperator)
+        snapshotKeyGroupState(keyGroupIdx, view);
+
+        // We can't get all timerServices, so we just snapshot our timerService
+        // Maybe this is a normal DoFn that has no timerService
+        if (keyCoder != null) {
+          timerService.snapshotTimersForKeyGroup(view, keyGroupIdx);
+        }
+      }
+    } catch (Exception exception) {
+      throw new Exception("Could not write timer service of " + getOperatorName()
+          + " to checkpoint state stream.", exception);
+    } finally {
+      try {
+        rawKeyedStream.close();
+      } catch (Exception closeException) {
+        LOG.warn("Could not close raw keyed operator state stream for {}. This "
+            + "might have prevented deleting some state data.", getOperatorName(),
+            closeException);
+      }
+    }
+  }
+
+  @Override
+  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+    if (!sideInputs.isEmpty() && keyCoder != null) {
+      ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
+          keyGroupIndex, out);
+    }
+  }
+
+  /**
+   * Restores, for every raw keyed state input, the key-group state of this operator and, for
+   * keyed operators, the timers of its timer service. The timer service is created here if
+   * it does not exist yet. Does nothing when there is no keyed state backend.
+   */
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    if (getKeyedStateBackend() == null) {
+      return;
+    }
+
+    int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
+    KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+    for (KeyGroupStatePartitionStreamProvider provider : context.getRawKeyedStateInputs()) {
+      DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(provider.getStream());
+
+      int keyGroupIdx = provider.getKeyGroupId();
+      checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+          "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+      // if (this instanceof KeyGroupRestoringOperator)
+      restoreKeyGroupState(keyGroupIdx, view);
+
+      // We just initialize our timerService
+      if (keyCoder == null) {
+        continue;
+      }
+      if (timerService == null) {
+        timerService = new HeapInternalTimerService<>(
+            totalKeyGroups,
+            localKeyGroupRange,
+            this,
+            getRuntimeContext().getProcessingTimeService());
+      }
+      timerService.restoreTimersForKeyGroup(view, keyGroupIdx, getUserCodeClassloader());
+    }
+  }
+
+  @Override
+  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+    if (!sideInputs.isEmpty() && keyCoder != null) {
+      if (pushbackStateInternals == null) {
+        pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+            getKeyedStateBackend());
+      }
+      ((FlinkKeyGroupStateInternals) pushbackStateInternals)
+          .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
+    }
+  }
+
+  /** Fires the given event-time timer via {@link #fireTimer(InternalTimer)}. */
+  @Override
+  public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    this.fireTimer(timer);
+  }
+
+  /** Fires the given processing-time timer via {@link #fireTimer(InternalTimer)}. */
+  @Override
+  public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    this.fireTimer(timer);
+  }
+
+  /**
+   * Delivers a timer to the {@link DoFn} through the pushback runner. The namespace of the
+   * timer must be a {@link WindowNamespace}; its window is the window the timer fires in.
+   *
+   * <p>Public and non-final to allow overriding this in WindowDoFnOperator.
+   *
+   * @throws IllegalArgumentException if the namespace of the timer is not a window namespace
+   */
+  public void fireTimer(InternalTimer<?, TimerData> timer) {
+    TimerInternals.TimerData data = timer.getNamespace();
+    StateNamespace stateNamespace = data.getNamespace();
+
+    // This is a user timer, so namespace must be WindowNamespace
+    checkArgument(stateNamespace instanceof WindowNamespace);
+    WindowNamespace windowNamespace = (WindowNamespace) stateNamespace;
+    BoundedWindow window = windowNamespace.getWindow();
+
+    pushbackDoFnRunner.onTimer(
+        data.getTimerId(), window, data.getTimestamp(), data.getDomain());
+  }
+
+  /**
+   * Serializable factory that turns a Flink {@link Output} into a
+   * {@link DoFnRunners.OutputManager}.
+   *
+   * @param <OutputT> the type of the values that are emitted to the Flink output
+   */
+  interface OutputManagerFactory<OutputT> extends Serializable {
+
+    /** Creates an output manager that emits to the given Flink output. */
+    DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
+  }
+
+  /**
+   * {@link OutputManagerFactory} for operators with a single logical output: the created
+   * {@link DoFnRunners.OutputManager} forwards every value to the Flink output, regardless
+   * of the tag it was emitted with.
+   */
+  public static class DefaultOutputManagerFactory<OutputT>
+      implements OutputManagerFactory<OutputT> {
+    @Override
+    public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
+      return new DoFnRunners.OutputManager() {
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+          // with tagged outputs we can't get around this because we don't
+          // know our own output type...
+          @SuppressWarnings("unchecked")
+          StreamRecord<OutputT> record = new StreamRecord<>((OutputT) value);
+          output.collect(record);
+        }
+      };
+    }
+  }
+
+  /**
+   * Implementation of {@link OutputManagerFactory} that creates an
+   * {@link DoFnRunners.OutputManager} that can write to multiple logical
+   * outputs by unioning them in a {@link RawUnionValue}.
+   */
+  public static class MultiOutputOutputManagerFactory
+      implements OutputManagerFactory<RawUnionValue> {
+
+    Map<TupleTag<?>, Integer> mapping;
+
+    public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
+      this.mapping = mapping;
+    }
+
+    @Override
+    public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
+      return new DoFnRunners.OutputManager() {
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+          int intTag = mapping.get(tag);
+          output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
+        }
+      };
+    }
+  }
+
+  /**
+   * {@link ExecutionContext.StepContext} for running {@link DoFn DoFns} on Flink.
+   *
+   * <p>State and timer internals are those of the enclosing operator. In this class they are
+   * only created in {@code open()} when a key coder is present, so both accessors return
+   * {@code null} for an operator without key coder. Step and transform name are not
+   * available, and writing side-input data is not supported.
+   */
+  protected class StepContext implements ExecutionContext.StepContext {
+
+    /** Returns {@code null}, the step name is not available. */
+    @Override
+    public String getStepName() {
+      return null;
+    }
+
+    /** Returns {@code null}, the transform name is not available. */
+    @Override
+    public String getTransformName() {
+      return null;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void noteOutput(WindowedValue<?> output) {}
+
+    /** Does nothing. */
+    @Override
+    public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {}
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(
+        TupleTag<?> tag,
+        Iterable<WindowedValue<T>> data,
+        Coder<Iterable<WindowedValue<T>>> dataCoder,
+        W window,
+        Coder<W> windowCoder) throws IOException {
+      throw new UnsupportedOperationException("Writing side-input data is not supported.");
+    }
+
+    /** Returns the state internals of the enclosing operator, which may be {@code null}. */
+    @Override
+    public StateInternals<?> stateInternals() {
+      return stateInternals;
+    }
+
+    /** Returns the timer internals of the enclosing operator, which may be {@code null}. */
+    @Override
+    public TimerInternals timerInternals() {
+      return timerInternals;
+    }
+  }
+
+  private class FlinkTimerInternals implements TimerInternals {
+
+    @Override
+    public void setTimer(
+        StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+    }
+
+    @Deprecated
+    @Override
+    public void setTimer(TimerData timerKey) {
+      long time = timerKey.getTimestamp().getMillis();
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        timerService.registerEventTimeTimer(timerKey, time);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        timerService.registerProcessingTimeTimer(timerKey, time);
+      } else {
+        throw new UnsupportedOperationException(
+            "Unsupported time domain: " + timerKey.getDomain());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException(
+          "Canceling of a timer by ID is not yet supported.");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException(
+          "Canceling of a timer by ID is not yet supported.");
+    }
+
+    @Deprecated
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      long time = timerKey.getTimestamp().getMillis();
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        timerService.deleteEventTimeTimer(timerKey, time);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        timerService.deleteProcessingTimeTimer(timerKey, time);
+      } else {
+        throw new UnsupportedOperationException(
+            "Unsupported time domain: " + timerKey.getDomain());
+      }
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Nullable
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+    }
+
+    @Nullable
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      return new Instant(currentOutputWatermark);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
new file mode 100644
index 0000000..dce2e68
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * A {@link KeySelector} that pulls the key out of a {@link KV} wrapped in a
+ * {@link WindowedValue} and returns it in encoded form.
+ *
+ * <p>The key is serialized with the supplied {@link Coder} and the resulting bytes are wrapped
+ * in a {@link ByteBuffer}, so that all key comparisons and hashing operate on the encoded
+ * representation.
+ */
+public class KvToByteBufferKeySelector<K, V>
+    implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
+    ResultTypeQueryable<ByteBuffer> {
+
+  /** Coder used to turn each key into bytes. */
+  private final Coder<K> keyCoder;
+
+  /**
+   * Creates a selector that encodes keys with the given coder.
+   *
+   * @param keyCoder coder for keys of type {@code K}
+   */
+  public KvToByteBufferKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  /**
+   * Encodes the key of the wrapped {@link KV} and returns the bytes as a {@link ByteBuffer}.
+   *
+   * @param value windowed key/value pair whose key is extracted
+   * @return a buffer wrapping the encoded key bytes
+   * @throws Exception if encoding the key fails
+   */
+  @Override
+  public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception {
+    KV<K, V> keyedElement = value.getValue();
+    return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, keyedElement.getKey()));
+  }
+
+  /** Describes the produced {@link ByteBuffer} keys to Flink as a generic type. */
+  @Override
+  public TypeInformation<ByteBuffer> getProducedType() {
+    return new GenericTypeInfo<ByteBuffer>(ByteBuffer.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..e843660
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Singleton keyed work item: a {@link KeyedWorkItem} that carries exactly one element and
+ * no timers.
+ */
+public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  /**
+   * Creates a work item for the given key holding the single given element.
+   *
+   * @param key the key this work item belongs to
+   * @param value the one element carried by this work item
+   */
+  public SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override
+  public K key() {
+    return key;
+  }
+
+  /** Returns the single element carried by this work item. */
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  /** Returns an empty iterable: this work item never carries timers. */
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    // Typed empty list rather than the raw Collections.EMPTY_LIST, which only satisfied the
+    // return type through an unchecked conversion.
+    return Collections.<TimerInternals.TimerData>emptyList();
+  }
+
+  /** Returns an iterable containing only the single element of this work item. */
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
new file mode 100644
index 0000000..9a52330
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Singleton keyed work item coder: a {@link Coder} for {@link SingletonKeyedWorkItem}.
+ *
+ * <p>The encoded form is the key (written with the key coder) followed by the single windowed
+ * element (written with a {@link WindowedValue.FullWindowedValueCoder} built from the element
+ * and window coders). Compare {@link KeyedWorkItemCoder} in
+ * {@code org.apache.beam.runners.core}.
+ */
+public class SingletonKeyedWorkItemCoder<K, ElemT>
+    extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
+  /**
+   * Create a new {@link SingletonKeyedWorkItemCoder} with the provided key coder, element coder,
+   * and window coder.
+   */
+  public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+    return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  /**
+   * Create a new {@link SingletonKeyedWorkItemCoder} from its component coders.
+   *
+   * @param components exactly three coders, in the order key coder, element coder, window coder
+   * @throws IllegalArgumentException if {@code components} does not contain exactly 3 entries
+   */
+  @JsonCreator
+  public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
+    @SuppressWarnings("unchecked")
+    Coder<K> keyCoder = (Coder<K>) components.get(0);
+    @SuppressWarnings("unchecked")
+    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
+    @SuppressWarnings("unchecked")
+    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
+    return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
+  }
+
+  private final Coder<K> keyCoder;
+  private final Coder<ElemT> elemCoder;
+  private final Coder<? extends BoundedWindow> windowCoder;
+  /** Coder for the single windowed element, derived from the element and window coders. */
+  private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder;
+
+  private SingletonKeyedWorkItemCoder(
+      Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
+    this.keyCoder = keyCoder;
+    this.elemCoder = elemCoder;
+    this.windowCoder = windowCoder;
+    valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
+  }
+
+  /** Returns the coder used for the work item's key. */
+  public Coder<K> getKeyCoder() {
+    return keyCoder;
+  }
+
+  /** Returns the coder used for the element inside the work item's windowed value. */
+  public Coder<ElemT> getElementCoder() {
+    return elemCoder;
+  }
+
+  /**
+   * Writes the key in a nested context, followed by the windowed element in the caller's
+   * context.
+   */
+  @Override
+  public void encode(SingletonKeyedWorkItem<K, ElemT> value,
+                     OutputStream outStream,
+                     Context context)
+      throws CoderException, IOException {
+    keyCoder.encode(value.key(), outStream, context.nested());
+    // Go through the public accessor instead of reaching into the package-private field.
+    valueCoder.encode(value.value(), outStream, context);
+  }
+
+  /**
+   * Reads the key and then the windowed element, mirroring the order and contexts used by
+   * {@code encode}.
+   */
+  @Override
+  public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
+      throws CoderException, IOException {
+    K key = keyCoder.decode(inStream, context.nested());
+    WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
+    return new SingletonKeyedWorkItem<>(key, value);
+  }
+
+  /** Returns the component coders in the order key coder, element coder, window coder. */
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(keyCoder, elemCoder, windowCoder);
+  }
+
+  /** This coder is deterministic only if the key, element and window coders all are. */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    keyCoder.verifyDeterministic();
+    elemCoder.verifyDeterministic();
+    windowCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * <p>{@link SingletonKeyedWorkItemCoder} is not consistent with equals: decoding always
+   * creates a new {@link SingletonKeyedWorkItem}, and that {@link KeyedWorkItem} implementation
+   * does not override {@code equals}, so a decoded item is never equal to the item that was
+   * encoded.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
new file mode 100644
index 0000000..40f70e4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
+ * the {@code @ProcessElement} method of a splittable {@link DoFn}.
+ *
+ * <p>{@link #open()} wires the wrapped {@link SplittableParDo.ProcessFn} up with this operator's
+ * state internals, timer internals and a process-element invoker; {@link #close()} releases the
+ * executor that the invoker uses.
+ */
+public class SplittableDoFnOperator<
+    InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+    extends DoFnOperator<
+    KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+
+  /**
+   * Executor handed to the process-element invoker. It is only ever populated by
+   * {@link #open()} (hence transient) and is shut down again in {@link #close()}.
+   */
+  private transient java.util.concurrent.ScheduledExecutorService executorService;
+
+  public SplittableDoFnOperator(
+      DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+      Coder<
+          WindowedValue<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+      TupleTag<FnOutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      OutputManagerFactory<OutputT> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      Coder<?> keyCoder) {
+    super(
+        doFn,
+        inputCoder,
+        mainOutputTag,
+        additionalOutputTags,
+        outputManagerFactory,
+        windowingStrategy,
+        sideInputTagMapping,
+        sideInputs,
+        options,
+        keyCoder);
+
+  }
+
+  /**
+   * Opens the operator and configures the {@link SplittableParDo.ProcessFn}.
+   *
+   * @throws IllegalStateException if the wrapped {@link DoFn} is not a
+   *     {@link SplittableParDo.ProcessFn}
+   */
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    checkState(doFn instanceof SplittableParDo.ProcessFn);
+
+    StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
+      @Override
+      public StateInternals<String> stateInternalsForKey(String key) {
+        //this will implicitly be keyed by the key of the incoming
+        // element or by the key of a firing timer
+        return (StateInternals<String>) stateInternals;
+      }
+    };
+    TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
+      @Override
+      public TimerInternals timerInternalsForKey(String key) {
+        //this will implicitly be keyed like the StateInternalsFactory
+        return timerInternals;
+      }
+    };
+
+    // Keep a handle on the executor so that close() can shut it down. The default thread
+    // factory creates non-daemon threads, which would otherwise outlive this operator.
+    executorService =
+        Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
+
+    ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
+    ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
+    ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker(
+        new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+            doFn,
+            serializedOptions.getPipelineOptions(),
+            new OutputWindowedValue<FnOutputT>() {
+              @Override
+              public void outputWindowedValue(
+                  FnOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(
+                    mainOutputTag,
+                    WindowedValue.of(output, timestamp, windows, pane));
+              }
+
+              @Override
+              public <AdditionalOutputT> void outputWindowedValue(
+                  TupleTag<AdditionalOutputT> tag,
+                  AdditionalOutputT output,
+                  Instant timestamp,
+                  Collection<? extends BoundedWindow> windows,
+                  PaneInfo pane) {
+                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+              }
+            },
+            sideInputReader,
+            executorService,
+            10000,
+            Duration.standardSeconds(10)));
+  }
+
+  /**
+   * Closes the operator and then shuts down the executor created in {@link #open()}, even if
+   * closing the parent operator fails.
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      super.close();
+    } finally {
+      // open() may never have run, or may have failed before creating the executor.
+      if (executorService != null) {
+        executorService.shutdownNow();
+        executorService = null;
+      }
+    }
+  }
+
+  /**
+   * Feeds a fired timer back into the {@link DoFn} as a timers-only {@link KeyedWorkItem} for
+   * the key currently held by the state internals.
+   */
+  @Override
+  public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+        KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+            (String) stateInternals.getKey(),
+            Collections.singletonList(timer.getNamespace()))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
new file mode 100644
index 0000000..9b2136c
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.beam.runners.core.TimerInternals.TimerData;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+
+/**
+ * Flink operator for executing window {@link DoFn DoFns}.
+ *
+ * <p>No {@link DoFn} is handed to the parent operator; instead {@link #getDoFn()} builds one
+ * from the {@link SystemReduceFn} given at construction time.
+ */
+public class WindowDoFnOperator<K, InputT, OutputT>
+    extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+
+  /** Reduce function from which the windowing {@link DoFn} is created. */
+  private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
+
+  public WindowDoFnOperator(
+      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
+      Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
+      TupleTag<KV<K, OutputT>> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
+      OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<Integer, PCollectionView<?>> sideInputTagMapping,
+      Collection<PCollectionView<?>> sideInputs,
+      PipelineOptions options,
+      Coder<K> keyCoder) {
+    // The DoFn argument is null on purpose: getDoFn() supplies it.
+    super(
+        null,
+        inputCoder,
+        mainOutputTag,
+        additionalOutputTags,
+        outputManagerFactory,
+        windowingStrategy,
+        sideInputTagMapping,
+        sideInputs,
+        options,
+        keyCoder);
+
+    this.systemReduceFn = systemReduceFn;
+  }
+
+  /**
+   * Builds the group-also-by-window {@link DoFn} on top of this operator's state and timer
+   * internals.
+   */
+  @Override
+  protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
+    TimerInternalsFactory<K> timersPerKey = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        // Keyed implicitly, in the same way as the state internals factory below.
+        return timerInternals;
+      }
+    };
+    StateInternalsFactory<K> statePerKey = new StateInternalsFactory<K>() {
+      @Override
+      public StateInternals<K> stateInternalsForKey(K key) {
+        // Keyed implicitly: by the key of the incoming element, or by the key of a
+        // firing timer.
+        return (StateInternals<K>) stateInternals;
+      }
+    };
+
+    // The unchecked cast is needed because GroupAlsoByWindowViaWindowSetNewDoFn.create has the
+    // window type as a generic parameter, while WindowingStrategy is almost always untyped.
+    @SuppressWarnings("unchecked")
+    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> windowingDoFn =
+        GroupAlsoByWindowViaWindowSetNewDoFn.create(
+            windowingStrategy, statePerKey, timersPerKey, sideInputReader,
+                (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
+    return windowingDoFn;
+  }
+
+  /**
+   * Feeds a fired timer back into the {@link DoFn} as a timers-only {@link KeyedWorkItem} for
+   * the key currently held by the state internals.
+   */
+  @Override
+  public void fireTimer(InternalTimer<?, TimerData> timer) {
+    final K currentKey = (K) stateInternals.getKey();
+    final List<TimerData> firedTimers = Collections.singletonList(timer.getNamespace());
+    final KeyedWorkItem<K, InputT> timersOnly =
+        KeyedWorkItems.<K, InputT>timersWorkItem(currentKey, firedTimers);
+    doFnRunner.processElement(WindowedValue.valueInGlobalWindow(timersOnly));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
new file mode 100644
index 0000000..1dff367
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * A {@link KeySelector} that pulls the key out of a {@link KeyedWorkItem} (here always a
+ * {@link SingletonKeyedWorkItem}) wrapped in a {@link WindowedValue}.
+ *
+ * <p>The key is returned as encoded by the provided {@link Coder}, wrapped in a
+ * {@link ByteBuffer}. This ensures that all key comparisons/hashing happen on the encoded form.
+ */
+public class WorkItemKeySelector<K, V>
+    implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
+    ResultTypeQueryable<ByteBuffer> {
+
+  /** Coder used to turn each work item key into bytes. */
+  private final Coder<K> keyCoder;
+
+  /**
+   * Creates a selector that encodes work item keys with the given coder.
+   *
+   * @param keyCoder coder for keys of type {@code K}
+   */
+  public WorkItemKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  /**
+   * Encodes the key of the wrapped work item and returns the bytes as a {@link ByteBuffer}.
+   *
+   * @param value windowed work item whose key is extracted
+   * @return a buffer wrapping the encoded key bytes
+   * @throws Exception if encoding the key fails
+   */
+  @Override
+  public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception {
+    SingletonKeyedWorkItem<K, V> workItem = value.getValue();
+    byte[] encodedKey = CoderUtils.encodeToByteArray(keyCoder, workItem.key());
+    return ByteBuffer.wrap(encodedKey);
+  }
+
+  /** Describes the produced {@link ByteBuffer} keys to Flink as a generic type. */
+  @Override
+  public TypeInformation<ByteBuffer> getProducedType() {
+    return new GenericTypeInfo<ByteBuffer>(ByteBuffer.class);
+  }
+}