Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:23 UTC

[05/14] incubator-beam git commit: [BEAM-270] Support Timestamps/Windows in Flink Batch

[BEAM-270] Support Timestamps/Windows in Flink Batch

With this change, we always use WindowedValue<T> for the underlying Flink
DataSets instead of just T. This allows us to support windowing as well.
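
As an illustration (a sketch, not code from this commit; it only assumes
the WindowedValue API from org.apache.beam.sdk.util):

    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.joda.time.Instant;

    // An element that was never explicitly windowed lives in the
    // global window:
    WindowedValue<String> globalValue =
        WindowedValue.valueInGlobalWindow("hello");

    // After Window.into(...) an element carries its timestamp, its
    // window(s) and its pane info:
    WindowedValue<String> windowedValue = WindowedValue.of(
        "hello",
        new Instant(0),                                       // timestamp
        new IntervalWindow(new Instant(0), new Instant(60)),  // window
        PaneInfo.NO_FIRING);                                  // pane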

This change also covers a lot of other work enabled by the above:

 - Use WindowedValue throughout
 - Add proper translation for Window.into()
 - Make side inputs window aware
 - Make GroupByKey and Combine transformations window aware; this
   includes support for merging windows. For simplicity, GroupByKey is
   implemented as a Combine with a concatenating CombineFn (sketched
   below)

This also removes Flink-specific translations for things that are handled
by built-in sources/sinks. Among other things, this:

 - Removes special translation for AvroIO.Read/Write and
   TextIO.Read/Write
 - Removes special support for Write.Bound; this was not working properly
   and is now handled by the Beam machinery, which uses DoFns for this
 - Removes special translation for binary Co-Group; the code was still
   there but was never used
 - Removes ConsoleIO; this can be done using a DoFn, as sketched below
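
For example, printing to stdout without ConsoleIO is a small DoFn
(a sketch using the DoFn API as of this commit, not part of the change):

    PCollection<String> lines = ...;

    lines.apply(ParDo.of(new DoFn<String, Void>() {
      @Override
      public void processElement(ProcessContext c) {
        System.out.println(c.element());
      }
    }));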

With this change all RunnableOnService tests run on Flink Batch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bfca23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bfca23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bfca23

Branch: refs/heads/master
Commit: 24bfca230d5db3cb75dd0e30093a10f7523c1238
Parents: 4e60a49
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 10 13:53:03 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |  10 -
 .../apache/beam/runners/flink/io/ConsoleIO.java |  82 --
 .../FlinkBatchPipelineTranslator.java           |   4 +-
 .../FlinkBatchTransformTranslators.java         | 846 ++++++++++++-------
 .../FlinkBatchTranslationContext.java           |  56 +-
 .../FlinkStreamingTransformTranslators.java     |  22 +-
 .../FlinkStreamingTranslationContext.java       |  29 +-
 .../functions/FlinkAssignContext.java           |  56 ++
 .../functions/FlinkAssignWindows.java           |  51 ++
 .../FlinkCoGroupKeyedListAggregator.java        |  61 --
 .../functions/FlinkCreateFunction.java          |  63 --
 .../functions/FlinkDoFnFunction.java            | 194 ++---
 .../FlinkKeyedListAggregationFunction.java      |  78 --
 .../FlinkMergingNonShuffleReduceFunction.java   | 238 ++++++
 .../FlinkMergingPartialReduceFunction.java      | 205 +++++
 .../functions/FlinkMergingReduceFunction.java   | 207 +++++
 .../functions/FlinkMultiOutputDoFnFunction.java | 157 ++--
 .../FlinkMultiOutputProcessContext.java         | 176 ++++
 .../FlinkMultiOutputPruningFunction.java        |  25 +-
 .../functions/FlinkNoElementAssignContext.java  |  71 ++
 .../functions/FlinkPartialReduceFunction.java   | 171 +++-
 .../functions/FlinkProcessContext.java          | 324 +++++++
 .../functions/FlinkReduceFunction.java          | 174 +++-
 .../functions/SideInputInitializer.java         |  75 ++
 .../flink/translation/functions/UnionCoder.java | 152 ----
 .../translation/types/CoderTypeInformation.java |  21 +-
 .../translation/types/CoderTypeSerializer.java  |  14 +-
 .../translation/types/KvCoderComperator.java    | 102 ++-
 .../types/KvCoderTypeInformation.java           |  63 +-
 .../types/VoidCoderTypeSerializer.java          | 112 ---
 .../wrappers/CombineFnAggregatorWrapper.java    |  94 ---
 .../SerializableFnAggregatorWrapper.java        |  31 +-
 .../translation/wrappers/SinkOutputFormat.java  |  10 +-
 .../translation/wrappers/SourceInputFormat.java |  18 +-
 .../streaming/FlinkGroupByKeyWrapper.java       |  10 +-
 .../io/FlinkStreamingCreateFunction.java        |   9 +-
 .../apache/beam/runners/flink/AvroITCase.java   | 129 ---
 .../beam/runners/flink/FlattenizeITCase.java    |  76 --
 .../beam/runners/flink/JoinExamplesITCase.java  | 102 ---
 .../runners/flink/MaybeEmptyTestITCase.java     |  66 --
 .../runners/flink/ParDoMultiOutputITCase.java   | 102 ---
 .../beam/runners/flink/ReadSourceITCase.java    |  14 +-
 .../flink/RemoveDuplicatesEmptyITCase.java      |  72 --
 .../runners/flink/RemoveDuplicatesITCase.java   |  73 --
 .../beam/runners/flink/SideInputITCase.java     |  70 --
 .../apache/beam/runners/flink/TfIdfITCase.java  |  80 --
 .../beam/runners/flink/WordCountITCase.java     |  77 --
 .../runners/flink/WordCountJoin2ITCase.java     | 140 ---
 .../runners/flink/WordCountJoin3ITCase.java     | 158 ----
 .../flink/streaming/GroupAlsoByWindowTest.java  |   3 +-
 .../beam/runners/flink/util/JoinExamples.java   | 161 ----
 .../beam/sdk/transforms/join/UnionCoder.java    |   2 +-
 52 files changed, 2605 insertions(+), 2731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index fda27a8..b29a5bf 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -191,16 +191,6 @@
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>
-              <excludes>
-                <!-- Tests that use unsupported windowing -->
-                <exclude>**/org/apache/beam/sdk/transforms/CombineTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/transforms/GroupByKeyTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/transforms/ViewTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/transforms/windowing/WindowingTest.java</exclude>
-                <exclude>**/org/apache/beam/sdk/util/ReshuffleTest.java</exclude>
-              </excludes>
             </configuration>
           </execution>
           <execution>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
deleted file mode 100644
index 9c36c21..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link org.apache.beam.sdk.values.PCollection}.
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
-  /**
-   * A PTransform that writes a PCollection to a standard output.
-   */
-  public static class Write {
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with a default step name.
-     */
-    public static Bound create() {
-      return new Bound();
-    }
-
-    /**
-     * Returns a ConsoleIO.Write PTransform with the given step name.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
-
-    /**
-     * A PTransform that writes a bounded PCollection to standard output.
-     */
-    public static class Bound extends PTransform<PCollection<?>, PDone> {
-      private static final long serialVersionUID = 0;
-
-      Bound() {
-        super("ConsoleIO.Write");
-      }
-
-      Bound(String name) {
-        super(name);
-      }
-
-      /**
-       * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
-       * step
-       * name.  Does not modify this object.
-       */
-      public Bound named(String name) {
-        return new Bound(name);
-      }
-
-      @Override
-      public PDone apply(PCollection<?> input) {
-        return PDone.in(input.getPipeline());
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 512b822..69c02a2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -32,8 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
+ * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
+ * Flink batch job.
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 07785aa..8358807 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -17,23 +17,24 @@
  */
 package org.apache.beam.runners.flink.translation;
 
-import org.apache.beam.runners.flink.io.ConsoleIO;
-import org.apache.beam.runners.flink.translation.functions.FlinkCoGroupKeyedListAggregator;
-import org.apache.beam.runners.flink.translation.functions.FlinkCreateFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkKeyedListAggregationFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -41,60 +42,63 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import com.google.api.client.util.Maps;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.FlatMapOperator;
 import org.apache.flink.api.java.operators.GroupCombineOperator;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Translators for transforming
- * Dataflow {@link org.apache.beam.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s.
+ * Translators for transforming {@link PTransform PTransforms} to
+ * Flink {@link DataSet DataSets}.
  */
 public class FlinkBatchTransformTranslators {
 
@@ -103,113 +107,90 @@ public class FlinkBatchTransformTranslators {
   // --------------------------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
-  private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
 
-  // register the known translators
   static {
     TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
 
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    // we don't need this because we translate the Combine.PerKey directly
-    //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator());
-
-    TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
 
-    // TODO we're currently ignoring windows here but that has to change in the future
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
 
-    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-
-    TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
-
-    TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
-    TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch());
+    TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
 
     TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
-    TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
-
-    TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
-    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch());
-
-    // Flink-specific
-    TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch());
-
   }
 
 
-  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+  public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
     return TRANSLATORS.get(transform.getClass());
   }
 
-  private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+  private static class ReadSourceTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
 
     @Override
     public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
       String name = transform.getName();
       BoundedSource<T> source = transform.getSource();
       PCollection<T> output = context.getOutput(transform);
-      Coder<T> coder = output.getCoder();
 
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
+      TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
 
-      DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
+      DataSource<WindowedValue<T>> dataSource = new DataSource<>(
+          context.getExecutionEnvironment(),
+          new SourceInputFormat<>(source, context.getPipelineOptions()),
+          typeInformation,
+          name);
 
       context.setOutputDataSet(output, dataSource);
     }
   }
 
-  private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
-    private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
+  private static class WriteSinkTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
 
     @Override
-    public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
+    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
       String name = transform.getName();
-//      Schema schema = transform.getSchema();
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
-      // This is super hacky, but unfortunately we cannot get the type otherwise
-      Class<T> extractedAvroType;
-      try {
-        Field typeField = transform.getClass().getDeclaredField("type");
-        typeField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        Class<T> avroType = (Class<T>) typeField.get(transform);
-        extractedAvroType = avroType;
-      } catch (NoSuchFieldException | IllegalAccessException e) {
-        // we know that the field is there and it is accessible
-        throw new RuntimeException("Could not access type from AvroIO.Bound", e);
-      }
-
-      DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(),
-          new AvroInputFormat<>(new Path(path), extractedAvroType),
-          typeInformation, name);
+      PValue input = context.getInput(transform);
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
 
-      context.setOutputDataSet(output, source);
+      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions()))
+          .name(name);
     }
   }
 
-  private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
+  private static class AvroIOWriteTranslatorBatch<T> implements
+      FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
 
+
     @Override
-    public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        AvroIO.Write.Bound<T> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
       String filenamePrefix = transform.getFilenamePrefix();
       String filenameSuffix = transform.getFilenameSuffix();
       int numShards = transform.getNumShards();
       String shardNameTemplate = transform.getShardNameTemplate();
 
       // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
           filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
 
       // This is super hacky, but unfortunately we cannot get the type otherwise
       Class<T> extractedAvroType;
@@ -224,8 +205,17 @@ public class FlinkBatchTransformTranslators {
         throw new RuntimeException("Could not access type from AvroIO.Bound", e);
       }
 
-      DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path
-          (filenamePrefix), extractedAvroType));
+      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+          new MapFunction<WindowedValue<T>, T>() {
+            @Override
+            public T map(WindowedValue<T> value) throws Exception {
+              return value.getValue();
+            }
+          }).returns(new CoderTypeInformation<>(context.getInput(transform).getCoder()));
+
+
+      DataSink<T> dataSink = valueStream.output(
+          new AvroOutputFormat<>(new Path(filenamePrefix), extractedAvroType));
 
       if (numShards > 0) {
         dataSink.setParallelism(numShards);
@@ -233,37 +223,16 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> {
-    private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
-    @Override
-    public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) {
-      String path = transform.getFilepattern();
-      String name = transform.getName();
-
-      TextIO.CompressionType compressionType = transform.getCompressionType();
-      boolean needsValidation = transform.needsValidation();
-
-      // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType);
-      LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation);
-
-      PValue output = context.getOutput(transform);
-
-      TypeInformation<String> typeInformation = context.getTypeInfo(output);
-      DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name);
-
-      context.setOutputDataSet(output, source);
-    }
-  }
-
-  private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
+  private static class TextIOWriteTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
     private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
 
     @Override
-    public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(
+        TextIO.Write.Bound<T> transform,
+        FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
 
       String filenamePrefix = transform.getFilenamePrefix();
       String filenameSuffix = transform.getFilenameSuffix();
@@ -272,12 +241,25 @@ public class FlinkBatchTransformTranslators {
       String shardNameTemplate = transform.getShardNameTemplate();
 
       // TODO: Implement these. We need Flink support for this.
-      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
-      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
-      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+      LOG.warn(
+          "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+          needsValidation);
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+          filenameSuffix);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+          shardNameTemplate);
 
-      //inputDataSet.print();
-      DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
+      MapOperator<WindowedValue<T>, T> valueStream = inputDataSet.map(
+          new MapFunction<WindowedValue<T>, T>() {
+            @Override
+            public T map(WindowedValue<T> value) throws Exception {
+              return value.getValue();
+            }
+          }).returns(new CoderTypeInformation<>(transform.getCoder()));
+
+      DataSink<T> dataSink = valueStream.writeAsText(filenamePrefix);
 
       if (numShards > 0) {
         dataSink.setParallelism(numShards);
@@ -285,148 +267,414 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
+  private static class WindowBoundTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
+
     @Override
-    public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) {
+    public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
-      DataSet<?> inputDataSet = context.getInputDataSet(input);
-      inputDataSet.printOnTaskManager(transform.getName());
+
+      TypeInformation<WindowedValue<T>> resultTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
+
+      @SuppressWarnings("unchecked")
+      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, ? extends BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
+
+      DataSet<WindowedValue<T>> resultDataSet = inputDataSet
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(resultTypeInfo);
+
+      context.setOutputDataSet(context.getOutput(transform), resultDataSet);
     }
   }
 
-  private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
+  private static class GroupByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
 
     @Override
-    public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      PValue input = context.getInput(transform);
-      DataSet<T> inputDataSet = context.getInputDataSet(input);
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkBatchTranslationContext context) {
+
+      // for now, this is copied from the Combine.PerKey translator. Once we have the new runner API
+      // we can replace GroupByKey with a Combine.PerKey using the Concatenate CombineFn
+
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
+          new Concatenate<InputT>().asKeyedFn();
+
+      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+      Coder<List<InputT>> accumulatorCoder;
+
+      try {
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
+      } catch (CannotProvideCoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  inputCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          new UnsortedGrouping<>(
+              inputDataSet,
+              new Keys.ExpressionKeys<>(new String[]{"key"},
+                  kvCoderTypeInformation));
+
+      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
+      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
+
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkPartialReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFn with windows other than IntervalWindow are not supported.");
+        }
+
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkMergingReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+      }
+
+      // Partially GroupReduce the values into the intermediate format AccumT (combine)
+      GroupCombineOperator<
+          WindowedValue<KV<K, InputT>>,
+          WindowedValue<KV<K, List<InputT>>>> groupCombine =
+          new GroupCombineOperator<>(
+              inputGrouping,
+              partialReduceTypeInfo,
+              partialReduceFunction,
+              "GroupCombine: " + transform.getName());
+
+      Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
+          new UnsortedGrouping<>(
+              groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+      // Fully reduce the values and create output format VO
+      GroupReduceOperator<
+          WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
+          new GroupReduceOperator<>(
+              intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
-      inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name);
     }
   }
 
   /**
-   * Translates a GroupByKey while ignoring window assignments. Current ignores windows.
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+   * is expected to crash!
+   *
+   * <p>This is copied from the Dataflow runner code.
+   *
+   * @param <T> the type of elements to concatenate.
    */
-  private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
 
     @Override
-    public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
 
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
 
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
 
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
 
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
     }
   }
 
-  private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
+
+  private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Combine.PerKey<K, InputT, OutputT>> {
 
     @Override
-    public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
-      DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    @SuppressWarnings("unchecked")
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      @SuppressWarnings("unchecked")
-      Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+
+      KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) context.getInput(transform).getCoder();
 
-      KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+      Coder<AccumT> accumulatorCoder;
 
-      Coder<VA> accumulatorCoder =
-          null;
       try {
-        accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
       } catch (CannotProvideCoderException e) {
-        e.printStackTrace();
-        // TODO
+        throw new RuntimeException(e);
       }
 
-      TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
-      TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, InputT>>> kvCoderTypeInformation =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  inputCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+          new KvCoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          new UnsortedGrouping<>(
+              inputDataSet,
+              new Keys.ExpressionKeys<>(new String[]{"key"},
+                  kvCoderTypeInformation));
+
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
 
-      Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+            new FlinkPartialReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+            new FlinkReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        // Partially GroupReduce the values into the intermediate format AccumT (combine)
+        GroupCombineOperator<
+            WindowedValue<KV<K, InputT>>,
+            WindowedValue<KV<K, AccumT>>> groupCombine =
+            new GroupCombineOperator<>(
+                inputGrouping,
+                partialReduceTypeInfo,
+                partialReduceFunction,
+                "GroupCombine: " + transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+            new UnsortedGrouping<>(
+                groupCombine,
+                new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
 
-      FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFn with windows other than IntervalWindow are not supported.");
+        }
 
-      // Partially GroupReduce the values into the intermediate format VA (combine)
-      GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
-          new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
-              "GroupCombine: " + transform.getName());
+        // for merging windows we can't do a pre-shuffle combine step since
+        // elements would not be in their correct windows for side-input access
 
-      // Reduce fully to VO
-      GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
 
-      TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
+        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+            new FlinkMergingNonShuffleReduceFunction<>(
+                combineFn,
+                intervalStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
 
-      Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, InputT>>> grouping =
+            new UnsortedGrouping<>(
+                inputDataSet,
+                new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+      }
 
-      // Fully reduce the values and create output format VO
-      GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
-          new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
 
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
     }
   }
 
-//  private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-//    @Override
-//    public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
-//      DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput());
-//
-//      Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
-//
-//      GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
-//
-//      TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
-//
-//      Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-//      GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
-//          new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-//      context.setOutputDataSet(transform.getOutput(), outputDataSet);
-//    }
-//  }
-
-  private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+  private static class ParDoBoundTranslatorBatch<InputT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          ParDo.Bound<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        ParDo.Bound<InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<InputT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<IN, OUT> doFn = transform.getFn();
+      final DoFn<InputT, OutputT> doFn = transform.getFn();
 
-      TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
+      TypeInformation<WindowedValue<OutputT>> typeInformation =
+          context.getTypeInfo(context.getOutput(transform));
 
-      FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
-      MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: sideInputs) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
+
+      FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
+          new FlinkDoFnFunction<>(
+              doFn,
+              context.getOutput(transform).getWindowingStrategy(),
+              sideInputStrategies,
+              context.getPipelineOptions());
+
+      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<OutputT>> outputDataSet =
+          new MapPartitionOperator<>(
+              inputDataSet,
+              typeInformation,
+              doFnWrapper,
+              transform.getName());
+
+      transformSideInputs(sideInputs, outputDataSet, context);
 
       context.setOutputDataSet(context.getOutput(transform), outputDataSet);
     }
   }
 
-  private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
-    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+  private static class ParDoBoundMultiTranslatorBatch<InputT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          ParDo.BoundMulti<InputT, OutputT>> {
 
     @Override
-    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
-      DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+    public void translateNode(
+        ParDo.BoundMulti<InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<InputT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      final DoFn<IN, OUT> doFn = transform.getFn();
+      final DoFn<InputT, OutputT> doFn = transform.getFn();
 
       Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
 
       Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-      // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
+      // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
       outputMap.put(transform.getMainOutputTag(), 0);
       int count = 1;
       for (TupleTag<?> tag: outputs.keySet()) {
@@ -435,58 +683,118 @@ public class FlinkBatchTransformTranslators {
         }
       }
 
+      // assume that the windowing strategy is the same for all outputs
+      WindowingStrategy<?, ?> windowingStrategy = null;
+
       // collect all output Coders and create a UnionCoder for our tagged outputs
       List<Coder<?>> outputCoders = Lists.newArrayList();
       for (PCollection<?> coll: outputs.values()) {
         outputCoders.add(coll.getCoder());
+        windowingStrategy = coll.getWindowingStrategy();
+      }
+
+      if (windowingStrategy == null) {
+        throw new IllegalStateException("No outputs defined.");
       }
 
       UnionCoder unionCoder = UnionCoder.of(outputCoders);
 
-      @SuppressWarnings("unchecked")
-      TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
+      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  unionCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
 
-      @SuppressWarnings("unchecked")
-      FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
-      MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
-      transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: sideInputs) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
 
-      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
-        TypeInformation<Object> outputType = context.getTypeInfo(output.getValue());
-        int outputTag = outputMap.get(output.getKey());
-        FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
-        FlatMapOperator<RawUnionValue, Object> pruningOperator = new
-            FlatMapOperator<>(outputDataSet, outputType,
-            pruningFunction, output.getValue().getName());
-        context.setOutputDataSet(output.getValue(), pruningOperator);
+      @SuppressWarnings("unchecked")
+      FlinkMultiOutputDoFnFunction<InputT, OutputT> doFnWrapper =
+          new FlinkMultiOutputDoFnFunction(
+              doFn,
+              windowingStrategy,
+              sideInputStrategies,
+              context.getPipelineOptions(),
+              outputMap);
+
+      MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet =
+          new MapPartitionOperator<>(
+              inputDataSet,
+              typeInformation,
+              doFnWrapper,
+              transform.getName());
+
+      transformSideInputs(sideInputs, taggedDataSet, context);
 
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+        pruneOutput(
+            taggedDataSet,
+            context,
+            outputMap.get(output.getKey()),
+            (PCollection) output.getValue());
       }
     }
+
+    private <T> void pruneOutput(
+        MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet,
+        FlinkBatchTranslationContext context,
+        int integerTag,
+        PCollection<T> collection) {
+      TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+      FlinkMultiOutputPruningFunction<T> pruningFunction =
+          new FlinkMultiOutputPruningFunction<>(integerTag);
+
+      FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+          new FlatMapOperator<>(
+              taggedDataSet,
+              outputType,
+              pruningFunction,
+              collection.getName());
+
+      context.setOutputDataSet(collection, pruningOperator);
+    }
   }
 
-  private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+  private static class FlattenPCollectionTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Flatten.FlattenPCollectionList<T>> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(
+        Flatten.FlattenPCollectionList<T> transform,
+        FlinkBatchTranslationContext context) {
+
       List<PCollection<T>> allInputs = context.getInput(transform).getAll();
-      DataSet<T> result = null;
+      DataSet<WindowedValue<T>> result = null;
+
       if (allInputs.isEmpty()) {
+
         // create an empty dummy source to satisfy downstream operations;
         // we cannot create an empty source in Flink, so we have to add
         // a flatMap that simply never forwards the single dummy element
         DataSource<String> dummySource =
             context.getExecutionEnvironment().fromElements("dummy");
-        result = dummySource.flatMap(new FlatMapFunction<String, T>() {
+        result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
           @Override
-          public void flatMap(String s, Collector<T> collector) throws Exception {
+          public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
             // never return anything
           }
-        }).returns(new CoderTypeInformation<>((Coder<T>) VoidCoder.of()));
+        }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
       } else {
         for (PCollection<T> collection : allInputs) {
-          DataSet<T> current = context.getInputDataSet(collection);
+          DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
           if (result == null) {
             result = current;
           } else {
@@ -494,103 +802,47 @@ public class FlinkBatchTransformTranslators {
           }
         }
       }
-      context.setOutputDataSet(context.getOutput(transform), result);
-    }
-  }
 
-  private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> {
-    @Override
-    public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) {
-      DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
-      PCollectionView<T> input = transform.apply(null);
-      context.setSideInputDataSet(input, inputDataSet);
+      // insert a dummy filter; Flink seems to have a bug that
+      // produces duplicate elements after the union in some cases
+      // if we don't insert it
+      result = result.filter(new FilterFunction<WindowedValue<T>>() {
+        @Override
+        public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+          return true;
+        }
+      }).name("UnionFixFilter");
+      context.setOutputDataSet(context.getOutput(transform), result);
     }
   }
 
-  private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
+  private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          View.CreatePCollectionView<ElemT, ViewT>> {
 
     @Override
-    public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) {
-      TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
-      Iterable<OUT> elements = transform.getElements();
-
-      // we need to serialize the elements to byte arrays, since they might contain
-      // elements that are not serializable by Java serialization. We deserialize them
-      // in the FlatMap function using the Coder.
-
-      List<byte[]> serializedElements = Lists.newArrayList();
-      Coder<OUT> coder = context.getOutput(transform).getCoder();
-      for (OUT element: elements) {
-        ByteArrayOutputStream bao = new ByteArrayOutputStream();
-        try {
-          coder.encode(element, bao, Coder.Context.OUTER);
-          serializedElements.add(bao.toByteArray());
-        } catch (IOException e) {
-          throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
-        }
-      }
+    public void translateNode(
+        View.CreatePCollectionView<ElemT, ViewT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<ElemT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
 
-      DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
-      FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder);
-      FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName());
+      PCollectionView<ViewT> input = transform.getView();
 
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+      context.setSideInputDataSet(input, inputDataSet);
     }
   }
 
-  private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
-                                          MapPartitionOperator<?, ?> outputDataSet,
-                                          FlinkBatchTranslationContext context) {
+  private static void transformSideInputs(
+      List<PCollectionView<?>> sideInputs,
+      SingleInputUdfOperator<?, ?, ?> outputDataSet,
+      FlinkBatchTranslationContext context) {
     // get corresponding Flink broadcast DataSets
-    for(PCollectionView<?> input : sideInputs) {
+    for (PCollectionView<?> input : sideInputs) {
       DataSet<?> broadcastSet = context.getSideInputDataSet(input);
       outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
     }
   }
 
-// Disabled because it depends on a pending pull request to the DataFlowSDK
-  /**
-   * Special composite transform translator. Only called if the CoGroup is two dimensional.
-   * @param <K>
-   */
-  private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
-
-    @Override
-    public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) {
-      KeyedPCollectionTuple<K> input = context.getInput(transform);
-
-      CoGbkResultSchema schema = input.getCoGbkResultSchema();
-      List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections();
-
-      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0);
-      KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1);
-
-      TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
-      TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
-
-      PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection();
-      PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection();
-
-      DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
-      DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
-
-      TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo();
-
-      FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
-
-      Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
-      Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
-
-      DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2,
-                                  keySelector1, keySelector2,
-                                                          aggregator, typeInfo, null, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), out);
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Miscellaneous
-  // --------------------------------------------------------------------------------------------
-
   private FlinkBatchTransformTranslators() {}
 }
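
The multi-output ParDo translation above funnels every tagged output
through a single stream of RawUnionValues and then prunes one DataSet per
output tag. FlinkMultiOutputPruningFunction itself is not shown in this
diff; the following is only a minimal sketch of what such a pruning step
has to do, under a hypothetical class name, assuming the semantics implied
by the translator (match the integer union tag, unwrap the value, keep the
window metadata):

    import org.apache.beam.sdk.transforms.join.RawUnionValue;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    /** Hypothetical sketch: keep only union values with the given tag and unwrap them. */
    public class PruningSketch<T>
        implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> {

      private final int unionTag;

      public PruningSketch(int unionTag) {
        this.unionTag = unionTag;
      }

      @Override
      @SuppressWarnings("unchecked")
      public void flatMap(
          WindowedValue<RawUnionValue> wrapped,
          Collector<WindowedValue<T>> out) throws Exception {
        if (wrapped.getValue().getUnionTag() == unionTag) {
          // withValue keeps the original timestamp, windows and pane
          out.collect(wrapped.withValue((T) wrapped.getValue().getValue()));
        }
      }
    }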

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
index 501b1ea..ecc3a65 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java
@@ -18,26 +18,28 @@
 package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TypedPValue;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Helper for {@link FlinkBatchPipelineTranslator} and translators in
+ * {@link FlinkBatchTransformTranslators}.
+ */
 public class FlinkBatchTranslationContext {
   
   private final Map<PValue, DataSet<?>> dataSets;
@@ -81,13 +83,13 @@ public class FlinkBatchTranslationContext {
   }
   
   @SuppressWarnings("unchecked")
-  public <T> DataSet<T> getInputDataSet(PValue value) {
+  public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
     // assume that the DataSet is used as an input if retrieved here
     danglingDataSets.remove(value);
-    return (DataSet<T>) dataSets.get(value);
+    return (DataSet<WindowedValue<T>>) dataSets.get(value);
   }
 
-  public void setOutputDataSet(PValue value, DataSet<?> set) {
+  public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
     if (!dataSets.containsKey(value)) {
       dataSets.put(value, set);
       danglingDataSets.put(value, set);
@@ -107,40 +109,32 @@ public class FlinkBatchTranslationContext {
     return (DataSet<T>) broadcastDataSets.get(value);
   }
 
-  public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
+  public <ViewT, ElemT> void setSideInputDataSet(
+      PCollectionView<ViewT> value,
+      DataSet<WindowedValue<ElemT>> set) {
     if (!broadcastDataSets.containsKey(value)) {
       broadcastDataSets.put(value, set);
     }
   }
-  
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<T> getTypeInfo(PInput output) {
-    if (output instanceof TypedPValue) {
-      Coder<?> outputCoder = ((TypedPValue) output).getCoder();
-      if (outputCoder instanceof KvCoder) {
-        return new KvCoderTypeInformation((KvCoder) outputCoder);
-      } else {
-        return new CoderTypeInformation(outputCoder);
-      }
-    }
-    return new GenericTypeInfo<>((Class<T>)Object.class);
-  }
-
-  public <T> TypeInformation<T> getInputTypeInfo() {
-    return getTypeInfo(currentTransform.getInput());
-  }
 
-  public <T> TypeInformation<T> getOutputTypeInfo() {
-    return getTypeInfo((PValue) currentTransform.getOutput());
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            valueCoder,
+            collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
   }
 
   @SuppressWarnings("unchecked")
-  <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return (I) currentTransform.getInput();
+  <T extends PInput> T getInput(PTransform<T, ?> transform) {
+    return (T) currentTransform.getInput();
   }
 
   @SuppressWarnings("unchecked")
-  <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return (O) currentTransform.getOutput();
+  <T extends POutput> T getOutput(PTransform<?, T> transform) {
+    return (T) currentTransform.getOutput();
   }
 }
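
Side inputs reach the batch functions as Flink broadcast variables: the
translator registers them via withBroadcastSet() under the side input's
tag id (see transformSideInputs above), and the function reads the
materialized list back under the same name. A sketch of the consuming
side, with a hypothetical class name and the window-matching logic a real
implementation would need left out:

    import java.util.List;

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    /** Hypothetical sketch: read a broadcast side input in open(). */
    public class SideInputReadSketch<T>
        extends RichMapFunction<WindowedValue<T>, WindowedValue<T>> {

      // must match the id used with withBroadcastSet(), i.e.
      // view.getTagInternal().getId()
      private final String sideInputTag;
      private transient List<WindowedValue<Object>> sideInput;

      public SideInputReadSketch(String sideInputTag) {
        this.sideInputTag = sideInputTag;
      }

      @Override
      public void open(Configuration parameters) {
        // the broadcast variable is materialized once per task manager
        sideInput = getRuntimeContext().getBroadcastVariable(sideInputTag);
      }

      @Override
      public WindowedValue<T> map(WindowedValue<T> value) {
        // a real implementation would first select the side-input elements
        // whose windows match the main-input window of this value
        return value;
      }
    }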

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 2778d5c..b3fed99 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.flink.translation;
 
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
@@ -46,6 +45,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -229,29 +229,15 @@ public class FlinkStreamingTransformTranslators {
       BoundedSource<T> boundedSource = transform.getSource();
       PCollection<T> output = context.getOutput(transform);
 
-      Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);
+      TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
 
-      DataStream<T> source = context.getExecutionEnvironment().createInput(
+      DataStream<WindowedValue<T>> source = context.getExecutionEnvironment().createInput(
           new SourceInputFormat<>(
               boundedSource,
               context.getPipelineOptions()),
           typeInfo);
 
-      DataStream<WindowedValue<T>> windowedStream = source.flatMap(
-          new FlatMapFunction<T, WindowedValue<T>>() {
-            @Override
-            public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
-              out.collect(
-                  WindowedValue.of(value,
-                    Instant.now(),
-                    GlobalWindow.INSTANCE,
-                    PaneInfo.NO_FIRING));
-            }
-          })
-          .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
-
-      context.setOutputDataStream(output, windowedStream);
+      context.setOutputDataStream(output, source);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index 8bc7317..0cb80ba 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -17,21 +17,30 @@
  */
 package org.apache.beam.runners.flink.translation;
 
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
 public class FlinkStreamingTranslationContext {
 
   private final StreamExecutionEnvironment env;
@@ -80,12 +89,24 @@ public class FlinkStreamingTranslationContext {
   }
 
   @SuppressWarnings("unchecked")
-  public <I extends PInput> I getInput(PTransform<I, ?> transform) {
-    return (I) currentTransform.getInput();
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            valueCoder,
+            collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  public <T extends PInput> T getInput(PTransform<T, ?> transform) {
+    return (T) currentTransform.getInput();
   }
 
   @SuppressWarnings("unchecked")
-  public <O extends POutput> O getOutput(PTransform<?, O> transform) {
-    return (O) currentTransform.getOutput();
+  public <T extends POutput> T getOutput(PTransform<?, T> transform) {
+    return (T) currentTransform.getOutput();
   }
 }
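
Both translation contexts now derive Flink type information the same way:
compose the PCollection's element coder with the window coder into a
FullWindowedValueCoder and wrap it in a CoderTypeInformation. A
self-contained illustration for strings in the global window (the class
name is only illustrative):

    import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class TypeInfoSketch {
      /** Type information for WindowedValue<String> in the global window. */
      public static TypeInformation<WindowedValue<String>> globalWindowStrings() {
        WindowedValue.FullWindowedValueCoder<String> coder =
            WindowedValue.getFullCoder(
                StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
        return new CoderTypeInformation<>(coder);
      }
    }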

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
new file mode 100644
index 0000000..7ea8c20
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
+ * Flink functions.
+ */
+class FlinkAssignContext<InputT, W extends BoundedWindow>
+    extends WindowFn<InputT, W>.AssignContext {
+  private final WindowedValue<InputT> value;
+
+  FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+    fn.super();
+    this.value = value;
+  }
+
+  @Override
+  public InputT element() {
+    return value.getValue();
+  }
+
+  @Override
+  public Instant timestamp() {
+    return value.getTimestamp();
+  }
+
+  @Override
+  public Collection<? extends BoundedWindow> windows() {
+    return value.getWindows();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
new file mode 100644
index 0000000..e07e49a
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Collection;
+
+/**
+ * Flink {@link FlatMapFunction} for implementing
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound}.
+ */
+public class FlinkAssignWindows<T, W extends BoundedWindow>
+    implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+  private final WindowFn<T, W> windowFn;
+
+  public FlinkAssignWindows(WindowFn<T, W> windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  @Override
+  public void flatMap(
+      WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
+    Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
+    for (W window: windows) {
+      collector.collect(
+          WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
+    }
+  }
+}
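
For illustration, this is roughly how the new function might be wired into
a batch translation of Window.into(). It is a sketch, not code from this
commit: FixedWindows is just an example WindowFn, and the unchecked cast
mirrors what a translator has to do because FixedWindows is declared
against Object rather than the element type.

    import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
    import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.WindowFn;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.flink.api.java.DataSet;
    import org.joda.time.Duration;

    public class AssignWindowsSketch {
      public static <T> DataSet<WindowedValue<T>> intoFixedWindows(
          DataSet<WindowedValue<T>> input, Coder<T> valueCoder) {
        @SuppressWarnings("unchecked")
        WindowFn<T, IntervalWindow> windowFn = (WindowFn<T, IntervalWindow>)
            (WindowFn<?, ?>) FixedWindows.of(Duration.standardMinutes(1));
        return input
            .flatMap(new FlinkAssignWindows<>(windowFn))
            // Flink cannot infer WindowedValue<T> through type erasure, so
            // hand it the coder-backed type information explicitly
            .returns(new CoderTypeInformation<>(
                WindowedValue.getFullCoder(valueCoder, windowFn.windowCoder())));
      }
    }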

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
deleted file mode 100644
index 8e7cdd7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{
-
-  private CoGbkResultSchema schema;
-  private TupleTag<?> tupleTag1;
-  private TupleTag<?> tupleTag2;
-
-  public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) {
-    this.schema = schema;
-    this.tupleTag1 = tupleTag1;
-    this.tupleTag2 = tupleTag2;
-  }
-
-  @Override
-  public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception {
-    K k = null;
-    List<RawUnionValue> result = new ArrayList<>();
-    int index1 = schema.getIndex(tupleTag1);
-    for (KV<K,?> entry : first) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index1, entry.getValue()));
-    }
-    int index2 = schema.getIndex(tupleTag2);
-    for (KV<K,?> entry : second) {
-      k = entry.getKey();
-      result.add(new RawUnionValue(index2, entry.getValue()));
-    }
-    out.collect(KV.of(k, new CoGbkResult(schema, result)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
deleted file mode 100644
index e5ac748..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer;
-import org.apache.beam.sdk.coders.Coder;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.ByteArrayInputStream;
-import java.util.List;
-
-/**
- * This is a hack for transforming a {@link org.apache.beam.sdk.transforms.Create}
- * operation. Flink does not allow {@code null} in it's equivalent operation:
- * {@link org.apache.flink.api.java.ExecutionEnvironment#fromElements(Object[])}. Therefore
- * we use a DataSource with one dummy element and output the elements of the Create operation
- * inside this FlatMap.
- */
-public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> {
-
-  private final List<byte[]> elements;
-  private final Coder<OUT> coder;
-
-  public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-    this.elements = elements;
-    this.coder = coder;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void flatMap(IN value, Collector<OUT> out) throws Exception {
-
-    for (byte[] element : elements) {
-      ByteArrayInputStream bai = new ByteArrayInputStream(element);
-      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-      if (outValue == null) {
-        // TODO Flink doesn't allow null values in records
-        out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE);
-      } else {
-        out.collect(outValue);
-      }
-    }
-
-    out.close();
-  }
-}