You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:24 UTC

[29/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
deleted file mode 100644
index 83b19ee..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Preconditions;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkStreamingTranslationContext {
-
-  private final StreamExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  /**
-   * Keeps a mapping between the output value of the PTransform (in Dataflow) and the
-   * Flink Operator that produced it, after the translation of the correspondinf PTransform
-   * to its Flink equivalent.
-   * */
-  private final Map<PValue, DataStream<?>> dataStreams;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
-    this.env = Preconditions.checkNotNull(env);
-    this.options = Preconditions.checkNotNull(options);
-    this.dataStreams = new HashMap<>();
-  }
-
-  public StreamExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataStream<T> getInputDataStream(PValue value) {
-    return (DataStream<T>) dataStreams.get(value);
-  }
-
-  public void setOutputDataStream(PValue value, DataStream<?> set) {
-    if (!dataStreams.containsKey(value)) {
-      dataStreams.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the AppliedPTransform which carries input/output.
-   * @param currentTransform
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return (I) currentTransform.getInput();
-  }
-
-  @SuppressWarnings("unchecked")
-  public <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return (O) currentTransform.getOutput();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
deleted file mode 100644
index 0e855ad..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{
-
-  private CoGbkResultSchema schema;
-  private TupleTag<?> tupleTag1;
-  private TupleTag<?> tupleTag2;
-
-  public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) {
-    this.schema = schema;
-    this.tupleTag1 = tupleTag1;
-    this.tupleTag2 = tupleTag2;
-  }
-
-  @Override
-  public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception {
-    K k = null;
-    List<RawUnionValue> result = new ArrayList<>();
-    int index1 = schema.getIndex(tupleTag1);
-    for (KV<K,?> entry : first) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index1, entry.getValue()));
-    }
-    int index2 = schema.getIndex(tupleTag2);
-    for (KV<K,?> entry : second) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index2, entry.getValue()));
-    }
-    out.collect(KV.of(k, new CoGbkResult(schema, result)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
deleted file mode 100644
index 32efe41..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This is a hack for transforming a {@link com.google.cloud.dataflow.sdk.transforms.Create}
- * operation. Flink does not allow {@code null} in it's equivalent operation:
- * {@link org.apache.flink.api.java.ExecutionEnvironment#fromElements(Object[])}. Therefore
- * we use a DataSource with one dummy element and output the elements of the Create operation
- * inside this FlatMap.
- */
-public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> {
-
-  private final List<byte[]> elements;
-  private final Coder<OUT> coder;
-
-  public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-    this.elements = elements;
-    this.coder = coder;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(IN value, Collector<OUT> out) throws Exception {
-
-    for (byte[] element : elements) {
-      ByteArrayInputStream bai = new ByteArrayInputStream(element);
-      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-      if (outValue == null) {
-        // TODO Flink doesn't allow null values in records
-        out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE);
-      } else {
-        out.collect(outValue);
-      }
-    }
-
-    out.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
deleted file mode 100644
index 9e8b7cd..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn}
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- */
-public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT> {
-
-  private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
-
-  public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
-    this.doFn = doFn;
-    this.options = options;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
-  
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-    IN inValue;
-    Collector<OUT> outCollector;
-
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) {
-      fn.super();
-      super.setupDelegateAggregators();
-      this.outCollector = outCollector;
-    }
-
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
-
-
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
-
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return new WindowingInternals<IN, OUT>() {
-        @Override
-        public StateInternals stateInternals() {
-          return null;
-        }
-
-        @Override
-        public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return null;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of(GlobalWindow.INSTANCE);
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return PaneInfo.NO_FIRING;
-        }
-
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() not implemented.");
-        }
-      };
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId());
-      List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-      }
-      return view.fromIterableInternal(windowedValueList);
-    }
-
-    @Override
-    public void output(OUT output) {
-      outCollector.collect(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not FLink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // ignore the side output, this can happen when a user does not register
-      // side outputs but then outputs using a freshly created TupleTag.
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return wrapper;
-    }
-
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
deleted file mode 100644
index 56fa88d..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements, extracts the key and collects
- * the values in a {@code List}.
- */
-public class FlinkKeyedListAggregationFunction<K,V> implements GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> {
-
-  @Override
-  public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> out) throws Exception {
-    Iterator<KV<K, V>> it = values.iterator();
-    KV<K, V> first = it.next();
-    Iterable<V> passThrough = new PassThroughIterable<>(first, it);
-    out.collect(KV.of(first.getKey(), passThrough));
-  }
-
-  private static class PassThroughIterable<K, V> implements Iterable<V>, Iterator<V>  {
-    private KV<K, V> first;
-    private Iterator<KV<K, V>> iterator;
-
-    public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) {
-      this.first = first;
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return this;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return first != null || iterator.hasNext();
-    }
-
-    @Override
-    public V next() {
-      if (first != null) {
-        V result = first.getValue();
-        first = null;
-        return result;
-      } else {
-        return iterator.next().getValue();
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Cannot remove elements from input.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
deleted file mode 100644
index bd8a968..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Encapsulates a {@link com.google.cloud.dataflow.sdk.transforms.DoFn} that uses side outputs
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- *
- * We get a mapping from {@link com.google.cloud.dataflow.sdk.values.TupleTag} to output index
- * and must tag all outputs with the output number. Afterwards a filter will filter out
- * those elements that are not to be in a specific output.
- */
-public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> {
-
-  private final DoFn<IN, OUT> doFn;
-  private transient PipelineOptions options;
-  private final Map<TupleTag<?>, Integer> outputMap;
-
-  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = doFn;
-    this.options = options;
-    this.outputMap = outputMap;
-  }
-
-  private void writeObject(ObjectOutputStream out)
-      throws IOException, ClassNotFoundException {
-    out.defaultWriteObject();
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.writeValue(out, options);
-  }
-
-  private void readObject(ObjectInputStream in)
-      throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    ObjectMapper mapper = new ObjectMapper();
-    options = mapper.readValue(in, PipelineOptions.class);
-
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
-
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-    IN inValue;
-    Collector<RawUnionValue> outCollector;
-
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) {
-      fn.super();
-      this.outCollector = outCollector;
-    }
-
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
-
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return null;
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal()
-          .getId());
-      List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-      }
-      return view.fromIterableInternal(windowedValueList);
-    }
-
-    @Override
-    public void output(OUT value) {
-      // assume that index 0 is the default output
-      outCollector.collect(new RawUnionValue(0, value));
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not FLink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> void sideOutput(TupleTag<T> tag, T value) {
-      Integer index = outputMap.get(tag);
-      if (index != null) {
-        outCollector.collect(new RawUnionValue(index, value));
-      }
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return null;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
deleted file mode 100644
index 3e1cb65..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * A FlatMap function that filters out those elements that don't belong in this output. We need
- * this to implement MultiOutput ParDo functions.
- */
-public class FlinkMultiOutputPruningFunction<T> implements FlatMapFunction<RawUnionValue, T> {
-
-  private final int outputTag;
-
-  public FlinkMultiOutputPruningFunction(int outputTag) {
-    this.outputTag = outputTag;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) throws Exception {
-    if (rawUnionValue.getUnionTag() == outputTag) {
-      collector.collect((T) rawUnionValue.getValue());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
deleted file mode 100644
index 1ff06ba..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupCombineFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements VI, extracts the key and emits accumulated
- * values which have the intermediate format VA.
- */
-public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> {
-
-  private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
-
-  public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
-                                        keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
-  }
-
-  @Override
-  public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception {
-
-    final Iterator<KV<K, VI>> iterator = elements.iterator();
-    // create accumulator using the first elements key
-    KV<K, VI> first = iterator.next();
-    K key = first.getKey();
-    VI value = first.getValue();
-    VA accumulator = keyedCombineFn.createAccumulator(key);
-    accumulator = keyedCombineFn.addInput(key, accumulator, value);
-
-    while(iterator.hasNext()) {
-      value = iterator.next().getValue();
-      accumulator = keyedCombineFn.addInput(key, accumulator, value);
-    }
-
-    out.collect(KV.of(key, accumulator));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
deleted file mode 100644
index 580ac01..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.collect.ImmutableList;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
- * {@link com.google.cloud.dataflow.sdk.transforms.Combine.PerKey} operation. This reads the input
- * {@link com.google.cloud.dataflow.sdk.values.KV} elements, extracts the key and merges the
- * accumulators resulting from the PartialReduce which produced the input VA.
- */
-public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> {
-
-  private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
-
-  public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
-  }
-
-  @Override
-  public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
-    Iterator<KV<K, VA>> it = values.iterator();
-
-    KV<K, VA> current = it.next();
-    K k = current.getKey();
-    VA accumulator = current.getValue();
-
-    while (it.hasNext()) {
-      current = it.next();
-      keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) );
-    }
-
-    out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
deleted file mode 100644
index 05f4415..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/UnionCoder.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.beam.runners.flink.translation.functions;
-
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A UnionCoder encodes RawUnionValues.
- *
- * This file copied from {@link com.google.cloud.dataflow.sdk.transforms.join.UnionCoder}
- */
-@SuppressWarnings("serial")
-public class UnionCoder extends StandardCoder<RawUnionValue> {
-  // TODO: Think about how to integrate this with a schema object (i.e.
-  // a tuple of tuple tags).
-  /**
-   * Builds a union coder with the given list of element coders.  This list
-   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
-   */
-  public static UnionCoder of(List<Coder<?>> elementCoders) {
-    return new UnionCoder(elementCoders);
-  }
-
-  @JsonCreator
-  public static UnionCoder jsonOf(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> elements) {
-    return UnionCoder.of(elements);
-  }
-
-  private int getIndexForEncoding(RawUnionValue union) {
-    if (union == null) {
-      throw new IllegalArgumentException("cannot encode a null tagged union");
-    }
-    int index = union.getUnionTag();
-    if (index < 0 || index >= elementCoders.size()) {
-      throw new IllegalArgumentException(
-          "union value index " + index + " not in range [0.." +
-              (elementCoders.size() - 1) + "]");
-    }
-    return index;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void encode(
-      RawUnionValue union,
-      OutputStream outStream,
-      Context context)
-      throws IOException  {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    VarInt.encode(index, outStream);
-
-    // Write out the actual value.
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.encode(
-        union.getValue(),
-        outStream,
-        context);
-  }
-
-  @Override
-  public RawUnionValue decode(InputStream inStream, Context context)
-      throws IOException {
-    int index = VarInt.decodeInt(inStream);
-    Object value = elementCoders.get(index).decode(inStream, context);
-    return new RawUnionValue(index, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getComponents() {
-    return elementCoders;
-  }
-
-  /**
-   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-   * time, we defer the return value to that coder.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-    int index = getIndexForEncoding(union);
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      RawUnionValue union, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    observer.update(VarInt.getLength(index));
-    // Write out the actual value.
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.registerByteSizeObserver(union.getValue(), observer, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final List<Coder<?>> elementCoders;
-
-  private UnionCoder(List<Coder<?>> elementCoders) {
-    this.elementCoders = elementCoders;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
deleted file mode 100644
index ecfb95d..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderComparator.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
- * {@link com.google.cloud.dataflow.sdk.coders.Coder}.
- */
-public class CoderComparator<T> extends TypeComparator<T> {
-
-  private Coder<T> coder;
-
-  // We use these for internal encoding/decoding for creating copies and comparing
-  // serialized forms using a Coder
-  private transient InspectableByteArrayOutputStream buffer1;
-  private transient InspectableByteArrayOutputStream buffer2;
-
-  // For storing the Reference in encoded form
-  private transient InspectableByteArrayOutputStream referenceBuffer;
-
-  public CoderComparator(Coder<T> coder) {
-    this.coder = coder;
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-  }
-
-  @Override
-  public int hash(T record) {
-    return record.hashCode();
-  }
-
-  @Override
-  public void setReference(T toCompare) {
-    referenceBuffer.reset();
-    try {
-      coder.encode(toCompare, referenceBuffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-    }
-  }
-
-  @Override
-  public boolean equalToReference(T candidate) {
-    try {
-      buffer2.reset();
-      coder.encode(candidate, buffer2, Coder.Context.OUTER);
-      byte[] arr = referenceBuffer.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (referenceBuffer.size() != buffer2.size()) {
-        return false;
-      }
-      int len = buffer2.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return false;
-        }
-      }
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public int compareToReference(TypeComparator<T> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer = ((CoderComparator<T>) other).referenceBuffer;
-
-    byte[] arr = referenceBuffer.getBuffer();
-    byte[] arrOther = otherReferenceBuffer.getBuffer();
-    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-      return referenceBuffer.size() - otherReferenceBuffer.size();
-    }
-    int len = referenceBuffer.size();
-    for (int i = 0; i < len; i++) {
-      if (arr[i] != arrOther[i]) {
-        return arr[i] - arrOther[i];
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int compare(T first, T second) {
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      coder.encode(first, buffer1, Coder.Context.OUTER);
-      coder.encode(second, buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare: ", e);
-    }
-  }
-
-  @Override
-  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-    CoderTypeSerializer<T> serializer = new CoderTypeSerializer<>(coder);
-    T first = serializer.deserialize(firstSource);
-    T second = serializer.deserialize(secondSource);
-    return compare(first, second);
-  }
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    return true;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  @Override
-  public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
-    buffer1.reset();
-    try {
-      coder.encode(record, buffer1, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-    }
-    final byte[] data = buffer1.getBuffer();
-    final int limit = offset + numBytes;
-
-    target.put(offset, data, 0, Math.min(numBytes, buffer1.size()));
-
-    offset += buffer1.size();
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean invertNormalizedKey() {
-    return false;
-  }
-
-  @Override
-  public TypeComparator<T> duplicate() {
-    return new CoderComparator<>(coder);
-  }
-
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    target[index] = record;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] { this.duplicate() };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
deleted file mode 100644
index 8880b48..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeInformation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import com.google.common.base.Preconditions;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s.
- */
-public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
-
-  private final Coder<T> coder;
-
-  public CoderTypeInformation(Coder<T> coder) {
-    Preconditions.checkNotNull(coder);
-    this.coder = coder;
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 1;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<T> getTypeClass() {
-    // We don't have the Class, so we have to pass null here. What a shame...
-    return (Class<T>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-    if (coder instanceof VoidCoder) {
-      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
-    }
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    CoderTypeInformation that = (CoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeInformation;
-  }
-
-  @Override
-  public String toString() {
-    return "CoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
-  }
-
-  @Override
-  public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
-      executionConfig) {
-    return new CoderComparator<>(coder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
deleted file mode 100644
index 481ee31..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/CoderTypeSerializer.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s
- */
-public class CoderTypeSerializer<T> extends TypeSerializer<T> {
-  
-  private Coder<T> coder;
-  private transient DataInputViewWrapper inputWrapper;
-  private transient DataOutputViewWrapper outputWrapper;
-
-  // We use this for internal encoding/decoding for creating copies using the Coder.
-  private transient InspectableByteArrayOutputStream buffer;
-
-  public CoderTypeSerializer(Coder<T> coder) {
-    this.coder = coder;
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
-
-    buffer = new InspectableByteArrayOutputStream();
-  }
-  
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-    this.inputWrapper = new DataInputViewWrapper(null);
-    this.outputWrapper = new DataOutputViewWrapper(null);
-
-    buffer = new InspectableByteArrayOutputStream();
-  }
-  
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public CoderTypeSerializer<T> duplicate() {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public T createInstance() {
-    return null;
-  }
-
-  @Override
-  public T copy(T t) {
-    buffer.reset();
-    try {
-      coder.encode(t, buffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
-    }
-    try {
-      return coder.decode(new ByteArrayInputStream(buffer.getBuffer(), 0, buffer
-          .size()), Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not copy.", e);
-    }
-  }
-
-  @Override
-  public T copy(T t, T reuse) {
-    return copy(t);
-  }
-
-  @Override
-  public int getLength() {
-    return 0;
-  }
-
-  @Override
-  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
-    outputWrapper.setOutputView(dataOutputView);
-    coder.encode(t, outputWrapper, Coder.Context.NESTED);
-  }
-
-  @Override
-  public T deserialize(DataInputView dataInputView) throws IOException {
-    try {
-      inputWrapper.setInputView(dataInputView);
-      return coder.decode(inputWrapper, Coder.Context.NESTED);
-    } catch (CoderException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof EOFException) {
-        throw (EOFException) cause;
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public T deserialize(T t, DataInputView dataInputView) throws IOException {
-    return deserialize(dataInputView);
-  }
-
-  @Override
-  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
-    serialize(deserialize(dataInputView), dataOutputView);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    CoderTypeSerializer that = (CoderTypeSerializer) o;
-    return coder.equals(that.coder);
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof CoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
deleted file mode 100644
index 619fa55..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/InspectableByteArrayOutputStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Version of {@link java.io.ByteArrayOutputStream} that allows to retrieve the internal
- * byte[] buffer without incurring an array copy.
- */
-public class InspectableByteArrayOutputStream extends ByteArrayOutputStream {
-
-  /**
-   * Get the underlying byte array.
-   */
-  public byte[] getBuffer() {
-    return buf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
deleted file mode 100644
index 4599c6a..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderComperator.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
- * {@link com.google.cloud.dataflow.sdk.coders.KvCoder}. We have a special comparator
- * for {@link KV} that always compares on the key only.
- */
-public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
-  
-  private KvCoder<K, V> coder;
-  private Coder<K> keyCoder;
-
-  // We use these for internal encoding/decoding for creating copies and comparing
-  // serialized forms using a Coder
-  private transient InspectableByteArrayOutputStream buffer1;
-  private transient InspectableByteArrayOutputStream buffer2;
-
-  // For storing the Reference in encoded form
-  private transient InspectableByteArrayOutputStream referenceBuffer;
-
-
-  // For deserializing the key
-  private transient DataInputViewWrapper inputWrapper;
-
-  public KvCoderComperator(KvCoder<K, V> coder) {
-    this.coder = coder;
-    this.keyCoder = coder.getKeyCoder();
-
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-
-    inputWrapper = new DataInputViewWrapper(null);
-  }
-
-  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-    in.defaultReadObject();
-
-    buffer1 = new InspectableByteArrayOutputStream();
-    buffer2 = new InspectableByteArrayOutputStream();
-    referenceBuffer = new InspectableByteArrayOutputStream();
-
-    inputWrapper = new DataInputViewWrapper(null);
-  }
-
-  @Override
-  public int hash(KV<K, V> record) {
-    K key = record.getKey();
-    if (key != null) {
-      return key.hashCode();
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public void setReference(KV<K, V> toCompare) {
-    referenceBuffer.reset();
-    try {
-      keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
-    }
-  }
-
-  @Override
-  public boolean equalToReference(KV<K, V> candidate) {
-    try {
-      buffer2.reset();
-      keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
-      byte[] arr = referenceBuffer.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (referenceBuffer.size() != buffer2.size()) {
-        return false;
-      }
-      int len = buffer2.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return false;
-        }
-      }
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public int compareToReference(TypeComparator<KV<K, V>> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer;
-
-    byte[] arr = referenceBuffer.getBuffer();
-    byte[] arrOther = otherReferenceBuffer.getBuffer();
-    if (referenceBuffer.size() != otherReferenceBuffer.size()) {
-      return referenceBuffer.size() - otherReferenceBuffer.size();
-    }
-    int len = referenceBuffer.size();
-    for (int i = 0; i < len; i++) {
-      if (arr[i] != arrOther[i]) {
-        return arr[i] - arrOther[i];
-      }
-    }
-    return 0;
-  }
-
-
-  @Override
-  public int compare(KV<K, V> first, KV<K, V> second) {
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
-      keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-
-    inputWrapper.setInputView(firstSource);
-    K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
-    inputWrapper.setInputView(secondSource);
-    K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
-
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
-      keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
-  }
-
-  @Override
-  public boolean supportsNormalizedKey() {
-    return true;
-  }
-
-  @Override
-  public boolean supportsSerializationWithKeyNormalization() {
-    return false;
-  }
-
-  @Override
-  public int getNormalizeKeyLen() {
-    return Integer.MAX_VALUE;
-  }
-
-  @Override
-  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
-    return true;
-  }
-
-  @Override
-  public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) {
-    buffer1.reset();
-    try {
-      keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
-    }
-    final byte[] data = buffer1.getBuffer();
-    final int limit = offset + numBytes;
-
-    int numBytesPut = Math.min(numBytes, buffer1.size());
-
-    target.put(offset, data, 0, numBytesPut);
-
-    offset += numBytesPut;
-
-    while (offset < limit) {
-      target.put(offset++, (byte) 0);
-    }
-  }
-
-  @Override
-  public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean invertNormalizedKey() {
-    return false;
-  }
-
-  @Override
-  public TypeComparator<KV<K, V>> duplicate() {
-    return new KvCoderComperator<>(coder);
-  }
-
-  @Override
-  public int extractKeys(Object record, Object[] target, int index) {
-    KV<K, V> kv = (KV<K, V>) record;
-    K k = kv.getKey();
-    target[index] = k;
-    return 1;
-  }
-
-  @Override
-  public TypeComparator[] getFlatComparators() {
-    return new TypeComparator[] {new CoderComparator<>(keyCoder)};
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
deleted file mode 100644
index 7a0d999..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/KvCoderTypeInformation.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import com.google.common.base.Preconditions;
-
-import java.util.List;
-
-/**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.KvCoder}.
- */
-public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
-
-  private KvCoder<K, V> coder;
-
-  // We don't have the Class, so we have to pass null here. What a shame...
-  private static Object DUMMY = new Object();
-
-  @SuppressWarnings("unchecked")
-  public KvCoderTypeInformation(KvCoder<K, V> coder) {
-    super(((Class<KV<K,V>>) DUMMY.getClass()));
-    this.coder = coder;
-    Preconditions.checkNotNull(coder);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
-    return new KvCoderComperator((KvCoder) coder);
-  }
-
-  @Override
-  public boolean isBasicType() {
-    return false;
-  }
-
-  @Override
-  public boolean isTupleType() {
-    return false;
-  }
-
-  @Override
-  public int getArity() {
-    return 2;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Class<KV<K, V>> getTypeClass() {
-    return privateGetTypeClass();
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <X> Class<X> privateGetTypeClass() {
-    return (Class<X>) Object.class;
-  }
-
-  @Override
-  public boolean isKeyType() {
-    return true;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
-    return new CoderTypeSerializer<>(coder);
-  }
-
-  @Override
-  public int getTotalFields() {
-    return 2;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    KvCoderTypeInformation that = (KvCoderTypeInformation) o;
-
-    return coder.equals(that.coder);
-
-  }
-
-  @Override
-  public int hashCode() {
-    return coder.hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "CoderTypeInformation{" +
-        "coder=" + coder +
-        '}';
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(int pos) {
-    if (pos == 0) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-    } else if (pos == 1) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-    } else {
-      throw new RuntimeException("Invalid field position " + pos);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
-    switch (fieldExpression) {
-      case "key":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
-      case "value":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
-      default:
-        throw new UnsupportedOperationException("Only KvCoder has fields.");
-    }
-  }
-
-  @Override
-  public String[] getFieldNames() {
-    return new String[]{"key", "value"};
-  }
-
-  @Override
-  public int getFieldIndex(String fieldName) {
-    switch (fieldName) {
-      case "key":
-        return 0;
-      case "value":
-        return 1;
-      default:
-        return -1;
-    }
-  }
-
-  @Override
-  public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-      CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
-      result.add(new FlatFieldDescriptor(0, keyTypeInfo));
-  }
-
-  @Override
-  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
-    return new KvCoderTypeComparatorBuilder();
-  }
-
-  private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
-
-    @Override
-    public void initializeTypeComparatorBuilder(int size) {}
-
-    @Override
-    public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
-
-    @Override
-    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
-      return new KvCoderComperator<>(coder);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
deleted file mode 100644
index c7b6ea2..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/types/VoidCoderTypeSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-/**
- * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * {@link com.google.cloud.dataflow.sdk.coders.VoidCoder}. We need this because Flink does not
- * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead
- * that behaves like a {@code null}, hopefully.
- */
-public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
-
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public VoidCoderTypeSerializer duplicate() {
-    return this;
-  }
-
-  @Override
-  public VoidValue createInstance() {
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from) {
-    return from;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from, VoidValue reuse) {
-    return from;
-  }
-
-  @Override
-  public int getLength() {
-    return 0;
-  }
-
-  @Override
-  public void serialize(VoidValue record, DataOutputView target) throws IOException {
-    target.writeByte(1);
-  }
-
-  @Override
-  public VoidValue deserialize(DataInputView source) throws IOException {
-    source.readByte();
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
-    return deserialize(source);
-  }
-
-  @Override
-  public void copy(DataInputView source, DataOutputView target) throws IOException {
-    source.readByte();
-    target.writeByte(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof VoidCoderTypeSerializer) {
-      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
-      return other.canEqual(this);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof VoidCoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return 0;
-  }
-
-  public static class VoidValue {
-    private VoidValue() {}
-    
-    public static VoidValue INSTANCE = new VoidValue();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
deleted file mode 100644
index 815765c..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/CombineFnAggregatorWrapper.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-import java.io.Serializable;
-
-/**
- * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the combine function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
- * operation.
- */
-public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> {
-  
-  private AA aa;
-  private Combine.CombineFn<? super AI, AA, AR> combiner;
-
-  public CombineFnAggregatorWrapper() {
-  }
-
-  public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) {
-    this.combiner = combiner;
-    this.aa = combiner.createAccumulator();
-  }
-
-  @Override
-  public void add(AI value) {
-    combiner.addInput(aa, value);
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) combiner.extractOutput(aa);
-  }
-
-  @Override
-  public void resetLocal() {
-    aa = combiner.createAccumulator();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
-  }
-
-  @Override
-  public Accumulator<AI, Serializable> clone() {
-    // copy it by merging
-    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
-    CombineFnAggregatorWrapper<AI, AA, AR> result = new
-        CombineFnAggregatorWrapper<>(combiner);
-    result.aa = aaCopy;
-    return result;
-  }
-
-  @Override
-  public void addValue(AI value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "CombineFn: " + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn getCombineFn() {
-    return combiner;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
deleted file mode 100644
index b56a90e..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataInputViewWrapper.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Wrapper for {@link DataInputView}. We need this because Flink reads data using a
- * {@link org.apache.flink.core.memory.DataInputView} while
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
- * {@link java.io.InputStream}.
- */
-public class DataInputViewWrapper extends InputStream {
-
-  private DataInputView inputView;
-
-  public DataInputViewWrapper(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  public void setInputView(DataInputView inputView) {
-    this.inputView = inputView;
-  }
-
-  @Override
-  public int read() throws IOException {
-    try {
-      return inputView.readUnsignedByte();
-    } catch (EOFException e) {
-      // translate between DataInput and InputStream,
-      // DataInput signals EOF by exception, InputStream does it by returning -1
-      return -1;
-    }
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    return inputView.read(b, off, len);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
deleted file mode 100644
index 513d7f8..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/DataOutputViewWrapper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because
- * Flink writes data using a {@link org.apache.flink.core.memory.DataInputView} while
- * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an
- * {@link java.io.OutputStream}.
- */
-public class DataOutputViewWrapper extends OutputStream {
-  
-  private DataOutputView outputView;
-
-  public DataOutputViewWrapper(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  public void setOutputView(DataOutputView outputView) {
-    this.outputView = outputView;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    outputView.write(b);
-  }
-
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    outputView.write(b, off, len);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
deleted file mode 100644
index 0d03f9f..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-import java.io.Serializable;
-
-/**
- * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo}
- * operation.
- */
-public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> {
-
-  private AO aa;
-  private Combine.CombineFn<AI, ?, AO> combiner;
-
-  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
-    this.combiner = combiner;
-    resetLocal();
-  }
-  
-  @Override
-  @SuppressWarnings("unchecked")
-  public void add(AI value) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) aa;
-  }
-
-  @Override
-  public void resetLocal() {
-    this.aa = combiner.apply(ImmutableList.<AI>of());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
-  }
-
-  @Override
-  public void addValue(AI value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "Aggregator :" + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
-    return combiner;
-  }
-
-  @Override
-  public Accumulator<AI, Serializable> clone() {
-    // copy it by merging
-    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
-    SerializableFnAggregatorWrapper<AI, AO> result = new
-        SerializableFnAggregatorWrapper<>(combiner);
-
-    result.aa = resultCopy;
-    return result;
-  }
-}