You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/15 17:07:05 UTC

[10/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
new file mode 100644
index 0000000..ca667ee
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn} that uses side outputs
+ * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ *
+ * We get a mapping from {@link com.google.cloud.dataflow.sdk.values.TupleTag} to output index
+ * and must tag all outputs with the output number. Afterwards a filter will filter out
+ * those elements that are not to be in a specific output.
+ */
+public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> {
+
+  private final DoFn<IN, OUT> doFn;
+  private transient PipelineOptions options;
+  private final Map<TupleTag<?>, Integer> outputMap;
+
+  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) {
+    this.doFn = doFn;
+    this.options = options;
+    this.outputMap = outputMap;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+
+  }
+
+  @Override
+  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception {
+    ProcessContext context = new ProcessContext(doFn, out);
+    this.doFn.startBundle(context);
+    for (IN value : values) {
+      context.inValue = value;
+      doFn.processElement(context);
+    }
+    this.doFn.finishBundle(context);
+  }
+
+  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
+
+    IN inValue;
+    Collector<RawUnionValue> outCollector;
+
+    public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) {
+      fn.super();
+      this.outCollector = outCollector;
+    }
+
+    @Override
+    public IN element() {
+      return this.inValue;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return Instant.now();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return PaneInfo.NO_FIRING;
+    }
+
+    @Override
+    public WindowingInternals<IN, OUT> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal()
+          .getId());
+      List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size());
+      for (T input : sideInput) {
+        windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
+      }
+      return view.fromIterableInternal(windowedValueList);
+    }
+
+    @Override
+    public void output(OUT value) {
+      // assume that index 0 is the default output
+      outCollector.collect(new RawUnionValue(0, value));
+    }
+
+    @Override
+    public void outputWithTimestamp(OUT output, Instant timestamp) {
+      // not FLink's way, just output normally
+      output(output);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> void sideOutput(TupleTag<T> tag, T value) {
+      Integer index = outputMap.get(tag);
+      if (index != null) {
+        outCollector.collect(new RawUnionValue(index, value));
+      }
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, wrapper);
+      return null;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
new file mode 100644
index 0000000..37de37e
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * A FlatMap function that filters out those elements that don't belong in this output. We need
+ * this to implement MultiOutput ParDo functions.
+ */
+public class FlinkMultiOutputPruningFunction<T> implements FlatMapFunction<RawUnionValue, T> {
+
+  private final int outputTag;
+
+  public FlinkMultiOutputPruningFunction(int outputTag) {
+    this.outputTag = outputTag;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) throws Exception {
+    if (rawUnionValue.getUnionTag() == outputTag) {
+      collector.collect((T) rawUnionValue.getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
new file mode 100644
index 0000000..2de681b
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * Flink {@link org.apache.flink.api.common.functions.GroupCombineFunction} for executing a
+ * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
+ * {@link com.google.cloud.dataflow.sdk.values.KV} elements VI, extracts the key and emits accumulated
+ * values which have the intermediate format VA.
+ */
+public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> {
+
+  private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
+
+  public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
+                                        keyedCombineFn) {
+    this.keyedCombineFn = keyedCombineFn;
+  }
+
+  @Override
+  public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception {
+
+    final Iterator<KV<K, VI>> iterator = elements.iterator();
+    // create accumulator using the first elements key
+    KV<K, VI> first = iterator.next();
+    K key = first.getKey();
+    VI value = first.getValue();
+    VA accumulator = keyedCombineFn.createAccumulator(key);
+    accumulator = keyedCombineFn.addInput(key, accumulator, value);
+
+    while(iterator.hasNext()) {
+      value = iterator.next().getValue();
+      accumulator = keyedCombineFn.addInput(key, accumulator, value);
+    }
+
+    out.collect(KV.of(key, accumulator));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
new file mode 100644
index 0000000..29193a2
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+/**
+ * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
+ * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
+ * {@link com.google.cloud.dataflow.sdk.values.KV} elements, extracts the key and merges the
+ * accumulators resulting from the PartialReduce which produced the input VA.
+ */
+public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> {
+
+  private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
+
+  public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
+    this.keyedCombineFn = keyedCombineFn;
+  }
+
+  @Override
+  public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
+    Iterator<KV<K, VA>> it = values.iterator();
+
+    KV<K, VA> current = it.next();
+    K k = current.getKey();
+    VA accumulator = current.getValue();
+
+    while (it.hasNext()) {
+      current = it.next();
+      keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) );
+    }
+
+    out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
new file mode 100644
index 0000000..05f4415
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.beam.runners.flink.translation.functions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.util.VarInt;
+import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * A UnionCoder encodes RawUnionValues.
+ *
+ * This file copied from {@link com.google.cloud.dataflow.sdk.transforms.join.UnionCoder}
+ */
+@SuppressWarnings("serial")
+public class UnionCoder extends StandardCoder<RawUnionValue> {
+  // TODO: Think about how to integrate this with a schema object (i.e.
+  // a tuple of tuple tags).
+  /**
+   * Builds a union coder with the given list of element coders.  This list
+   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
+   */
+  public static UnionCoder of(List<Coder<?>> elementCoders) {
+    return new UnionCoder(elementCoders);
+  }
+
+  @JsonCreator
+  public static UnionCoder jsonOf(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+      List<Coder<?>> elements) {
+    return UnionCoder.of(elements);
+  }
+
+  private int getIndexForEncoding(RawUnionValue union) {
+    if (union == null) {
+      throw new IllegalArgumentException("cannot encode a null tagged union");
+    }
+    int index = union.getUnionTag();
+    if (index < 0 || index >= elementCoders.size()) {
+      throw new IllegalArgumentException(
+          "union value index " + index + " not in range [0.." +
+              (elementCoders.size() - 1) + "]");
+    }
+    return index;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void encode(
+      RawUnionValue union,
+      OutputStream outStream,
+      Context context)
+      throws IOException  {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    VarInt.encode(index, outStream);
+
+    // Write out the actual value.
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.encode(
+        union.getValue(),
+        outStream,
+        context);
+  }
+
+  @Override
+  public RawUnionValue decode(InputStream inStream, Context context)
+      throws IOException {
+    int index = VarInt.decodeInt(inStream);
+    Object value = elementCoders.get(index).decode(inStream, context);
+    return new RawUnionValue(index, value);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public List<? extends Coder<?>> getComponents() {
+    return elementCoders;
+  }
+
+  /**
+   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
+   * time, we defer the return value to that coder.
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
+    int index = getIndexForEncoding(union);
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
+  }
+
+  /**
+   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
+   */
+  @Override
+  public void registerByteSizeObserver(
+      RawUnionValue union, ElementByteSizeObserver observer, Context context)
+      throws Exception {
+    int index = getIndexForEncoding(union);
+    // Write out the union tag.
+    observer.update(VarInt.getLength(index));
+    // Write out the actual value.
+    @SuppressWarnings("unchecked")
+    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
+    coder.registerByteSizeObserver(union.getValue(), observer, context);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final List<Coder<?>> elementCoders;
+
+  private UnionCoder(List<Coder<?>> elementCoders) {
+    this.elementCoders = elementCoders;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    verifyDeterministic(
+        "UnionCoder is only deterministic if all element coders are",
+        elementCoders);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
new file mode 100644
index 0000000..1249036
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
+ * {@link com.google.cloud.dataflow.sdk.coders.Coder}.
+ */
+public class CoderComparator<T> extends TypeComparator<T> {
+
+  private Coder<T> coder;
+
+  // We use these for internal encoding/decoding for creating copies and comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+  public CoderComparator(Coder<T> coder) {
+    this.coder = coder;
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+  }
+
+  @Override
+  public int hash(T record) {
+    return record.hashCode();
+  }
+
+  @Override
+  public void setReference(T toCompare) {
+    referenceBuffer.reset();
+    try {
+      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(T candidate) {
+    try {
+      buffer2.reset();
+      coder.encode(candidate, buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<T> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int compare(T first, T second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      coder.encode(first, buffer1, Coder.Context.OUTER);
+      coder.encode(second, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare: ", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
+    T first = serializer.deserialize(firstSource);
+    T second = serializer.deserialize(secondSource);
+    return compare(first, second);
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+    buffer1.reset();
+    try {
+      coder.encode(record, buffer1, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
+
+    offset += buffer1.size();
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<T> duplicate() {
+    return new CoderComparator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    target[index] = record;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] { this.duplicate() };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
new file mode 100644
index 0000000..f9d4dcd
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import com.google.common.base.Preconditions;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
+ * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s.
+ */
+public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
+
+  private final Coder<T> coder;
+
+  public CoderTypeInformation(Coder<T> coder) {
+    Preconditions.checkNotNull(coder);
+    this.coder = coder;
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 1;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<T> getTypeClass() {
+    // We don't have the Class, so we have to pass null here. What a shame...
+    return (Class<T>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+    if (coder instanceof VoidCoder) {
+      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
+    }
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeInformation that = (CoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
+      executionConfig) {
+    return new CoderComparator<>(coder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
new file mode 100644
index 0000000..4e81054
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
+ * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s
+ */
+public class CoderTypeSerializer<T> extends TypeSerializer<T> {
+  
+  private Coder<T> coder;
+  private transient DataInputViewWrapper inputWrapper;
+  private transient DataOutputViewWrapper outputWrapper;
+
+  // We use this for internal encoding/decoding for creating copies using the Coder.
+  private transient InspectableByteArrayOutputStream buffer;
+
+  public CoderTypeSerializer(Coder<T> coder) {
+    this.coder = coder;
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    this.inputWrapper = new DataInputViewWrapper(null);
+    this.outputWrapper = new DataOutputViewWrapper(null);
+
+    buffer = new InspectableByteArrayOutputStream();
+  }
+  
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public CoderTypeSerializer<T> duplicate() {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public T createInstance() {
+    return null;
+  }
+
+  @Override
+  public T copy(T t) {
+    buffer.reset();
+    try {
+      coder.encode(t, buffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+    try {
+      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer
+          .size()), Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not copy.", e);
+    }
+  }
+
+  @Override
+  public T copy(T t, T reuse) {
+    return copy(t);
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
+    outputWrapper.setOutputView(dataOutputView);
+    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+  }
+
+  @Override
+  public T deserialize(DataInputView dataInputView) throws IOException {
+    try {
+      inputWrapper.setInputView(dataInputView);
+      return coder.decode(inputWrapper, Coder.Context.NESTED);
+    } catch (CoderException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof EOFException) {
+        throw (EOFException) cause;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public T deserialize(T t, DataInputView dataInputView) throws IOException {
+    return deserialize(dataInputView);
+  }
+
+  @Override
+  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
+    serialize(deserialize(dataInputView), dataOutputView);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    CoderTypeSerializer that = (CoderTypeSerializer) o;
+    return coder.equals(that.coder);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
new file mode 100644
index 0000000..36b5ba3
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Version of {@link java.io.ByteArrayOutputStream} that allows to retrieve the internal
+ * byte[] buffer without incurring an array copy.
+ */
+public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
+
+  /**
+   * Get the underlying byte array.
+   */
+  public byte[] getBuffer() {
+    return buf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
new file mode 100644
index 0000000..3912295
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
+ * {@link com.google.cloud.dataflow.sdk.coders.KvCoder}. We have a special comparator
+ * for {@link KV} that always compares on the key only.
+ */
+public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
+  
+  private KvCoder<K, V> coder;
+  private Coder<K> keyCoder;
+
+  // We use these for internal encoding/decoding for creating copies and comparing
+  // serialized forms using a Coder
+  private transient InspectableByteArrayOutputStream buffer1;
+  private transient InspectableByteArrayOutputStream buffer2;
+
+  // For storing the Reference in encoded form
+  private transient InspectableByteArrayOutputStream referenceBuffer;
+
+
+  // For deserializing the key
+  private transient DataInputViewWrapper inputWrapper;
+
+  public KvCoderComperator(KvCoder<K, V> coder) {
+    this.coder = coder;
+    this.keyCoder = coder.getKeyCoder();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+
+    buffer1 = new InspectableByteArrayOutputStream();
+    buffer2 = new InspectableByteArrayOutputStream();
+    referenceBuffer = new InspectableByteArrayOutputStream();
+
+    inputWrapper = new DataInputViewWrapper(null);
+  }
+
+  @Override
+  public int hash(KV<K, V> record) {
+    K key = record.getKey();
+    if (key != null) {
+      return key.hashCode();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void setReference(KV<K, V> toCompare) {
+    referenceBuffer.reset();
+    try {
+      keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
+    }
+  }
+
+  @Override
+  public boolean equalToReference(KV<K, V> candidate) {
+    try {
+      buffer2.reset();
+      keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = referenceBuffer.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (referenceBuffer.size() != buffer2.size()) {
+        return false;
+      }
+      int len = buffer2.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return false;
+        }
+      }
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<KV<K, V>> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer;
+
+    byte[] arr = referenceBuffer.getBuffer();
+    byte[] arrOther = otherReferenceBuffer.getBuffer();
+    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
+      return referenceBuffer.size() - otherReferenceBuffer.size();
+    }
+    int len = referenceBuffer.size();
+    for (int i = 0; i < len; i++) {
+      if (arr[i] != arrOther[i]) {
+        return arr[i] - arrOther[i];
+      }
+    }
+    return 0;
+  }
+
+
+  @Override
+  public int compare(KV<K, V> first, KV<K, V> second) {
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
+      keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+
+    inputWrapper.setInputView(firstSource);
+    K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+    inputWrapper.setInputView(secondSource);
+    K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+
+    try {
+      buffer1.reset();
+      buffer2.reset();
+      keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
+      keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
+      byte[] arr = buffer1.getBuffer();
+      byte[] arrOther = buffer2.getBuffer();
+      if (buffer1.size() != buffer2.size()) {
+        return buffer1.size() - buffer2.size();
+      }
+      int len = buffer1.size();
+      for(int i = 0; i < len; i++ ) {
+        if (arr[i] != arrOther[i]) {
+          return arr[i] - arrOther[i];
+        }
+      }
+      return 0;
+    } catch (IOException e) {
+      throw new RuntimeException("Could not compare reference.", e);
+    }
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) {
+    buffer1.reset();
+    try {
+      keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+    }
+    final byte[] data = buffer1.getBuffer();
+    final int limit = offset + numBytes;
+
+    int numBytesPut = Math.min(numBytes, buffer1.size());
+
+    target.put(offset, data, 0, numBytesPut);
+
+    offset += numBytesPut;
+
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return false;
+  }
+
+  @Override
+  public TypeComparator<KV<K, V>> duplicate() {
+    return new KvCoderComperator<>(coder);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    KV<K, V> kv = (KV<K, V>) record;
+    K k = kv.getKey();
+    target[index] = k;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] {new CoderComparator<>(keyCoder)};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
new file mode 100644
index 0000000..8862d48
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
+ * Dataflow {@link com.google.cloud.dataflow.sdk.coders.KvCoder}.
+ */
+public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
+
+  private KvCoder<K, V> coder;
+
+  // We don't have the Class, so we have to pass null here. What a shame...
+  private static Object DUMMY = new Object();
+
+  @SuppressWarnings("unchecked")
+  public KvCoderTypeInformation(KvCoder<K, V> coder) {
+    super(((Class<KV<K,V>>) DUMMY.getClass()));
+    this.coder = coder;
+    Preconditions.checkNotNull(coder);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
+    return new KvCoderComperator((KvCoder) coder);
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 2;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<KV<K, V>> getTypeClass() {
+    return privateGetTypeClass();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <X> Class<X> privateGetTypeClass() {
+    return (Class<X>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{" +
+        "coder=" + coder +
+        '}';
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(int pos) {
+    if (pos == 0) {
+      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+    } else if (pos == 1) {
+      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+    } else {
+      throw new RuntimeException("Invalid field position " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+    switch (fieldExpression) {
+      case "key":
+        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+      case "value":
+        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+      default:
+        throw new UnsupportedOperationException("Only KvCoder has fields.");
+    }
+  }
+
+  @Override
+  public String[] getFieldNames() {
+    return new String[]{"key", "value"};
+  }
+
+  @Override
+  public int getFieldIndex(String fieldName) {
+    switch (fieldName) {
+      case "key":
+        return 0;
+      case "value":
+        return 1;
+      default:
+        return -1;
+    }
+  }
+
+  @Override
+  public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+      CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
+      result.add(new FlatFieldDescriptor(0, keyTypeInfo));
+  }
+
+  @Override
+  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
+    return new KvCoderTypeComparatorBuilder();
+  }
+
+  private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
+
+    @Override
+    public void initializeTypeComparatorBuilder(int size) {}
+
+    @Override
+    public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
+
+    @Override
+    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
+      return new KvCoderComperator<>(coder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
new file mode 100644
index 0000000..8bc3620
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
+ * {@link com.google.cloud.dataflow.sdk.coders.VoidCoder}. We need this because Flink does not
+ * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead
+ * that behaves like a {@code null}, hopefully.
+ */
+public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
+
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public VoidCoderTypeSerializer duplicate() {
+    return this;
+  }
+
+  @Override
+  public VoidValue createInstance() {
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from) {
+    return from;
+  }
+
+  @Override
+  public VoidValue copy(VoidValue from, VoidValue reuse) {
+    return from;
+  }
+
+  @Override
+  public int getLength() {
+    return 0;
+  }
+
+  @Override
+  public void serialize(VoidValue record, DataOutputView target) throws IOException {
+    target.writeByte(1);
+  }
+
+  @Override
+  public VoidValue deserialize(DataInputView source) throws IOException {
+    source.readByte();
+    return VoidValue.INSTANCE;
+  }
+
+  @Override
+  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    source.readByte();
+    target.writeByte(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof VoidCoderTypeSerializer) {
+      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
+      return other.canEqual(this);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof VoidCoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public static class VoidValue {
+    private VoidValue() {}
+    
+    public static VoidValue INSTANCE = new VoidValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
new file mode 100644
index 0000000..445d411
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
+ * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
+ * the combine function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
+ * operation.
+ */
+public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> {
+  
+  private AA aa;
+  private Combine.CombineFn<? super AI, AA, AR> combiner;
+
+  public CombineFnAggregatorWrapper() {
+  }
+
+  public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) {
+    this.combiner = combiner;
+    this.aa = combiner.createAccumulator();
+  }
+
+  @Override
+  public void add(AI value) {
+    combiner.addInput(aa, value);
+  }
+
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) combiner.extractOutput(aa);
+  }
+
+  @Override
+  public void resetLocal() {
+    aa = combiner.createAccumulator();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
+  }
+
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
+    CombineFnAggregatorWrapper<AI, AA, AR> result = new
+        CombineFnAggregatorWrapper<>(combiner);
+    result.aa = aaCopy;
+    return result;
+  }
+
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
+
+  @Override
+  public String getName() {
+    return "CombineFn: " + combiner.toString();
+  }
+
+  @Override
+  public Combine.CombineFn getCombineFn() {
+    return combiner;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
new file mode 100644
index 0000000..6a3cf50
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
+ * {@link org.apache.flink.core.memory.DataInputView} while
+ * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
+ * {@link java.io.InputStream}.
+ */
+public class DataInputViewWrapper extends InputStream {
+
+  private DataInputView inputView;
+
+  public DataInputViewWrapper(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  public void setInputView(DataInputView inputView) {
+    this.inputView = inputView;
+  }
+
+  @Override
+  public int read() throws IOException {
+    try {
+      return inputView.readUnsignedByte();
+    } catch (EOFException e) {
+      // translate between DataInput and InputStream,
+      // DataInput signals EOF by exception, InputStream does it by returning -1
+      return -1;
+    }
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return inputView.read(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
new file mode 100644
index 0000000..6bd2240
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
+ * Flink writes data using a {@link org.apache.flink.core.memory.DataInputView} while
+ * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
+ * {@link java.io.OutputStream}.
+ */
+public class DataOutputViewWrapper extends OutputStream {
+  
+  private DataOutputView outputView;
+
+  public DataOutputViewWrapper(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  public void setOutputView(DataOutputView outputView) {
+    this.outputView = outputView;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    outputView.write(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    outputView.write(b, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
new file mode 100644
index 0000000..4409586
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.accumulators.Accumulator;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
+ * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
+ * the function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
+ * operation.
+ */
+public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> {
+
+  private AO aa;
+  private Combine.CombineFn<AI, ?, AO> combiner;
+
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
+    this.combiner = combiner;
+    resetLocal();
+  }
+  
+  @Override
+  @SuppressWarnings("unchecked")
+  public void add(AI value) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
+  }
+
+  @Override
+  public Serializable getLocalValue() {
+    return (Serializable) aa;
+  }
+
+  @Override
+  public void resetLocal() {
+    this.aa = combiner.apply(ImmutableList.<AI>of());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void merge(Accumulator<AI, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
+  }
+
+  @Override
+  public void addValue(AI value) {
+    add(value);
+  }
+
+  @Override
+  public String getName() {
+    return "Aggregator :" + combiner.toString();
+  }
+
+  @Override
+  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
+    return combiner;
+  }
+
+  @Override
+  public Accumulator<AI, Serializable> clone() {
+    // copy it by merging
+    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
+    SerializableFnAggregatorWrapper<AI, AO> result = new
+        SerializableFnAggregatorWrapper<>(combiner);
+
+    result.aa = resultCopy;
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
new file mode 100644
index 0000000..4c2475d
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.dataflow.sdk.io.Sink;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Write;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+
+/**
+ * Wrapper class to use generic Write.Bound transforms as sinks.
+ * @param <T> The type of the incoming records.
+ */
+public class SinkOutputFormat<T> implements OutputFormat<T> {
+
+  private final Sink<T> sink;
+
+  private transient PipelineOptions pipelineOptions;
+
+  private Sink.WriteOperation<T, ?> writeOperation;
+  private Sink.Writer<T, ?> writer;
+
+  private AbstractID uid = new AbstractID();
+
+  public SinkOutputFormat(Write.Bound<T> transform, PipelineOptions pipelineOptions) {
+    this.sink = extractSink(transform);
+    this.pipelineOptions = Preconditions.checkNotNull(pipelineOptions);
+  }
+
+  private Sink<T> extractSink(Write.Bound<T> transform) {
+    // TODO possibly add a getter in the upstream
+    try {
+      Field sinkField = transform.getClass().getDeclaredField("sink");
+      sinkField.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Sink<T> extractedSink = (Sink<T>) sinkField.get(transform);
+      return extractedSink;
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Could not acquire custom sink field.", e);
+    }
+  }
+
+  @Override
+  public void configure(Configuration configuration) {
+    writeOperation = sink.createWriteOperation(pipelineOptions);
+    try {
+      writeOperation.initialize(pipelineOptions);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize the write operation.", e);
+    }
+  }
+
+  @Override
+  public void open(int taskNumber, int numTasks) throws IOException {
+    try {
+      writer = writeOperation.createWriter(pipelineOptions);
+    } catch (Exception e) {
+      throw new IOException("Couldn't create writer.", e);
+    }
+    try {
+      writer.open(uid + "-" + String.valueOf(taskNumber));
+    } catch (Exception e) {
+      throw new IOException("Couldn't open writer.", e);
+    }
+  }
+
+  @Override
+  public void writeRecord(T record) throws IOException {
+    try {
+      writer.write(record);
+    } catch (Exception e) {
+      throw new IOException("Couldn't write record.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } catch (Exception e) {
+      throw new IOException("Couldn't close writer.", e);
+    }
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, pipelineOptions);
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    pipelineOptions = mapper.readValue(in, PipelineOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
new file mode 100644
index 0000000..cd5cd40
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Source;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
+ * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
+ */
+public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
+
+  private final BoundedSource<T> initialSource;
+  private transient PipelineOptions options;
+
+  private BoundedSource.BoundedReader<T> reader = null;
+  private boolean reachedEnd = true;
+
+  public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
+    this.initialSource = initialSource;
+    this.options = options;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+  }
+
+  @Override
+  public void configure(Configuration configuration) {}
+
+  @Override
+  public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
+    reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
+    reachedEnd = false;
+  }
+
+  @Override
+  public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
+    try {
+      final long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+
+      return new BaseStatistics() {
+        @Override
+        public long getTotalInputSize() {
+          return estimatedSize;
+
+        }
+
+        @Override
+        public long getNumberOfRecords() {
+          return BaseStatistics.NUM_RECORDS_UNKNOWN;
+        }
+
+        @Override
+        public float getAverageRecordWidth() {
+          return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
+        }
+      };
+    } catch (Exception e) {
+      LOG.warn("Could not read Source statistics: {}", e);
+    }
+
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
+    long desiredSizeBytes;
+    try {
+      desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+      List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
+          options);
+      List<SourceInputSplit<T>> splits = new ArrayList<>();
+      int splitCount = 0;
+      for (Source<T> shard: shards) {
+        splits.add(new SourceInputSplit<>(shard, splitCount++));
+      }
+      return splits.toArray(new SourceInputSplit[splits.size()]);
+    } catch (Exception e) {
+      throw new IOException("Could not create input splits from Source.", e);
+    }
+  }
+
+  @Override
+  public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
+    return new InputSplitAssigner() {
+      private int index = 0;
+      private final SourceInputSplit[] splits = sourceInputSplits;
+      @Override
+      public InputSplit getNextInputSplit(String host, int taskId) {
+        if (index < splits.length) {
+          return splits[index++];
+        } else {
+          return null;
+        }
+      }
+    };
+  }
+
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return reachedEnd;
+  }
+
+  @Override
+  public T nextRecord(T t) throws IOException {
+
+    reachedEnd = !reader.advance();
+    if (!reachedEnd) {
+      return reader.getCurrent();
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
new file mode 100644
index 0000000..cde2b35
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers;
+
+import com.google.cloud.dataflow.sdk.io.Source;
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * {@link org.apache.flink.core.io.InputSplit} for
+ * {@link org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}. We pass
+ * the sharded Source around in the input split because Sources simply split up into several
+ * Sources for sharding. This is different to how Flink creates a separate InputSplit from
+ * an InputFormat.
+ */
+public class SourceInputSplit<T> implements InputSplit {
+
+  private Source<T> source;
+  private int splitNumber;
+
+  public SourceInputSplit() {
+  }
+
+  public SourceInputSplit(Source<T> source, int splitNumber) {
+    this.source = source;
+    this.splitNumber = splitNumber;
+  }
+
+  @Override
+  public int getSplitNumber() {
+    return splitNumber;
+  }
+
+  public Source<T> getSource() {
+    return source;
+  }
+
+}