Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:18 UTC

[09/18] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
deleted file mode 100644
index 3a08088..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.DelegateCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An unbounded source for testing the unbounded sources framework code.
- *
- * <p>Each split of this source produces records of the form KV(split_id, i),
- * where i counts up from 0.  Each record has a timestamp of i, and the watermark
- * accurately tracks these timestamps.  The reader will occasionally return false
- * from {@code advance}, in order to simulate a source where not all the data is
- * available immediately.
- */
-public class TestCountingSource
-    extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> {
-  private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
-
-  private static List<Integer> finalizeTracker;
-  private final int numMessagesPerShard;
-  private final int shardNumber;
-  private final boolean dedup;
-  private final boolean throwOnFirstSnapshot;
-  private final boolean allowSplitting;
-
-  /**
-   * We only allow an exception to be thrown from getCheckpointMark
-   * at most once. This must be static since the entire TestCountingSource
- * instance may be re-serialized when the pipeline recovers and retries.
-   */
-  private static boolean thrown = false;
-
-  public static void setFinalizeTracker(List<Integer> finalizeTracker) {
-    TestCountingSource.finalizeTracker = finalizeTracker;
-  }
-
-  public TestCountingSource(int numMessagesPerShard) {
-    this(numMessagesPerShard, 0, false, false, true);
-  }
-
-  public TestCountingSource withDedup() {
-    return new TestCountingSource(
-        numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true);
-  }
-
-  private TestCountingSource withShardNumber(int shardNumber) {
-    return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
-  }
-
-  public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
-    return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
-  }
-
-  public TestCountingSource withoutSplitting() {
-    return new TestCountingSource(
-        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
-  }
-
-  private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup,
-                             boolean throwOnFirstSnapshot, boolean allowSplitting) {
-    this.numMessagesPerShard = numMessagesPerShard;
-    this.shardNumber = shardNumber;
-    this.dedup = dedup;
-    this.throwOnFirstSnapshot = throwOnFirstSnapshot;
-    this.allowSplitting = allowSplitting;
-  }
-
-  public int getShardNumber() {
-    return shardNumber;
-  }
-
-  @Override
-  public List<TestCountingSource> split(
-      int desiredNumSplits, PipelineOptions options) {
-    List<TestCountingSource> splits = new ArrayList<>();
-    int numSplits = allowSplitting ? desiredNumSplits : 1;
-    for (int i = 0; i < numSplits; i++) {
-      splits.add(withShardNumber(i));
-    }
-    return splits;
-  }
-
-  class CounterMark implements UnboundedSource.CheckpointMark {
-    int current;
-
-    public CounterMark(int current) {
-      this.current = current;
-    }
-
-    @Override
-    public void finalizeCheckpoint() {
-      if (finalizeTracker != null) {
-        finalizeTracker.add(current);
-      }
-    }
-  }
-
-  @Override
-  public Coder<CounterMark> getCheckpointMarkCoder() {
-    return DelegateCoder.of(
-        VarIntCoder.of(),
-        new DelegateCoder.CodingFunction<CounterMark, Integer>() {
-          @Override
-          public Integer apply(CounterMark input) {
-            return input.current;
-          }
-        },
-        new DelegateCoder.CodingFunction<Integer, CounterMark>() {
-          @Override
-          public CounterMark apply(Integer input) {
-            return new CounterMark(input);
-          }
-        });
-  }
-
-  @Override
-  public boolean requiresDeduping() {
-    return dedup;
-  }
-
-  /**
-   * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to
-   * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without a cast.
-   */
-  public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
-    private int current;
-
-    public CountingSourceReader(int startingPoint) {
-      this.current = startingPoint;
-    }
-
-    @Override
-    public boolean start() {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() {
-      if (current >= numMessagesPerShard - 1) {
-        return false;
-      }
-      // If testing dedup, occasionally insert a duplicate value.
-      if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
-        return true;
-      }
-      current++;
-      return true;
-    }
-
-    @Override
-    public KV<Integer, Integer> getCurrent() {
-      return KV.of(shardNumber, current);
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() {
-      return new Instant(current);
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() {
-      try {
-        return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void close() {}
-
-    @Override
-    public TestCountingSource getCurrentSource() {
-      return TestCountingSource.this;
-    }
-
-    @Override
-    public Instant getWatermark() {
-      // The watermark is a promise about future elements, and the timestamps of elements are
-      // strictly increasing for this source.
-      return new Instant(current + 1);
-    }
-
-    @Override
-    public CounterMark getCheckpointMark() {
-      if (throwOnFirstSnapshot && !thrown) {
-        thrown = true;
-        LOG.error("Throwing exception while checkpointing counter");
-        throw new RuntimeException("failed during checkpoint");
-      }
-      // The checkpoint can assume all records read, including the current, have
-      // been committed.
-      return new CounterMark(current);
-    }
-
-    @Override
-    public long getSplitBacklogBytes() {
-      return 7L;
-    }
-  }
-
-  @Override
-  public CountingSourceReader createReader(
-      PipelineOptions options, @Nullable CounterMark checkpointMark) {
-    if (checkpointMark == null) {
-      LOG.debug("creating reader");
-    } else {
-      LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
-    }
-    return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
-  }
-
-  @Override
-  public void validate() {}
-
-  @Override
-  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
-    return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
-  }
-}
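
For readers unfamiliar with the deleted test source: it is consumed through the standard Beam Read transform like any other UnboundedSource. A minimal usage sketch, not part of this commit, assuming a class placed alongside TestCountingSource in the same package:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class TestCountingSourceUsage {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Emits KV(shard_id, i) with timestamp i per shard; the watermark follows the counter.
        PCollection<KV<Integer, Integer>> counts =
            p.apply(Read.from(new TestCountingSource(10).withoutSplitting()));
        p.run();
      }
    }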

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
deleted file mode 100644
index 9e6bba8..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.common.base.Joiner;
-import java.io.Serializable;
-import java.util.Arrays;
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-
-/**
- * Session window test.
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable {
-  protected String resultPath;
-
-  public TopWikipediaSessionsITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "user: user1 value:3",
-      "user: user1 value:1",
-      "user: user2 value:4",
-      "user: user2 value:6",
-      "user: user3 value:7",
-      "user: user3 value:2"
-  };
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForStreaming();
-
-    Long now = (System.currentTimeMillis() + 10000) / 1000;
-
-    PCollection<KV<String, Long>> output =
-      p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", now).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", now)
-          .set("contributor_username", "user3"))))
-
-
-
-      .apply(ParDo.of(new DoFn<TableRow, String>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-          TableRow row = c.element();
-          long timestamp = (Integer) row.get("timestamp");
-          String userName = (String) row.get("contributor_username");
-          if (userName != null) {
-            // Sets the timestamp field to be used in windowing.
-            c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-          }
-        }
-      }))
-
-      .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
-      .apply(Count.<String>perElement());
-
-    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        KV<String, Long> el = c.element();
-        String out = "user: " + el.getKey() + " value:" + el.getValue();
-        c.output(out);
-      }
-    }));
-
-    format.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}
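
Stripped of the TableRow setup, the deleted test exercises one windowing pattern: attach event timestamps, window into sessions with a one-minute gap, and count edits per user. A condensed sketch of that pattern, not part of this commit; the helper and its timestamped input PCollection are assumed:

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.windowing.Sessions;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class SessionCounts {
      // Hypothetical helper: usernames must already carry event timestamps
      // (the test attaches them via c.outputWithTimestamp in its first ParDo).
      static PCollection<KV<String, Long>> perUserSessionCounts(PCollection<String> userNames) {
        return userNames
            .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
            .apply(Count.<String>perElement());
      }
    }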

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
deleted file mode 100644
index 90f95d6..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.Matchers;
-
-/**
- * Tests for {@link UnboundedSourceWrapper}.
- */
-@RunWith(Enclosed.class)
-public class UnboundedSourceWrapperTest {
-
-  /**
-   * Parameterized tests.
-   */
-  @RunWith(Parameterized.class)
-  public static class UnboundedSourceWrapperTestWithParams {
-    private final int numTasks;
-    private final int numSplits;
-
-    public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
-      this.numTasks = numTasks;
-      this.numSplits = numSplits;
-    }
-
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-      /*
-       * Parameters for initializing the tests:
-       * {numTasks, numSplits}
-       * The test currently assumes powers of two for some assertions.
-       */
-      return Arrays.asList(new Object[][]{
-          {1, 1}, {1, 2}, {1, 4},
-          {2, 1}, {2, 2}, {2, 4},
-          {4, 1}, {4, 2}, {4, 4}
-      });
-    }
-
-    /**
-     * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
-     * If numSplits > numTasks, one wrapper will manage multiple readers.
-     */
-    @Test
-    public void testReaders() throws Exception {
-      final int numElements = 20;
-      final Object checkpointLock = new Object();
-      PipelineOptions options = PipelineOptionsFactory.create();
-
-      // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-      // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-      // elements later.
-      TestCountingSource source = new TestCountingSource(numElements);
-      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
-
-      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-      StreamSource<WindowedValue<
-          KV<Integer, Integer>>,
-          UnboundedSourceWrapper<
-              KV<Integer, Integer>,
-              TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-      setupSourceOperator(sourceOperator, numTasks);
-
-      try {
-        sourceOperator.open();
-        sourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-              private int count = 0;
-
-              @Override
-              public void emitWatermark(Watermark watermark) {
-              }
-
-              @Override
-              public void emitLatencyMarker(LatencyMarker latencyMarker) {
-              }
-
-              @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-                count++;
-                if (count >= numElements) {
-                  throw new SuccessException();
-                }
-              }
-
-              @Override
-              public void close() {
-
-              }
-            });
-      } catch (SuccessException e) {
-
-        assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
-        // success
-        return;
-      }
-      fail("Read terminated without producing expected number of outputs");
-    }
-
-    /**
-     * Verify that snapshot/restore work as expected. We bring up a source and cancel
-     * after seeing a certain number of elements. Then we snapshot that source,
-     * bring up a completely new source that we restore from the snapshot and verify
-     * that we see all expected elements in the end.
-     */
-    @Test
-    public void testRestore() throws Exception {
-      final int numElements = 20;
-      final Object checkpointLock = new Object();
-      PipelineOptions options = PipelineOptionsFactory.create();
-
-      // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-      // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-      // elements later.
-      TestCountingSource source = new TestCountingSource(numElements);
-      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
-
-      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-      StreamSource<
-          WindowedValue<KV<Integer, Integer>>,
-          UnboundedSourceWrapper<
-              KV<Integer, Integer>,
-              TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-
-      OperatorStateStore backend = mock(OperatorStateStore.class);
-
-      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-          listState = new TestingListState<>();
-
-      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-          .thenReturn(listState);
-
-      StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-      when(initializationContext.isRestored()).thenReturn(false, true);
-
-      flinkWrapper.initializeState(initializationContext);
-
-      setupSourceOperator(sourceOperator, numTasks);
-
-      final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
-
-      boolean readFirstBatchOfElements = false;
-
-      try {
-        sourceOperator.open();
-        sourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-              private int count = 0;
-
-              @Override
-              public void emitWatermark(Watermark watermark) {
-              }
-
-              @Override
-              public void emitLatencyMarker(LatencyMarker latencyMarker) {
-              }
-
-              @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-                count++;
-                if (count >= numElements / 2) {
-                  throw new SuccessException();
-                }
-              }
-
-              @Override
-              public void close() {
-
-              }
-            });
-      } catch (SuccessException e) {
-        // success
-        readFirstBatchOfElements = true;
-      }
-
-      assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
-
-      // draw a snapshot
-      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
-      // test snapshot offsets
-      assertEquals(flinkWrapper.getLocalSplitSources().size(),
-          listState.getList().size());
-      int totalEmit = 0;
-      for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : listState.get()) {
-        totalEmit += kv.getValue().current + 1;
-      }
-      assertEquals(numElements / 2, totalEmit);
-
-      // test that finalizeCheckpoint on CheckpointMark is called
-      final ArrayList<Integer> finalizeList = new ArrayList<>();
-      TestCountingSource.setFinalizeTracker(finalizeList);
-      flinkWrapper.notifyCheckpointComplete(0);
-      assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
-
-      // create a completely new source but restore from the snapshot
-      TestCountingSource restoredSource = new TestCountingSource(numElements);
-      UnboundedSourceWrapper<
-          KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-          new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
-
-      assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
-
-      StreamSource<
-          WindowedValue<KV<Integer, Integer>>,
-          UnboundedSourceWrapper<
-              KV<Integer, Integer>,
-              TestCountingSource.CounterMark>> restoredSourceOperator =
-          new StreamSource<>(restoredFlinkWrapper);
-
-      setupSourceOperator(restoredSourceOperator, numTasks);
-
-      // restore snapshot
-      restoredFlinkWrapper.initializeState(initializationContext);
-
-      boolean readSecondBatchOfElements = false;
-
-      // run again and verify that we see the other elements
-      try {
-        restoredSourceOperator.open();
-        restoredSourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-              private int count = 0;
-
-              @Override
-              public void emitWatermark(Watermark watermark) {
-              }
-
-              @Override
-              public void emitLatencyMarker(LatencyMarker latencyMarker) {
-              }
-
-              @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-                count++;
-                if (count >= numElements / 2) {
-                  throw new SuccessException();
-                }
-              }
-
-              @Override
-              public void close() {
-
-              }
-            });
-      } catch (SuccessException e) {
-        // success
-        readSecondBatchOfElements = true;
-      }
-
-      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
-      assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
-
-      // verify that we saw all NUM_ELEMENTS elements
-      assertTrue(emittedElements.size() == numElements);
-    }
-
-    @Test
-    public void testNullCheckpoint() throws Exception {
-      final int numElements = 20;
-      PipelineOptions options = PipelineOptionsFactory.create();
-
-      TestCountingSource source = new TestCountingSource(numElements) {
-        @Override
-        public Coder<CounterMark> getCheckpointMarkCoder() {
-          return null;
-        }
-      };
-      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, numSplits);
-
-      OperatorStateStore backend = mock(OperatorStateStore.class);
-
-      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-          listState = new TestingListState<>();
-
-      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-          .thenReturn(listState);
-
-      StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-      when(initializationContext.isRestored()).thenReturn(false, true);
-
-      flinkWrapper.initializeState(initializationContext);
-
-      StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
-      setupSourceOperator(sourceOperator, numTasks);
-      sourceOperator.open();
-
-      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
-      assertEquals(0, listState.getList().size());
-
-      UnboundedSourceWrapper<
-          KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-          new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements),
-              numSplits);
-
-      StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
-      setupSourceOperator(restoredSourceOperator, numTasks);
-      sourceOperator.open();
-
-      restoredFlinkWrapper.initializeState(initializationContext);
-
-      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
-      ExecutionConfig executionConfig = new ExecutionConfig();
-      StreamConfig cfg = new StreamConfig(new Configuration());
-
-      cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
-      Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
-
-      StreamTask<?, ?> mockTask = mock(StreamTask.class);
-      when(mockTask.getName()).thenReturn("Mock Task");
-      when(mockTask.getCheckpointLock()).thenReturn(new Object());
-      when(mockTask.getConfiguration()).thenReturn(cfg);
-      when(mockTask.getEnvironment()).thenReturn(env);
-      when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-      when(mockTask.getAccumulatorMap())
-          .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
-      TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
-      when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService);
-
-      operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
-    }
-
-    /**
-     * A special {@link RuntimeException} that we throw to signal that the test was successful.
-     */
-    private static class SuccessException extends RuntimeException {
-    }
-  }
-
-  /**
-   * Non-parameterized tests.
-   */
-  public static class BasicTest {
-
-    /**
-     * Checks serialization of a {@link UnboundedSourceWrapper}.
-     */
-    @Test
-    public void testSerialization() throws Exception {
-      final int parallelism = 1;
-      final int numElements = 20;
-      PipelineOptions options = PipelineOptionsFactory.create();
-
-      TestCountingSource source = new TestCountingSource(numElements);
-      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-          new UnboundedSourceWrapper<>(options, source, parallelism);
-
-      InstantiationUtil.serializeObject(flinkWrapper);
-    }
-
-  }
-
-  private static final class TestingListState<T> implements ListState<T> {
-
-    private final List<T> list = new ArrayList<>();
-
-    @Override
-    public void clear() {
-      list.clear();
-    }
-
-    @Override
-    public Iterable<T> get() throws Exception {
-      return list;
-    }
-
-    @Override
-    public void add(T value) throws Exception {
-      list.add(value);
-    }
-
-    public List<T> getList() {
-      return list;
-    }
-
-  }
-
-}
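
The three anonymous Output implementations above all use the same trick to stop the blocking StreamSource.run() call: count collected records and throw the SuccessException sentinel once enough elements have been seen. A factored-out sketch of that collector, for illustration only and not part of this commit (it assumes the imports of the deleted test file; testRestore additionally records each value into emittedElements):

    // Sketch: stop the source after `limit` records by throwing the sentinel exception
    // from collect(); the calling test catches SuccessException and treats it as success.
    private static Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>> stopAfter(final int limit) {
      return new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
        private int count = 0;

        @Override
        public void emitWatermark(Watermark watermark) {}

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {}

        @Override
        public void collect(StreamRecord<WindowedValue<KV<Integer, Integer>>> record) {
          count++;
          if (count >= limit) {
            throw new SuccessException();
          }
        }

        @Override
        public void close() {}
      };
    }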

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
deleted file mode 100644
index 08a1e03..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.streaming;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/resources/log4j-test.properties b/runners/flink/runner/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4c74d85..0000000
--- a/runners/flink/runner/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set the root logger level to OFF to avoid flooding build logs;
-# set it manually to INFO for debugging purposes.
-log4j.rootLogger=OFF, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
new file mode 100644
index 0000000..b745f0b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * {@link DefaultValueFactory} for getting a default value for the parallelism option
+ * on {@link FlinkPipelineOptions}.
+ *
+ * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
+ * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
+ * run scripts.
+ */
+public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
+  @Override
+  public Integer create(PipelineOptions options) {
+    return GlobalConfiguration.loadConfiguration()
+        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
+  }
+}
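
A DefaultValueFactory like this only takes effect once it is referenced from a pipeline options getter. A sketch of that wiring, using the standard Beam @Default.InstanceFactory mechanism; the actual FlinkPipelineOptions declaration is not part of this diff, and ExampleFlinkOptions below is a hypothetical stand-in:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface ExampleFlinkOptions extends PipelineOptions {
      @Description("Parallelism of the Flink job; defaults to Flink's configured default, or 1.")
      @Default.InstanceFactory(DefaultParallelismFactory.class)
      Integer getParallelism();

      void setParallelism(Integer parallelism);
    }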

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..854b674
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
+ * Flink batch job.
+ */
+class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
+  /**
+   * The necessary context in the case of a batch job.
+   */
+  private final FlinkBatchTranslationContext batchContext;
+
+  private int depth = 0;
+
+  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+    this.batchContext = new FlinkBatchTranslationContext(env, options);
+  }
+
+  @Override
+  @SuppressWarnings("rawtypes, unchecked")
+  public void translate(Pipeline pipeline) {
+    super.translate(pipeline);
+
+    // terminate dangling DataSets
+    for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
+      dataSet.output(new DiscardingOutputFormat());
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+    this.depth++;
+
+    BatchTransformTranslator<?> translator = getTranslator(node);
+
+    if (translator != null) {
+      applyBatchTransform(node.getTransform(), node, translator);
+      LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
+      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+    } else {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    this.depth--;
+    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Flink alternative.
+    PTransform<?, ?> transform = node.getTransform();
+    BatchTransformTranslator<?> translator =
+        FlinkBatchTransformTranslators.getTranslator(transform);
+    if (translator == null) {
+      LOG.info(node.getTransform().getClass().toString());
+      throw new UnsupportedOperationException("The transform " + transform
+          + " is currently not supported.");
+    }
+    applyBatchTransform(transform, node, translator);
+  }
+
+  private <T extends PTransform<?, ?>> void applyBatchTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      BatchTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+    // create the applied PTransform on the batchContext
+    batchContext.setCurrentTransform(node.toAppliedPTransform());
+    typedTranslator.translateNode(typedTransform, batchContext);
+  }
+
+  /**
+   * A translator of a {@link PTransform}.
+   */
+  public interface BatchTransformTranslator<TransformT extends PTransform> {
+    void translateNode(TransformT transform, FlinkBatchTranslationContext context);
+  }
+
+  /**
+   * Returns a translator for the given node if one exists, otherwise null.
+   */
+  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+
+    // Root of the graph is null
+    if (transform == null) {
+      return null;
+    }
+
+    return FlinkBatchTransformTranslators.getTranslator(transform);
+  }
+}
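
To make the BatchTransformTranslator contract concrete: a translator looks up the input DataSet in the translation context and registers an output DataSet for the visited node. The sketch below is hypothetical and not part of this commit; the real translators, and the TRANSLATORS registry they are added to, follow in FlinkBatchTransformTranslators:

    // Hypothetical pass-through translator, modeled on ReshuffleTranslatorBatch below:
    // it simply forwards the input DataSet as the node's output.
    private static class PassThroughTranslatorBatch<T>
        implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
            PTransform<PCollection<T>, PCollection<T>>> {

      @Override
      public void translateNode(
          PTransform<PCollection<T>, PCollection<T>> transform,
          FlinkBatchTranslationContext context) {
        DataSet<WindowedValue<T>> inputDataSet =
            context.getInputDataSet(context.getInput(transform));
        context.setOutputDataSet(context.getOutput(transform), inputDataSet);
      }
    }

    // Registration would mirror the static block in FlinkBatchTransformTranslators, e.g.:
    // TRANSLATORS.put(MyPassThroughTransform.class, new PassThroughTranslatorBatch<>());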

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
new file mode 100644
index 0000000..ff9521c
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.util.Collector;
+
+/**
+ * Translators for transforming {@link PTransform PTransforms} to
+ * Flink {@link DataSet DataSets}.
+ */
+class FlinkBatchTransformTranslators {
+
+  // --------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // --------------------------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+
+  static {
+    TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
+
+    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
+
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
+
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
+
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
+
+    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+  }
+
+
+  static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+
+  private static class ReadSourceTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
+      String name = transform.getName();
+      BoundedSource<T> source = transform.getSource();
+      PCollection<T> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
+
+      DataSource<WindowedValue<T>> dataSource = new DataSource<>(
+          context.getExecutionEnvironment(),
+          new SourceInputFormat<>(source, context.getPipelineOptions()),
+          typeInformation,
+          name);
+
+      context.setOutputDataSet(output, dataSource);
+    }
+  }
+
+  private static class WindowAssignTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
+
+    @Override
+    public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) {
+      PValue input = context.getInput(transform);
+
+      TypeInformation<WindowedValue<T>> resultTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
+
+      @SuppressWarnings("unchecked")
+      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, ? extends BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
+
+      DataSet<WindowedValue<T>> resultDataSet = inputDataSet
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(resultTypeInfo);
+
+      context.setOutputDataSet(context.getOutput(transform), resultDataSet);
+    }
+  }
+
+  private static class GroupByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkBatchTranslationContext context) {
+
+      // For now, this is copied from the Combine.PerKey translator. Once we have the new runner API,
+      // we can replace GroupByKey with a Combine.PerKey using the Concatenate CombineFn.
+
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
+          new Concatenate<InputT>().asKeyedFn();
+
+      KvCoder<K, InputT> inputCoder =
+          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+      Coder<List<InputT>> accumulatorCoder;
+
+      try {
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
+      } catch (CannotProvideCoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
+      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
+
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkPartialReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkReduceFunction<>(
+            combineFn,
+            boundedStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFns with windows other than IntervalWindow are not supported.");
+        }
+
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+
+        reduceFunction = new FlinkMergingReduceFunction<>(
+            combineFn,
+            intervalStrategy,
+            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
+            context.getPipelineOptions());
+      }
+
+      // Partially GroupReduce the values into the intermediate format AccumT (combine)
+      GroupCombineOperator<
+          WindowedValue<KV<K, InputT>>,
+          WindowedValue<KV<K, List<InputT>>>> groupCombine =
+          new GroupCombineOperator<>(
+              inputGrouping,
+              partialReduceTypeInfo,
+              partialReduceFunction,
+              "GroupCombine: " + transform.getName());
+
+      Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
+          groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
+
+      // Fully reduce the values and create output format List<InputT>
+      GroupReduceOperator<
+          WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
+          new GroupReduceOperator<>(
+              intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
+
+      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+    }
+
+  }
+
+  private static class ReshuffleTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Reshuffle<K, InputT> transform,
+        FlinkBatchTranslationContext context) {
+
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
+
+    }
+
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
+   * is expected to crash, since all values for a key are concatenated into a single
+   * in-memory list.
+   *
+   * <p>This is copied from the Dataflow runner code.
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Combine.PerKey<K, InputT, OutputT>> {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
+          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
+
+      KvCoder<K, InputT> inputCoder =
+          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+
+      Coder<AccumT> accumulatorCoder;
+
+      try {
+        accumulatorCoder =
+            combineFn.getAccumulatorCoder(
+                context.getInput(transform).getPipeline().getCoderRegistry(),
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder());
+      } catch (CannotProvideCoderException e) {
+        throw new RuntimeException(e);
+      }
+
+      WindowingStrategy<?, ?> windowingStrategy =
+          context.getInput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
+          context.getTypeInfo(
+              KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
+              windowingStrategy);
+
+      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
+          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
+
+      if (windowingStrategy.getWindowFn().isNonMerging()) {
+        WindowingStrategy<?, BoundedWindow> boundedStrategy =
+            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+
+        FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
+            new FlinkPartialReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
+            new FlinkReduceFunction<>(
+                combineFn,
+                boundedStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        // Partially GroupReduce the values into the intermediate format AccumT (combine)
+        GroupCombineOperator<
+            WindowedValue<KV<K, InputT>>,
+            WindowedValue<KV<K, AccumT>>> groupCombine =
+            new GroupCombineOperator<>(
+                inputGrouping,
+                partialReduceTypeInfo,
+                partialReduceFunction,
+                "GroupCombine: " + transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), groupCombine, context);
+
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
+            groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+
+      } else {
+        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
+          throw new UnsupportedOperationException(
+              "Merging WindowFns with windows other than IntervalWindow are not supported.");
+        }
+
+        // For merging windows we can't do a pre-shuffle combine step since
+        // elements would not be in their correct windows for side-input access.
+
+        WindowingStrategy<?, IntervalWindow> intervalStrategy =
+            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
+
+        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
+            new FlinkMergingNonShuffleReduceFunction<>(
+                combineFn,
+                intervalStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions());
+
+        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
+            context.getTypeInfo(context.getOutput(transform));
+
+        Grouping<WindowedValue<KV<K, InputT>>> grouping =
+            inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
+
+        // Fully reduce the values and create output format OutputT
+        GroupReduceOperator<
+            WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
+            new GroupReduceOperator<>(
+                grouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+      }
+    }
+  }
+
+  private static void rejectSplittable(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+    if (signature.processElement().isSplittable()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s does not currently support splittable DoFn: %s",
+              FlinkRunner.class.getSimpleName(), doFn));
+    }
+  }
+
+  private static class ParDoTranslatorBatch<InputT, OutputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+      ParDo.MultiOutput<InputT, OutputT>> {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void translateNode(
+        ParDo.MultiOutput<InputT, OutputT> transform,
+        FlinkBatchTranslationContext context) {
+      DoFn<InputT, OutputT> doFn = transform.getFn();
+      rejectSplittable(doFn);
+      DataSet<WindowedValue<InputT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
+
+      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+      // Put the main output at index 0; FlinkMultiOutputDoFnFunction expects this.
+      outputMap.put(transform.getMainOutputTag(), 0);
+      int count = 1;
+      for (TupleTag<?> tag : outputs.keySet()) {
+        if (!outputMap.containsKey(tag)) {
+          outputMap.put(tag, count++);
+        }
+      }
+
+      // assume that the windowing strategy is the same for all outputs
+      WindowingStrategy<?, ?> windowingStrategy = null;
+
+      // collect all output Coders and create a UnionCoder for our tagged outputs
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (PValue taggedValue : outputs.values()) {
+        checkState(
+            taggedValue instanceof PCollection,
+            "Within ParDo, got a non-PCollection output %s of type %s",
+            taggedValue,
+            taggedValue.getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedValue;
+        outputCoders.add(coll.getCoder());
+        windowingStrategy = coll.getWindowingStrategy();
+      }
+
+      if (windowingStrategy == null) {
+        throw new IllegalStateException("No outputs defined.");
+      }
+
+      UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
+          new CoderTypeInformation<>(
+              WindowedValue.getFullCoder(
+                  unionCoder,
+                  windowingStrategy.getWindowFn().windowCoder()));
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      // construct a map from side input to WindowingStrategy so that
+      // the DoFn runner can map main-input windows to side input windows
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
+      for (PCollectionView<?> sideInput: sideInputs) {
+        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
+      }
+
+      SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
+      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        KvCoder<?, InputT> inputCoder =
+            (KvCoder<?, InputT>) context.getInput(transform).getCoder();
+
+        FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
+            (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
+            outputMap, transform.getMainOutputTag()
+        );
+
+        Grouping<WindowedValue<InputT>> grouping =
+            inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
+
+        outputDataSet =
+            new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName());
+
+      } else {
+        FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
+            new FlinkDoFnFunction(
+                doFn,
+                windowingStrategy,
+                sideInputStrategies,
+                context.getPipelineOptions(),
+                outputMap,
+                transform.getMainOutputTag());
+
+        outputDataSet = new MapPartitionOperator<>(
+            inputDataSet, typeInformation,
+            doFnWrapper, transform.getName());
+
+      }
+
+      transformSideInputs(sideInputs, outputDataSet, context);
+
+      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        pruneOutput(
+            outputDataSet,
+            context,
+            outputMap.get(output.getKey()),
+            (PCollection) output.getValue());
+      }
+
+    }
+
+    private <T> void pruneOutput(
+        DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
+        FlinkBatchTranslationContext context,
+        int integerTag,
+        PCollection<T> collection) {
+      TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
+
+      FlinkMultiOutputPruningFunction<T> pruningFunction =
+          new FlinkMultiOutputPruningFunction<>(integerTag);
+
+      FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
+          new FlatMapOperator<>(
+              taggedDataSet,
+              outputType,
+              pruningFunction,
+              collection.getName());
+
+      context.setOutputDataSet(collection, pruningOperator);
+    }
+  }
+
+  private static class FlattenPCollectionTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+      Flatten.PCollections<T>> {
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void translateNode(
+        Flatten.PCollections<T> transform,
+        FlinkBatchTranslationContext context) {
+
+      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
+      DataSet<WindowedValue<T>> result = null;
+
+      if (allInputs.isEmpty()) {
+
+        // Create a dummy source to satisfy downstream operations: we cannot create an
+        // empty source in Flink, so we add a flatMap that never forwards its single element.
+        DataSource<String> dummySource =
+            context.getExecutionEnvironment().fromElements("dummy");
+        result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
+          @Override
+          public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
+            // never return anything
+          }
+        }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
+      } else {
+        for (PValue taggedPc : allInputs.values()) {
+          checkArgument(
+              taggedPc instanceof PCollection,
+              "Got non-PCollection input to flatten: %s of type %s",
+              taggedPc,
+              taggedPc.getClass().getSimpleName());
+          PCollection<T> collection = (PCollection<T>) taggedPc;
+          DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
+          if (result == null) {
+            result = current;
+          } else {
+            result = result.union(current);
+          }
+        }
+      }
+
+      // Insert a dummy filter; without it, Flink in some cases produces
+      // duplicate elements after the union (this appears to be a Flink bug).
+      result = result.filter(new FilterFunction<WindowedValue<T>>() {
+        @Override
+        public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
+          return true;
+        }
+      }).name("UnionFixFilter");
+      context.setOutputDataSet(context.getOutput(transform), result);
+    }
+  }
+
+  private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          View.CreatePCollectionView<ElemT, ViewT>> {
+
+    @Override
+    public void translateNode(
+        View.CreatePCollectionView<ElemT, ViewT> transform,
+        FlinkBatchTranslationContext context) {
+      DataSet<WindowedValue<ElemT>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      PCollectionView<ViewT> input = transform.getView();
+
+      context.setSideInputDataSet(input, inputDataSet);
+    }
+  }
+
+  private static void transformSideInputs(
+      List<PCollectionView<?>> sideInputs,
+      SingleInputUdfOperator<?, ?, ?> outputDataSet,
+      FlinkBatchTranslationContext context) {
+    // get corresponding Flink broadcast DataSets
+    for (PCollectionView<?> input : sideInputs) {
+      DataSet<?> broadcastSet = context.getSideInputDataSet(input);
+      outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
+    }
+  }
+
+  private FlinkBatchTransformTranslators() {}
+
+}
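
For orientation, the GroupByKey translation above wires the Concatenate CombineFn into a
two-phase combine: a GroupCombineOperator concatenates values per key before the shuffle
(via FlinkPartialReduceFunction), and a GroupReduceOperator merges the per-key partial
lists afterwards (via FlinkReduceFunction). The plain-Java sketch below is illustrative
only and is not part of the commit; it has no Beam or Flink dependencies and every name in
it is made up. It shows the same accumulate-then-merge lifecycle that Concatenate
implements:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TwoPhaseConcatenateSketch {

      public static void main(String[] args) {
        // Phase 1 (GroupCombine): each worker concatenates its local values per key,
        // mirroring Concatenate.createAccumulator()/addInput().
        Map<String, List<Integer>> partialA = accumulate(Arrays.asList("a:1", "b:2"));
        Map<String, List<Integer>> partialB = accumulate(Arrays.asList("a:3"));

        // Phase 2 (GroupReduce): partial lists for the same key are merged,
        // mirroring Concatenate.mergeAccumulators().
        Map<String, List<Integer>> merged = new HashMap<>();
        for (Map<String, List<Integer>> partial : Arrays.asList(partialA, partialB)) {
          for (Map.Entry<String, List<Integer>> entry : partial.entrySet()) {
            merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<Integer>())
                .addAll(entry.getValue());
          }
        }

        System.out.println(merged); // {a=[1, 3], b=[2]} (map iteration order may vary)
      }

      // Concatenates the integer part of "key:value" records into one list per key.
      private static Map<String, List<Integer>> accumulate(List<String> records) {
        Map<String, List<Integer>> accumulators = new HashMap<>();
        for (String record : records) {
          String[] parts = record.split(":");
          accumulators
              .computeIfAbsent(parts[0], k -> new ArrayList<Integer>())
              .add(Integer.valueOf(parts[1]));
        }
        return accumulators;
      }
    }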

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
new file mode 100644
index 0000000..98dd0fb
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * Helper for {@link FlinkBatchPipelineTranslator} and translators in
+ * {@link FlinkBatchTransformTranslators}.
+ */
+class FlinkBatchTranslationContext {
+
+  private final Map<PValue, DataSet<?>> dataSets;
+  private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+
+  /**
+   * For keeping track of which DataSets don't have a successor. We
+   * need to terminate these with a discarding sink because the Beam
+   * model allows dangling operations.
+   */
+  private final Map<PValue, DataSet<?>> danglingDataSets;
+
+  private final ExecutionEnvironment env;
+  private final PipelineOptions options;
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  // ------------------------------------------------------------------------
+
+  public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
+    this.env = env;
+    this.options = options;
+    this.dataSets = new HashMap<>();
+    this.broadcastDataSets = new HashMap<>();
+
+    this.danglingDataSets = new HashMap<>();
+  }
+
+  // ------------------------------------------------------------------------
+
+  public Map<PValue, DataSet<?>> getDanglingDataSets() {
+    return danglingDataSets;
+  }
+
+  public ExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
+    // assume that the DataSet is used as an input if retrieved here
+    danglingDataSets.remove(value);
+    return (DataSet<WindowedValue<T>>) dataSets.get(value);
+  }
+
+  public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
+    if (!dataSets.containsKey(value)) {
+      dataSets.put(value, set);
+      danglingDataSets.put(value, set);
+    }
+  }
+
+  /**
+   * Sets the AppliedPTransform which carries the current transform's inputs and outputs.
+   * @param currentTransform the applied transform whose inputs and outputs to expose
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
+    return (DataSet<T>) broadcastDataSets.get(value);
+  }
+
+  public <ViewT, ElemT> void setSideInputDataSet(
+      PCollectionView<ViewT> value,
+      DataSet<WindowedValue<ElemT>> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
+      Coder<T> coder,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            coder,
+            windowingStrategy.getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
+  }
+
+  Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+  }
+
+  Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
+    return currentTransform.getOutputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  <T extends PValue> T getOutput(PTransform<?, T> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
+  }
+}
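
For orientation, a BatchTransformTranslator typically touches this context in three places:
it resolves the Flink DataSet that an upstream translator registered for its input
PCollection, derives Flink TypeInformation from a coder and windowing strategy when it
builds a new operator, and registers its own result for downstream transforms. The sketch
below is hypothetical and not part of the commit: MyPassThrough is an assumed
PTransform<PCollection<T>, PCollection<T>> and the translator class is made up; only the
context methods it calls are the ones defined above.

    private static class PassThroughTranslatorBatch<T>
        implements FlinkBatchPipelineTranslator.BatchTransformTranslator<MyPassThrough<T>> {

      @Override
      public void translateNode(
          MyPassThrough<T> transform,
          FlinkBatchTranslationContext context) {

        // 1. Look up the DataSet registered for the transform's input PCollection;
        //    this also removes it from the "dangling" set.
        DataSet<WindowedValue<T>> inputDataSet =
            context.getInputDataSet(context.getInput(transform));

        // 2. If a new Flink operator were created here, its TypeInformation would come
        //    from context.getTypeInfo(context.getOutput(transform)), i.e. the output
        //    coder wrapped with the window coder of the output's WindowingStrategy.

        // 3. Register the (unchanged) DataSet so downstream translators can find it;
        //    until someone consumes it, the context tracks it as dangling.
        context.setOutputDataSet(context.getOutput(transform), inputDataSet);
      }
    }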

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
new file mode 100644
index 0000000..bf4395f
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.joda.time.Duration;
+
+
+/**
+ * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
+ * In detached execution, results and the job's execution status are currently unavailable.
+ */
+public class FlinkDetachedRunnerResult implements PipelineResult {
+
+  FlinkDetachedRunnerResult() {}
+
+  @Override
+  public State getState() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    throw new AggregatorRetrievalException(
+        "Accumulators can't be retrieved for detached Job executions.",
+        new UnsupportedOperationException());
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+  }
+
+  @Override
+  public State cancel() throws IOException {
+    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+  }
+
+  @Override
+  public State waitUntilFinish() {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    return State.UNKNOWN;
+  }
+
+  @Override
+  public String toString() {
+    return "FlinkDetachedRunnerResult{}";
+  }
+}
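
For callers, detached execution therefore yields a largely inert result object. A rough
sketch of what to expect (illustrative only; "pipeline" is assumed to be a Pipeline
configured to run detached on Flink):

    PipelineResult result = pipeline.run();

    // State queries return UNKNOWN instead of a real job status.
    PipelineResult.State state = result.waitUntilFinish();  // State.UNKNOWN

    // Anything that needs the running job is unavailable in detached mode:
    //   result.metrics()                       -> UnsupportedOperationException
    //   result.getAggregatorValues(aggregator) -> AggregatorRetrievalException
    //   result.cancel()                        -> UnsupportedOperationException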