Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:21 UTC

[03/14] incubator-beam git commit: [BEAM-270] Support Timestamps/Windows in Flink Batch

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 43e458f..9cbc6b9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -17,43 +17,179 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a
- * {@link org.apache.beam.sdk.transforms.Combine.PerKey} operation. This reads the input
- * {@link org.apache.beam.sdk.values.KV} elements, extracts the key and merges the
- * accumulators resulting from the PartialReduce which produced the input VA.
+ * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
+ * on Flink, the first part being {@link FlinkPartialReduceFunction}. This function performs the final
+ * combination of the pre-combined values after a shuffle.
+ *
+ * <p>The input to {@link #reduce(Iterable, Collector)} consists of elements of the same key but
+ * for different windows. We have to ensure that we only combine elements of matching
+ * windows.
  */
-public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> {
+public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
+    extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
+
+  protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
+
+  protected final DoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
+
+  protected final WindowingStrategy<?, W> windowingStrategy;
+
+  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  protected final SerializedPipelineOptions serializedOptions;
 
-  private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
+  public FlinkReduceFunction(
+      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions pipelineOptions) {
 
-  public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
+    this.combineFn = keyedCombineFn;
+
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+    // dummy DoFn because we need one for ProcessContext
+    this.doFn = new DoFn<KV<K, AccumT>, KV<K, OutputT>>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+
+      }
+    };
   }
 
   @Override
-  public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
-    Iterator<KV<K, VA>> it = values.iterator();
+  public void reduce(
+      Iterable<WindowedValue<KV<K, AccumT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+    FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+        new FlinkProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            out,
+            sideInputs);
+
+    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
-    KV<K, VA> current = it.next();
-    K k = current.getKey();
-    VA accumulator = current.getValue();
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
-    while (it.hasNext()) {
-      current = it.next();
-      keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) );
+
+    // materialize all elements so that we can sort them; they have to fit
+    // into memory
+    // this seems very imprudent, but it is correct, for now
+    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
+      for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, AccumT>> o1,
+          WindowedValue<KV<K, AccumT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    // iterate over the elements, which are sorted by window max timestamp
+    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
+
+    // get the first accumulator
+    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
+    AccumT accumulator = currentValue.getValue().getValue();
+
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn;
+    // FlinkPartialReduceFunction already merged the timestamps assigned to
+    // individual elements, here we only merge those intermediate timestamps
+    List<Instant> windowTimestamps = new ArrayList<>();
+    windowTimestamps.add(currentValue.getTimestamp());
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
+      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (nextWindow.equals(currentWindow)) {
+        // continue accumulating
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.mergeAccumulators(
+            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
+
+        windowTimestamps.add(nextValue.getTimestamp());
+      } else {
+        // emit the value that we currently have
+        processContext = processContext.forWindowedValue(currentValue);
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+                outputTimeFn.merge(currentWindow, windowTimestamps),
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        windowTimestamps.clear();
+
+        currentWindow = nextWindow;
+        accumulator = nextValue.getValue().getValue();
+        windowTimestamps.add(nextValue.getTimestamp());
+      }
+
+      // we have to keep track of the current value so that we can set the
+      // context to the right windowed value when windows change in the iterable
+      currentValue = nextValue;
     }
 
-    out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
+    // if at the end of the iteration we have a change in windows
+    // the ProcessContext will not have been updated
+    processContext = processContext.forWindowedValue(currentValue);
+
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+            outputTimeFn.merge(currentWindow, windowTimestamps),
+            currentWindow,
+            PaneInfo.NO_FIRING));
   }
 }
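
As an illustration of the control flow in the new reduce() above: a minimal,
self-contained sketch of a per-window merge over pre-sorted elements. Long window
keys and a running long sum are hypothetical stand-ins for BoundedWindow and the
CombineFn accumulator; this is plain Java, not Beam API code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PerWindowMergeSketch {

  // (window, value) pairs stand in for WindowedValue<KV<K, AccumT>>
  static List<long[]> reducePerWindow(List<long[]> elements) {
    List<long[]> out = new ArrayList<>();
    // sort by window so that equal windows are adjacent, as reduce() does
    elements.sort(Comparator.comparingLong(e -> e[0]));
    long currentWindow = elements.get(0)[0];
    long accumulator = elements.get(0)[1];
    for (int i = 1; i < elements.size(); i++) {
      long[] next = elements.get(i);
      if (next[0] == currentWindow) {
        // same window: continue accumulating (mergeAccumulators)
        accumulator += next[1];
      } else {
        // window changed: emit the value that we currently have
        out.add(new long[] {currentWindow, accumulator});
        currentWindow = next[0];
        accumulator = next[1];
      }
    }
    // emit the final accumulator
    out.add(new long[] {currentWindow, accumulator});
    return out;
  }

  public static void main(String[] args) {
    List<long[]> input = new ArrayList<>(Arrays.asList(
        new long[] {10, 1}, new long[] {20, 5}, new long[] {10, 2}));
    for (long[] kv : reducePerWindow(input)) {
      System.out.println("window=" + kv[0] + " sum=" + kv[1]);
      // prints: window=10 sum=3, then window=20 sum=5
    }
  }
}

Sorting by window first is what allows a single live accumulator: each window can
be finalized and emitted the moment the window key changes.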

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
new file mode 100644
index 0000000..451b31b
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
+ * from window to side input.
+ */
+public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
+    implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+
+  PCollectionView<ViewT> view;
+
+  public SideInputInitializer(PCollectionView<ViewT> view) {
+    this.view = view;
+  }
+
+  @Override
+  public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
+      Iterable<WindowedValue<ElemT>> inputValues) {
+
+    // first partition into windows
+    Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
+    for (WindowedValue<ElemT> value: inputValues) {
+      for (BoundedWindow window: value.getWindows()) {
+        List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+        if (windowedValues == null) {
+          windowedValues = new ArrayList<>();
+          partitionedElements.put(window, windowedValues);
+        }
+        windowedValues.add(value);
+      }
+    }
+
+    Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
+
+    for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
+        partitionedElements.entrySet()) {
+
+      @SuppressWarnings("unchecked")
+      Iterable<WindowedValue<?>> elementsIterable =
+          (List<WindowedValue<?>>) (List<?>) elements.getValue();
+
+      resultMap.put(elements.getKey(), view.fromIterableInternal(elementsIterable));
+    }
+
+    return resultMap;
+  }
+}
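
A small sketch of the partitioning step in initializeBroadcastVariable() above,
with integer window ids as hypothetical stand-ins for BoundedWindow (the class
names here are made up for illustration). A value that is in n windows lands in
n buckets, exactly as in the loop above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowPartitionSketch {

  // stands in for WindowedValue<ElemT>: a value in one or more windows
  static class Windowed {
    final String value;
    final List<Integer> windows;
    Windowed(String value, Integer... windows) {
      this.value = value;
      this.windows = Arrays.asList(windows);
    }
  }

  static Map<Integer, List<String>> partition(Iterable<Windowed> input) {
    Map<Integer, List<String>> result = new HashMap<>();
    for (Windowed w : input) {
      for (Integer window : w.windows) {
        // get-or-create the bucket for this window, as the loop above does
        List<String> bucket = result.get(window);
        if (bucket == null) {
          bucket = new ArrayList<>();
          result.put(window, bucket);
        }
        bucket.add(w.value);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> m = partition(Arrays.asList(
        new Windowed("a", 1), new Windowed("b", 1, 2)));
    System.out.println(m); // {1=[a, b], 2=[b]} (map order may vary)
  }
}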

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
deleted file mode 100644
index cc6fd8b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A UnionCoder encodes RawUnionValues.
- *
- * This file copied from {@link org.apache.beam.sdk.transforms.join.UnionCoder}
- */
-@SuppressWarnings("serial")
-public class UnionCoder extends StandardCoder<RawUnionValue> {
-  // TODO: Think about how to integrate this with a schema object (i.e.
-  // a tuple of tuple tags).
-  /**
-   * Builds a union coder with the given list of element coders.  This list
-   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
-   */
-  public static UnionCoder of(List<Coder<?>> elementCoders) {
-    return new UnionCoder(elementCoders);
-  }
-
-  @JsonCreator
-  public static UnionCoder jsonOf(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> elements) {
-    return UnionCoder.of(elements);
-  }
-
-  private int getIndexForEncoding(RawUnionValue union) {
-    if (union == null) {
-      throw new IllegalArgumentException("cannot encode a null tagged union");
-    }
-    int index = union.getUnionTag();
-    if (index < 0 || index >= elementCoders.size()) {
-      throw new IllegalArgumentException(
-          "union value index " + index + " not in range [0.." +
-              (elementCoders.size() - 1) + "]");
-    }
-    return index;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void encode(
-      RawUnionValue union,
-      OutputStream outStream,
-      Context context)
-      throws IOException  {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    VarInt.encode(index, outStream);
-
-    // Write out the actual value.
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.encode(
-        union.getValue(),
-        outStream,
-        context);
-  }
-
-  @Override
-  public RawUnionValue decode(InputStream inStream, Context context)
-      throws IOException {
-    int index = VarInt.decodeInt(inStream);
-    Object value = elementCoders.get(index).decode(inStream, context);
-    return new RawUnionValue(index, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getComponents() {
-    return elementCoders;
-  }
-
-  /**
-   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-   * time, we defer the return value to that coder.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-    int index = getIndexForEncoding(union);
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      RawUnionValue union, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    observer.update(VarInt.getLength(index));
-    // Write out the actual value.
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.registerByteSizeObserver(union.getValue(), observer, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final List<Coder<?>> elementCoders;
-
-  private UnionCoder(List<Coder<?>> elementCoders) {
-    this.elementCoders = elementCoders;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
-  }
-}
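
For context on the removal above: the runner can rely on the SDK's own
org.apache.beam.sdk.transforms.join.UnionCoder, whose wire format the deleted copy
implemented, a var-int union tag followed by the payload encoded with the element
coder selected by that tag. A toy sketch of that framing, assuming single-byte tags
(values below 128) so plain stream bytes suffice; the class and method names are
hypothetical:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class TaggedUnionSketch {

  // write the union tag, then the payload, mirroring UnionCoder.encode()
  static byte[] encode(int tag, byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(tag); // one byte here; VarInt.encode handles the general case
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  // read the tag first; it selects which element coder decodes the rest
  static int decodeTag(InputStream in) throws IOException {
    return in.read();
  }

  public static void main(String[] args) throws IOException {
    byte[] encoded = encode(1, new byte[] {42, 43});
    InputStream in = new ByteArrayInputStream(encoded);
    System.out.println(decodeTag(in)); // 1, then two payload bytes remain
  }
}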

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
index 895ecef..4434cf8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.flink.translation.types;
 
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
 
 import com.google.common.base.Preconditions;
 
@@ -71,9 +72,6 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
   @Override
   @SuppressWarnings("unchecked")
   public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-    if (coder instanceof VoidCoder) {
-      return (TypeSerializer<T>) new VoidCoderTypeSerializer();
-    }
     return new CoderTypeSerializer<>(coder);
   }
 
@@ -84,8 +82,12 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     CoderTypeInformation that = (CoderTypeInformation) o;
 
@@ -113,6 +115,11 @@ public class CoderTypeInformation<T> extends TypeInformation<T> implements Atomi
   @Override
   public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig
       executionConfig) {
-    return new CoderComparator<>(coder);
+    WindowedValue.WindowedValueCoder windowCoder = (WindowedValue.WindowedValueCoder) coder;
+    if (windowCoder.getValueCoder() instanceof KvCoder) {
+      return new KvCoderComperator(windowCoder);
+    } else {
+      return new CoderComparator<>(coder);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index c6f3921..097316b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -33,7 +33,7 @@ import java.io.ObjectInputStream;
 
 /**
  * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * Dataflow {@link org.apache.beam.sdk.coders.Coder}s
+ * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}.
  */
 public class CoderTypeSerializer<T> extends TypeSerializer<T> {
   
@@ -128,14 +128,20 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
   }
 
   @Override
-  public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
+  public void copy(
+      DataInputView dataInputView,
+      DataOutputView dataOutputView) throws IOException {
     serialize(deserialize(dataInputView), dataOutputView);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     CoderTypeSerializer that = (CoderTypeSerializer) o;
     return coder.equals(that.coder);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
index 6f0c651..79b127d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink.translation.types;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -31,14 +33,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 
 /**
- * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for
- * {@link org.apache.beam.sdk.coders.KvCoder}. We have a special comparator
+ * Flink {@link TypeComparator} for {@link KvCoder}. We have a special comparator
  * for {@link KV} that always compares on the key only.
  */
-public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
+public class KvCoderComperator <K, V> extends TypeComparator<WindowedValue<KV<K, V>>> {
   
-  private KvCoder<K, V> coder;
-  private Coder<K> keyCoder;
+  private final WindowedValue.WindowedValueCoder<KV<K, V>> coder;
+  private final Coder<K> keyCoder;
 
   // We use these for internal encoding/decoding for creating copies and comparing
   // serialized forms using a Coder
@@ -52,9 +53,10 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   // For deserializing the key
   private transient DataInputViewWrapper inputWrapper;
 
-  public KvCoderComperator(KvCoder<K, V> coder) {
+  public KvCoderComperator(WindowedValue.WindowedValueCoder<KV<K, V>> coder) {
     this.coder = coder;
-    this.keyCoder = coder.getKeyCoder();
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
+    this.keyCoder = kvCoder.getKeyCoder();
 
     buffer1 = new InspectableByteArrayOutputStream();
     buffer2 = new InspectableByteArrayOutputStream();
@@ -74,8 +76,8 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public int hash(KV<K, V> record) {
-    K key = record.getKey();
+  public int hash(WindowedValue<KV<K, V>> record) {
+    K key = record.getValue().getKey();
     if (key != null) {
       return key.hashCode();
     } else {
@@ -84,27 +86,27 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public void setReference(KV<K, V> toCompare) {
+  public void setReference(WindowedValue<KV<K, V>> toCompare) {
     referenceBuffer.reset();
     try {
-      keyCoder.encode(toCompare.getKey(), referenceBuffer, Coder.Context.OUTER);
+      keyCoder.encode(toCompare.getValue().getKey(), referenceBuffer, Coder.Context.OUTER);
     } catch (IOException e) {
       throw new RuntimeException("Could not set reference " + toCompare + ": " + e);
     }
   }
 
   @Override
-  public boolean equalToReference(KV<K, V> candidate) {
+  public boolean equalToReference(WindowedValue<KV<K, V>> candidate) {
     try {
       buffer2.reset();
-      keyCoder.encode(candidate.getKey(), buffer2, Coder.Context.OUTER);
+      keyCoder.encode(candidate.getValue().getKey(), buffer2, Coder.Context.OUTER);
       byte[] arr = referenceBuffer.getBuffer();
       byte[] arrOther = buffer2.getBuffer();
       if (referenceBuffer.size() != buffer2.size()) {
         return false;
       }
       int len = buffer2.size();
-      for(int i = 0; i < len; i++ ) {
+      for (int i = 0; i < len; i++) {
         if (arr[i] != arrOther[i]) {
           return false;
         }
@@ -116,8 +118,9 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public int compareToReference(TypeComparator<KV<K, V>> other) {
-    InspectableByteArrayOutputStream otherReferenceBuffer = ((KvCoderComperator<K, V>) other).referenceBuffer;
+  public int compareToReference(TypeComparator<WindowedValue<KV<K, V>>> other) {
+    InspectableByteArrayOutputStream otherReferenceBuffer =
+        ((KvCoderComperator<K, V>) other).referenceBuffer;
 
     byte[] arr = referenceBuffer.getBuffer();
     byte[] arrOther = otherReferenceBuffer.getBuffer();
@@ -135,19 +138,19 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
 
 
   @Override
-  public int compare(KV<K, V> first, KV<K, V> second) {
+  public int compare(WindowedValue<KV<K, V>> first, WindowedValue<KV<K, V>> second) {
     try {
       buffer1.reset();
       buffer2.reset();
-      keyCoder.encode(first.getKey(), buffer1, Coder.Context.OUTER);
-      keyCoder.encode(second.getKey(), buffer2, Coder.Context.OUTER);
+      keyCoder.encode(first.getValue().getKey(), buffer1, Coder.Context.OUTER);
+      keyCoder.encode(second.getValue().getKey(), buffer2, Coder.Context.OUTER);
       byte[] arr = buffer1.getBuffer();
       byte[] arrOther = buffer2.getBuffer();
       if (buffer1.size() != buffer2.size()) {
         return buffer1.size() - buffer2.size();
       }
       int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
+      for (int i = 0; i < len; i++) {
         if (arr[i] != arrOther[i]) {
           return arr[i] - arrOther[i];
         }
@@ -159,38 +162,19 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
-
+  public int compareSerialized(
+      DataInputView firstSource,
+      DataInputView secondSource) throws IOException {
     inputWrapper.setInputView(firstSource);
-    K firstKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
+    WindowedValue<KV<K, V>> first = coder.decode(inputWrapper, Coder.Context.NESTED);
     inputWrapper.setInputView(secondSource);
-    K secondKey = keyCoder.decode(inputWrapper, Coder.Context.NESTED);
-
-    try {
-      buffer1.reset();
-      buffer2.reset();
-      keyCoder.encode(firstKey, buffer1, Coder.Context.OUTER);
-      keyCoder.encode(secondKey, buffer2, Coder.Context.OUTER);
-      byte[] arr = buffer1.getBuffer();
-      byte[] arrOther = buffer2.getBuffer();
-      if (buffer1.size() != buffer2.size()) {
-        return buffer1.size() - buffer2.size();
-      }
-      int len = buffer1.size();
-      for(int i = 0; i < len; i++ ) {
-        if (arr[i] != arrOther[i]) {
-          return arr[i] - arrOther[i];
-        }
-      }
-      return 0;
-    } catch (IOException e) {
-      throw new RuntimeException("Could not compare reference.", e);
-    }
+    WindowedValue<KV<K, V>> second = coder.decode(inputWrapper, Coder.Context.NESTED);
+    return compare(first, second);
   }
 
   @Override
   public boolean supportsNormalizedKey() {
-    return true;
+    return false;
   }
 
   @Override
@@ -209,12 +193,18 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public void putNormalizedKey(KV<K, V> record, MemorySegment target, int offset, int numBytes) {
+  public void putNormalizedKey(
+      WindowedValue<KV<K, V>> record,
+      MemorySegment target,
+      int offset,
+      int numBytes) {
+
     buffer1.reset();
     try {
-      keyCoder.encode(record.getKey(), buffer1, Coder.Context.NESTED);
+      keyCoder.encode(record.getValue().getKey(), buffer1, Coder.Context.NESTED);
     } catch (IOException e) {
-      throw new RuntimeException("Could not serializer " + record + " using coder " + coder + ": " + e);
+      throw new RuntimeException(
+          "Could not serialize " + record + " using coder " + coder + ": " + e);
     }
     final byte[] data = buffer1.getBuffer();
     final int limit = offset + numBytes;
@@ -231,12 +221,16 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public void writeWithKeyNormalization(KV<K, V> record, DataOutputView target) throws IOException {
+  public void writeWithKeyNormalization(
+      WindowedValue<KV<K, V>> record,
+      DataOutputView target) throws IOException {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public KV<K, V> readWithKeyDenormalization(KV<K, V> reuse, DataInputView source) throws IOException {
+  public WindowedValue<KV<K, V>> readWithKeyDenormalization(
+      WindowedValue<KV<K, V>> reuse,
+      DataInputView source) throws IOException {
     throw new UnsupportedOperationException();
   }
 
@@ -246,14 +240,14 @@ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> {
   }
 
   @Override
-  public TypeComparator<KV<K, V>> duplicate() {
+  public TypeComparator<WindowedValue<KV<K, V>>> duplicate() {
     return new KvCoderComperator<>(coder);
   }
 
   @Override
   public int extractKeys(Object record, Object[] target, int index) {
-    KV<K, V> kv = (KV<K, V>) record;
-    K k = kv.getKey();
+    WindowedValue<KV<K, V>> kv = (WindowedValue<KV<K, V>>) record;
+    K k = kv.getValue().getKey();
     target[index] = k;
     return 1;
   }
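
The comparator above never deserializes keys for ordering: it encodes both keys
with the key coder and compares the resulting bytes, shorter-buffer-first and then
byte by byte with Java's signed byte arithmetic. A toy version of that comparison,
using UTF-8 bytes of a String as a hypothetical stand-in for the coder's output:

import java.nio.charset.StandardCharsets;

public class EncodedKeyCompareSketch {

  // mirrors the byte loop in compare() above
  static int compareEncoded(byte[] a, byte[] b) {
    if (a.length != b.length) {
      return a.length - b.length; // length-first shortcut, as in compare()
    }
    for (int i = 0; i < a.length; i++) {
      if (a[i] != b[i]) {
        return a[i] - b[i];
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    byte[] k1 = "apple".getBytes(StandardCharsets.UTF_8);
    byte[] k2 = "apric".getBytes(StandardCharsets.UTF_8);
    System.out.println(compareEncoded(k1, k2)); // negative: "apple" sorts first
  }
}

Note that the length-first shortcut does not give lexicographic order across keys
of different encoded lengths; grouping only needs a consistent total order, not a
particular one.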

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
index 74f3821..ba53f64 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink.translation.types;
 
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 import com.google.common.base.Preconditions;
@@ -31,27 +32,32 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import java.util.List;
 
 /**
- * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
- * Dataflow {@link org.apache.beam.sdk.coders.KvCoder}.
+ * Flink {@link TypeInformation} for {@link KvCoder}. This creates a special comparator
+ * for {@link KV} that always compares on the key only.
  */
-public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
+public class KvCoderTypeInformation<K, V> extends CompositeType<WindowedValue<KV<K, V>>> {
 
-  private KvCoder<K, V> coder;
+  private final WindowedValue.WindowedValueCoder<KV<K, V>> coder;
 
  // We don't have the Class, so we have to pass a dummy Object's class here. What a shame...
-  private static Object DUMMY = new Object();
+  private static Object dummy = new Object();
 
   @SuppressWarnings("unchecked")
-  public KvCoderTypeInformation(KvCoder<K, V> coder) {
-    super(((Class<KV<K,V>>) DUMMY.getClass()));
+  public KvCoderTypeInformation(WindowedValue.WindowedValueCoder<KV<K, V>> coder) {
+    super((Class) dummy.getClass());
     this.coder = coder;
     Preconditions.checkNotNull(coder);
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public TypeComparator<KV<K, V>> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
-    return new KvCoderComperator((KvCoder) coder);
+  public TypeComparator<WindowedValue<KV<K, V>>> createComparator(
+      int[] logicalKeyFields,
+      boolean[] orders,
+      int logicalFieldOffset,
+      ExecutionConfig config) {
+    return new KvCoderComperator(coder);
   }
 
   @Override
@@ -71,7 +77,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
 
   @Override
   @SuppressWarnings("unchecked")
-  public Class<KV<K, V>> getTypeClass() {
+  public Class<WindowedValue<KV<K, V>>> getTypeClass() {
     return privateGetTypeClass();
   }
 
@@ -87,7 +93,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
 
   @Override
   @SuppressWarnings("unchecked")
-  public TypeSerializer<KV<K, V>> createSerializer(ExecutionConfig config) {
+  public TypeSerializer<WindowedValue<KV<K, V>>> createSerializer(ExecutionConfig config) {
     return new CoderTypeSerializer<>(coder);
   }
 
@@ -98,8 +104,12 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     KvCoderTypeInformation that = (KvCoderTypeInformation) o;
 
@@ -122,10 +132,11 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
   @Override
   @SuppressWarnings("unchecked")
   public <X> TypeInformation<X> getTypeAt(int pos) {
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
     if (pos == 0) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+      return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder());
     } else if (pos == 1) {
-      return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+      return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder());
     } else {
       throw new RuntimeException("Invalid field position " + pos);
     }
@@ -134,11 +145,12 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
   @Override
   @SuppressWarnings("unchecked")
   public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
     switch (fieldExpression) {
       case "key":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getKeyCoder());
+        return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getKeyCoder());
       case "value":
-        return (TypeInformation<X>) new CoderTypeInformation<>(coder.getValueCoder());
+        return (TypeInformation<X>) new CoderTypeInformation<>(kvCoder.getValueCoder());
       default:
         throw new UnsupportedOperationException("Only KvCoder has fields.");
     }
@@ -162,17 +174,24 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
   }
 
   @Override
-  public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
-      CoderTypeInformation keyTypeInfo = new CoderTypeInformation<>(coder.getKeyCoder());
+  public void getFlatFields(
+      String fieldExpression,
+      int offset,
+      List<FlatFieldDescriptor> result) {
+    KvCoder<K, V> kvCoder = (KvCoder<K, V>) coder.getValueCoder();
+
+    CoderTypeInformation keyTypeInfo =
+        new CoderTypeInformation<>(kvCoder.getKeyCoder());
       result.add(new FlatFieldDescriptor(0, keyTypeInfo));
   }
 
   @Override
-  protected TypeComparatorBuilder<KV<K, V>> createTypeComparatorBuilder() {
+  protected TypeComparatorBuilder<WindowedValue<KV<K, V>>> createTypeComparatorBuilder() {
     return new KvCoderTypeComparatorBuilder();
   }
 
-  private class KvCoderTypeComparatorBuilder implements TypeComparatorBuilder<KV<K, V>> {
+  private class KvCoderTypeComparatorBuilder
+      implements TypeComparatorBuilder<WindowedValue<KV<K, V>>> {
 
     @Override
     public void initializeTypeComparatorBuilder(int size) {}
@@ -181,7 +200,7 @@ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> {
     public void addComparatorField(int fieldId, TypeComparator<?> comparator) {}
 
     @Override
-    public TypeComparator<KV<K, V>> createTypeComparator(ExecutionConfig config) {
+    public TypeComparator<WindowedValue<KV<K, V>>> createTypeComparator(ExecutionConfig config) {
       return new KvCoderComperator<>(coder);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
deleted file mode 100644
index 7b48208..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-/**
- * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
- * {@link org.apache.beam.sdk.coders.VoidCoder}. We need this because Flink does not
- * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead
- * that behaves like a {@code null}, hopefully.
- */
-public class VoidCoderTypeSerializer extends TypeSerializer<VoidCoderTypeSerializer.VoidValue> {
-
-  @Override
-  public boolean isImmutableType() {
-    return false;
-  }
-
-  @Override
-  public VoidCoderTypeSerializer duplicate() {
-    return this;
-  }
-
-  @Override
-  public VoidValue createInstance() {
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from) {
-    return from;
-  }
-
-  @Override
-  public VoidValue copy(VoidValue from, VoidValue reuse) {
-    return from;
-  }
-
-  @Override
-  public int getLength() {
-    return 0;
-  }
-
-  @Override
-  public void serialize(VoidValue record, DataOutputView target) throws IOException {
-    target.writeByte(1);
-  }
-
-  @Override
-  public VoidValue deserialize(DataInputView source) throws IOException {
-    source.readByte();
-    return VoidValue.INSTANCE;
-  }
-
-  @Override
-  public VoidValue deserialize(VoidValue reuse, DataInputView source) throws IOException {
-    return deserialize(source);
-  }
-
-  @Override
-  public void copy(DataInputView source, DataOutputView target) throws IOException {
-    source.readByte();
-    target.writeByte(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof VoidCoderTypeSerializer) {
-      VoidCoderTypeSerializer other = (VoidCoderTypeSerializer) obj;
-      return other.canEqual(this);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public boolean canEqual(Object obj) {
-    return obj instanceof VoidCoderTypeSerializer;
-  }
-
-  @Override
-  public int hashCode() {
-    return 0;
-  }
-
-  public static class VoidValue {
-    private VoidValue() {}
-    
-    public static VoidValue INSTANCE = new VoidValue();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
deleted file mode 100644
index e5567d3..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers;
-
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-import java.io.Serializable;
-
-/**
- * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn}
- * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using
- * the combine function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
- * operation.
- */
-public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> {
-  
-  private AA aa;
-  private Combine.CombineFn<? super AI, AA, AR> combiner;
-
-  public CombineFnAggregatorWrapper() {
-  }
-
-  public CombineFnAggregatorWrapper(Combine.CombineFn<? super AI, AA, AR> combiner) {
-    this.combiner = combiner;
-    this.aa = combiner.createAccumulator();
-  }
-
-  @Override
-  public void add(AI value) {
-    combiner.addInput(aa, value);
-  }
-
-  @Override
-  public Serializable getLocalValue() {
-    return (Serializable) combiner.extractOutput(aa);
-  }
-
-  @Override
-  public void resetLocal() {
-    aa = combiner.createAccumulator();
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    aa = combiner.mergeAccumulators(Lists.newArrayList(aa, ((CombineFnAggregatorWrapper<AI, AA, AR>)other).aa));
-  }
-
-  @Override
-  public Accumulator<AI, Serializable> clone() {
-    // copy it by merging
-    AA aaCopy = combiner.mergeAccumulators(Lists.newArrayList(aa));
-    CombineFnAggregatorWrapper<AI, AA, AR> result = new
-        CombineFnAggregatorWrapper<>(combiner);
-    result.aa = aaCopy;
-    return result;
-  }
-
-  @Override
-  public void addValue(AI value) {
-    add(value);
-  }
-
-  @Override
-  public String getName() {
-    return "CombineFn: " + combiner.toString();
-  }
-
-  @Override
-  public Combine.CombineFn getCombineFn() {
-    return combiner;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
index eb32fa2..82d3fb8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java
@@ -33,20 +33,21 @@ import java.io.Serializable;
  * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo}
  * operation.
  */
-public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> {
+public class SerializableFnAggregatorWrapper<InputT, OutputT>
+    implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> {
 
-  private AO aa;
-  private Combine.CombineFn<AI, ?, AO> combiner;
+  private OutputT aa;
+  private Combine.CombineFn<InputT, ?, OutputT> combiner;
 
-  public SerializableFnAggregatorWrapper(Combine.CombineFn<AI, ?, AO> combiner) {
+  public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) {
     this.combiner = combiner;
     resetLocal();
   }
-  
+
   @Override
   @SuppressWarnings("unchecked")
-  public void add(AI value) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, value));
+  public void add(InputT value) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, value));
   }
 
   @Override
@@ -56,17 +57,17 @@ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, A
 
   @Override
   public void resetLocal() {
-    this.aa = combiner.apply(ImmutableList.<AI>of());
+    this.aa = combiner.apply(ImmutableList.<InputT>of());
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public void merge(Accumulator<AI, Serializable> other) {
-    this.aa = combiner.apply(ImmutableList.of((AI) aa, (AI) other.getLocalValue()));
+  public void merge(Accumulator<InputT, Serializable> other) {
+    this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue()));
   }
 
   @Override
-  public void addValue(AI value) {
+  public void addValue(InputT value) {
     add(value);
   }
 
@@ -76,15 +77,15 @@ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, A
   }
 
   @Override
-  public Combine.CombineFn<AI, ?, AO> getCombineFn() {
+  public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() {
     return combiner;
   }
 
   @Override
-  public Accumulator<AI, Serializable> clone() {
+  public Accumulator<InputT, Serializable> clone() {
     // copy it by merging
-    AO resultCopy = combiner.apply(Lists.newArrayList((AI) aa));
-    SerializableFnAggregatorWrapper<AI, AO> result = new
+    OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa));
+    SerializableFnAggregatorWrapper<InputT, OutputT> result = new
         SerializableFnAggregatorWrapper<>(combiner);
 
     result.aa = resultCopy;
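
The wrapper above implements add(), merge(), and resetLocal() entirely in terms of
CombineFn.apply() over small lists. A stand-alone sketch of that folding pattern;
SimpleCombineFn and SumFn are hypothetical stand-ins, not Beam types:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CombineFnAggregatorSketch {

  // stands in for Combine.CombineFn<InputT, ?, OutputT> used only via apply()
  interface SimpleCombineFn<T> {
    T apply(List<T> inputs);
  }

  static class SumFn implements SimpleCombineFn<Long> {
    @Override
    public Long apply(List<Long> inputs) {
      long sum = 0;
      for (Long v : inputs) {
        sum += v;
      }
      return sum;
    }
  }

  public static void main(String[] args) {
    SimpleCombineFn<Long> fn = new SumFn();
    Long aa = fn.apply(Collections.<Long>emptyList()); // resetLocal()
    aa = fn.apply(Arrays.asList(aa, 3L));              // add(3)
    aa = fn.apply(Arrays.asList(aa, 4L));              // merge(4) from another task
    System.out.println(aa); // 7
  }
}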

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
index 53e544d..c0a7132 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -31,10 +32,11 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 
 /**
- * Wrapper class to use generic Write.Bound transforms as sinks.
+ * Wrapper for executing a {@link Sink} on Flink as an {@link OutputFormat}.
+ *
  * @param <T> The type of the incoming records.
  */
-public class SinkOutputFormat<T> implements OutputFormat<T> {
+public class SinkOutputFormat<T> implements OutputFormat<WindowedValue<T>> {
 
   private final Sink<T> sink;
 
@@ -75,9 +77,9 @@ public class SinkOutputFormat<T> implements OutputFormat<T> {
   }
 
   @Override
-  public void writeRecord(T record) throws IOException {
+  public void writeRecord(WindowedValue<T> record) throws IOException {
     try {
-      writer.write(record);
+      writer.write(record.getValue());
     } catch (Exception e) {
       throw new IOException("Couldn't write record.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index debd1a1..1d06b1a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -21,12 +21,16 @@ import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,10 +39,10 @@ import java.util.List;
 
 
 /**
- * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
- * Dataflow {@link org.apache.beam.sdk.io.Source}.
+ * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
  */
-public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> {
+public class SourceInputFormat<T>
+    implements InputFormat<WindowedValue<T>, SourceInputSplit<T>> {
   private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
 
   private final BoundedSource<T> initialSource;
@@ -122,12 +126,16 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
   }
 
   @Override
-  public T nextRecord(T t) throws IOException {
+  public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
     if (inputAvailable) {
       final T current = reader.getCurrent();
+      final Instant timestamp = reader.getCurrentTimestamp();
       // advance reader to have a record ready next time
       inputAvailable = reader.advance();
-      return current;
+      return WindowedValue.of(
+          current,
+          timestamp,
+          GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
     }
 
     return null;
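
With this change every element read from the source is paired with the reader's
current timestamp and placed in the global window before Flink sees it. A small
sketch of that wrapping step; the Windowed class is a hypothetical stand-in for
WindowedValue, and the iterator plays the role of the source reader:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SourceWrapSketch {

  // hypothetical stand-in for WindowedValue<T> in the global window
  static class Windowed<T> {
    final T value;
    final long timestampMillis;
    Windowed(T value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  public static void main(String[] args) {
    Iterator<String> reader = Arrays.asList("a", "b").iterator();
    List<Windowed<String>> records = new ArrayList<>();
    long timestamp = 0;
    while (reader.hasNext()) {
      // nextRecord(): wrap the current element with its timestamp, then advance
      records.add(new Windowed<>(reader.next(), timestamp++));
    }
    System.out.println(records.size()); // 2
  }
}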

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index 3bf566b..6b69d54 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -54,7 +53,7 @@ public class FlinkGroupByKeyWrapper {
 
           @Override
           public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
-            return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+            return isKeyVoid ? (K) VoidValue.INSTANCE :
                 value.getValue().getKey();
           }
 
@@ -64,4 +63,11 @@ public class FlinkGroupByKeyWrapper {
           }
         });
   }
+
+  // Special singleton type used as the grouping key when the Beam key is null.
+  public static class VoidValue {
+    private VoidValue() {}
+
+    public static final VoidValue INSTANCE = new VoidValue();
+  }
 }
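
The VoidValue singleton above exists because a Flink key selector must return a non-null key, while Beam permits null (void) keys. A standalone sketch of the same substitution pattern; the class and method names here are illustrative, not from the commit:

public class NullKeySketch {
  // Placeholder returned instead of null, mirroring VoidValue above.
  public static final class VoidKey {
    public static final VoidKey INSTANCE = new VoidKey();
    private VoidKey() {}
  }

  // What the key selector does: substitute the singleton for a null key.
  static Object keyOrPlaceholder(Object key) {
    return key == null ? VoidKey.INSTANCE : key;
  }

  public static void main(String[] args) {
    System.out.println(keyOrPlaceholder(null) == VoidKey.INSTANCE); // true
    System.out.println(keyOrPlaceholder("k1"));                     // k1
  }
}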

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index d6aff7d..8cd8351 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -47,17 +46,11 @@ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN
   @Override
   public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception {
 
-    @SuppressWarnings("unchecked")
-    OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
     for (byte[] element : elements) {
       ByteArrayInputStream bai = new ByteArrayInputStream(element);
       OUT outValue = coder.decode(bai, Coder.Context.OUTER);
 
-      if (outValue == null) {
-        out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-      } else {
-        out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-      }
+      out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
     }
 
     out.close();
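
For context on the decode path above: Create.of() elements reach this function as coder-encoded byte arrays and are decoded one at a time. A minimal round-trip sketch under that assumption (the encode half only builds test input; Coder.Context.OUTER matches the diff):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class CreateDecodeSketch {
  public static void main(String[] args) throws Exception {
    StringUtf8Coder coder = StringUtf8Coder.of();

    // Encode an element the way Create's translation serializes it.
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    coder.encode("element-1", baos, Coder.Context.OUTER);
    byte[] element = baos.toByteArray();

    // Decode it back, as flatMap() does for each stored byte[].
    String decoded =
        coder.decode(new ByteArrayInputStream(element), Coder.Context.OUTER);
    System.out.println(decoded); // element-1
  }
}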

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
deleted file mode 100644
index 113fee0..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class AvroITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-  protected String tmpPath;
-
-  public AvroITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "Joe red 3",
-      "Mary blue 4",
-      "Mark green 1",
-      "Julia purple 5"
-  };
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-    tmpPath = getTempDirPath("tmp");
-
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    runProgram(tmpPath, resultPath);
-  }
-
-  private static void runProgram(String tmpPath, String resultPath) {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    p
-      .apply(Create.of(
-          new User("Joe", 3, "red"),
-          new User("Mary", 4, "blue"),
-          new User("Mark", 1, "green"),
-          new User("Julia", 5, "purple"))
-        .withCoder(AvroCoder.of(User.class)))
-
-      .apply(AvroIO.Write.to(tmpPath)
-        .withSchema(User.class));
-
-    p.run();
-
-    p = FlinkTestPipeline.createForBatch();
-
-    p
-      .apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
-
-        .apply(ParDo.of(new DoFn<User, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            User u = c.element();
-            String result = u.getName() + " " + u.getFavoriteColor() + " " + u.getFavoriteNumber();
-            c.output(result);
-          }
-        }))
-
-      .apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-
-  private static class User {
-
-    private String name;
-    private int favoriteNumber;
-    private String favoriteColor;
-
-    public User() {}
-
-    public User(String name, int favoriteNumber, String favoriteColor) {
-      this.name = name;
-      this.favoriteNumber = favoriteNumber;
-      this.favoriteColor = favoriteColor;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public String getFavoriteColor() {
-      return favoriteColor;
-    }
-
-    public int getFavoriteNumber() {
-      return favoriteNumber;
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
deleted file mode 100644
index ac0a3d7..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class FlattenizeITCase extends JavaProgramTestBase {
-
-  private String resultPath;
-  private String resultPath2;
-
-  private static final String[] words = {"hello", "this", "is", "a", "DataSet!"};
-  private static final String[] words2 = {"hello", "this", "is", "another", "DataSet!"};
-  private static final String[] words3 = {"hello", "this", "is", "yet", "another", "DataSet!"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-    resultPath2 = getTempDirPath("result2");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    String join = Joiner.on('\n').join(words);
-    String join2 = Joiner.on('\n').join(words2);
-    String join3 = Joiner.on('\n').join(words3);
-    compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
-    compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, resultPath2);
-  }
-
-
-  @Override
-  protected void testProgram() throws Exception {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> p1 = p.apply(Create.of(words));
-    PCollection<String> p2 = p.apply(Create.of(words2));
-
-    PCollectionList<String> list = PCollectionList.of(p1).and(p2);
-
-    list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
-
-    PCollection<String> p3 = p.apply(Create.of(words3));
-
-    PCollectionList<String> list2 = list.and(p3);
-
-    list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
-
-    p.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
deleted file mode 100644
index 47685b6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.flink.util.JoinExamples;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-/**
- * Unfortunately we need to copy the code from the Dataflow SDK because it is not public there.
- */
-public class JoinExamplesITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public JoinExamplesITCase(){
-  }
-
-  private static final TableRow row1 = new TableRow()
-      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-      .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com");
-  private static final TableRow row2 = new TableRow()
-      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
-      .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com");
-  private static final TableRow row3 = new TableRow()
-      .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
-      .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com");
-  static final TableRow[] EVENTS = new TableRow[] {
-      row1, row2, row3
-  };
-  static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
-  private static final TableRow cc1 = new TableRow()
-      .set("FIPSCC", "VM").set("HumanName", "Vietnam");
-  private static final TableRow cc2 = new TableRow()
-      .set("FIPSCC", "BE").set("HumanName", "Belgium");
-  static final TableRow[] CCS = new TableRow[] {
-      cc1, cc2
-  };
-  static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
-  static final String[] JOINED_EVENTS = new String[] {
-      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, "
-          + "url: http://www.chicagotribune.com",
-      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, "
-          + "url: http://cnn.com",
-      "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, "
-          + "url: http://cnn.com"
-  };
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
-    PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
-    PCollection<String> output = JoinExamples.joinEvents(input1, input2);
-
-    output.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
deleted file mode 100644
index 4d66fa4..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable {
-
-  protected String resultPath;
-
-  protected final String expected = "test";
-
-  public MaybeEmptyTestITCase() {
-  }
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(expected, resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-        .apply(ParDo.of(
-            new DoFn<Void, String>() {
-              @Override
-              public void processElement(DoFn<Void, String>.ProcessContext c) {
-                c.output(expected);
-              }
-            })).apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
deleted file mode 100644
index a2ef4e2..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable {
-
-  private String resultPath;
-
-  private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
-    // Select words whose length is below a cut off,
-    // plus the lengths of words that are above the cut off.
-    // Also select words starting with "MARKER".
-    final int wordLengthCutOff = 3;
-    // Create tags to use for the main and side outputs.
-    final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
-    final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){};
-    final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
-
-    PCollectionTuple results =
-        words.apply(ParDo
-            .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
-                .and(markedWordsTag))
-            .of(new DoFn<String, String>() {
-              final TupleTag<String> specialWordsTag = new TupleTag<String>() {
-              };
-
-              public void processElement(ProcessContext c) {
-                String word = c.element();
-                if (word.length() <= wordLengthCutOff) {
-                  c.output(word);
-                } else {
-                  c.sideOutput(wordLengthsAboveCutOffTag, word.length());
-                }
-                if (word.startsWith("MAA")) {
-                  c.sideOutput(markedWordsTag, word);
-                }
-
-                if (word.startsWith("SPECIAL")) {
-                  c.sideOutput(specialWordsTag, word);
-                }
-              }
-            }));
-
-    // Extract the PCollection results, by tag.
-    PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
-    PCollection<Integer> wordLengthsAboveCutOff = results.get
-        (wordLengthsAboveCutOffTag);
-    PCollection<String> markedWords = results.get(markedWordsTag);
-
-    markedWords.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 66c959e..bb79b27 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -28,6 +28,9 @@ import com.google.common.base.Joiner;
 
 import org.apache.flink.test.util.JavaProgramTestBase;
 
+import java.io.File;
+import java.net.URI;
+
 /**
  * Reads from a bounded source in batch execution.
  */
@@ -44,6 +47,13 @@ public class ReadSourceITCase extends JavaProgramTestBase {
   @Override
   protected void preSubmit() throws Exception {
     resultPath = getTempDirPath("result");
+
+    // Need to create the output directory up front; otherwise the
+    // Beam sinks used in these tests cannot write their results.
+
+    if (!new File(new URI(resultPath)).mkdirs()) {
+      throw new RuntimeException("Could not create output dir.");
+    }
   }
 
   @Override
@@ -56,7 +66,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
     runProgram(resultPath);
   }
 
-  private static void runProgram(String resultPath) {
+  private static void runProgram(String resultPath) throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForBatch();
 
@@ -69,7 +79,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
           }
         }));
 
-    result.apply(TextIO.Write.to(resultPath));
+    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
 
     p.run();
   }
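
The preSubmit() change works because the test base hands back a file:// URI string, which has to be turned into a real directory before the sink writes shards into it. A small sketch under that assumption (the path below is made up):

import java.io.File;
import java.net.URI;

public class OutputDirSketch {
  public static void main(String[] args) throws Exception {
    String resultPath = "file:///tmp/beam-test/result"; // illustrative
    File dir = new File(new URI(resultPath));
    if (!dir.exists() && !dir.mkdirs()) {
      throw new RuntimeException("Could not create output dir.");
    }
    // The write then targets a shard prefix inside the directory,
    // matching the TextIO.Write change above.
    System.out.println(new URI(resultPath).getPath() + "/part");
  }
}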

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
deleted file mode 100644
index 471d326..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Collections;
-import java.util.List;
-
-
-public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public RemoveDuplicatesEmptyITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    List<String> strings = Collections.emptyList();
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings))
-            .setCoder(StringUtf8Coder.of());
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    output.apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
deleted file mode 100644
index 0544f20..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.RemoveDuplicates;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class RemoveDuplicatesITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public RemoveDuplicatesITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "k1", "k5", "k2", "k3"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3");
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    PCollection<String> input =
-        p.apply(Create.of(strings))
-            .setCoder(StringUtf8Coder.of());
-
-    PCollection<String> output =
-        input.apply(RemoveDuplicates.<String>create());
-
-    output.apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
deleted file mode 100644
index 2c7c65e..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.Serializable;
-
-public class SideInputITCase extends JavaProgramTestBase implements Serializable {
-
-  private static final String expected = "Hello!";
-
-  protected String resultPath;
-
-  @Override
-  protected void testProgram() throws Exception {
-
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-
-    final PCollectionView<String> sidesInput = p
-        .apply(Create.of(expected))
-        .apply(View.<String>asSingleton());
-
-    p.apply(Create.of("bli"))
-        .apply(ParDo.of(new DoFn<String, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            String s = c.sideInput(sidesInput);
-            c.output(s);
-          }
-        }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(expected, resultPath);
-  }
-}