You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:25 UTC

[16/18] beam git commit: [BEAM-1994] Remove Flink examples package

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
deleted file mode 100644
index 18343ef..0000000
--- a/runners/flink/runner/pom.xml
+++ /dev/null
@@ -1,330 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-runners-flink-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-runners-flink_2.10</artifactId>
-
-  <name>Apache Beam :: Runners :: Flink :: Core</name>
-
-  <packaging>jar</packaging>
-
-  <profiles>
-    <profile>
-      <id>local-validates-runner-tests</id>
-      <activation><activeByDefault>false</activeByDefault></activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <executions>
-
-              <!-- This configures the inherited validates-runner-tests
-                   execution to execute with a local Flink instance. -->
-              <execution>
-                <id>validates-runner-tests</id>
-                <phase>integration-test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
-                <configuration>
-                  <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
-                  <excludedGroups>
-                    org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
-                    org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
-                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream
-                  </excludedGroups>
-                  <parallel>none</parallel>
-                  <failIfNoTests>true</failIfNoTests>
-                  <dependenciesToScan>
-                    <dependency>org.apache.beam:beam-sdks-java-core</dependency>
-                  </dependenciesToScan>
-                  <systemPropertyVariables>
-                    <beamTestPipelineOptions>
-                      [
-                      "--runner=TestFlinkRunner",
-                      "--streaming=false"
-                      ]
-                    </beamTestPipelineOptions>
-                  </systemPropertyVariables>
-                </configuration>
-              </execution>
-
-              <!-- This second execution runs the tests in streaming mode -->
-              <execution>
-                <id>streaming-validates-runner-tests</id>
-                <phase>integration-test</phase>
-                <goals>
-                  <goal>test</goal>
-                </goals>
-                <configuration>
-                  <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
-                  <excludedGroups>
-                    org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
-                    org.apache.beam.sdk.testing.UsesSetState,
-                    org.apache.beam.sdk.testing.UsesMapState,
-                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
-                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
-                    org.apache.beam.sdk.testing.UsesTestStream,
-                    org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs
-                  </excludedGroups>
-                  <parallel>none</parallel>
-                  <failIfNoTests>true</failIfNoTests>
-                  <dependenciesToScan>
-                    <dependency>org.apache.beam:beam-sdks-java-core</dependency>
-                  </dependenciesToScan>
-                  <systemPropertyVariables>
-                    <beamTestPipelineOptions>
-                      [
-                        "--runner=TestFlinkRunner",
-                        "--streaming=true"
-                      ]
-                    </beamTestPipelineOptions>
-                  </systemPropertyVariables>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-      </plugin>
-
-      <!-- Integration Tests -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
-      </plugin>
-
-      <!-- Unit Tests -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <!-- Flink dependencies -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-java</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_2.10</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_2.10</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <!-- For testing -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
-      <version>${flink.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_2.10</artifactId>
-      <version>${flink.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Beam -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-java</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-construction-java</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <!--
-    Force an upgrade on the version of Apache Commons from Flink to support DEFLATE compression.
-    -->
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-compress</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-
-    <!-- Test scoped -->
-    <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-bigquery</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Depend on test jar to scan for ValidatesRunner tests -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-jdk14</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_2.10</artifactId>
-      <version>${flink.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-test-utils_2.10</artifactId>
-      <version>${flink.version}</version>
-      <scope>test</scope>
-        <exclusions>
-            <exclusion>
-                <artifactId>apacheds-jdbm1</artifactId>
-                <groupId>org.apache.directory.jdbm</groupId>
-            </exclusion>
-        </exclusions>
-    </dependency>
-
-    <!-- Optional Pipeline Registration -->
-    <dependency>
-      <groupId>com.google.auto.service</groupId>
-      <artifactId>auto-service</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <!-- transitive test dependencies from beam-sdk-java-core -->
-    <dependency>
-      <groupId>com.fasterxml.jackson.dataformat</groupId>
-      <artifactId>jackson-dataformat-yaml</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-common-fn-api</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
deleted file mode 100644
index b745f0b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-
-/**
- * {@link DefaultValueFactory} for getting a default value for the parallelism option
- * on {@link FlinkPipelineOptions}.
- *
- * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
- * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
- * run scripts.
- */
-public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
-  @Override
-  public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.loadConfiguration()
-        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 854b674..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a
- * Flink batch job.
- */
-class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
-
-  /**
-   * The necessary context in the case of a batch job.
-   */
-  private final FlinkBatchTranslationContext batchContext;
-
-  private int depth = 0;
-
-  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
-    this.batchContext = new FlinkBatchTranslationContext(env, options);
-  }
-
-  @Override
-  @SuppressWarnings("rawtypes, unchecked")
-  public void translate(Pipeline pipeline) {
-    super.translate(pipeline);
-
-    // terminate dangling DataSets
-    for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) {
-      dataSet.output(new DiscardingOutputFormat());
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Pipeline Visitor Methods
-  // --------------------------------------------------------------------------------------------
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-    this.depth++;
-
-    BatchTransformTranslator<?> translator = getTranslator(node);
-
-    if (translator != null) {
-      applyBatchTransform(node.getTransform(), node, translator);
-      LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
-      return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-    } else {
-      return CompositeBehavior.ENTER_TRANSFORM;
-    }
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
-
-    // get the transformation corresponding to the node we are
-    // currently visiting and translate it into its Flink alternative.
-    PTransform<?, ?> transform = node.getTransform();
-    BatchTransformTranslator<?> translator =
-        FlinkBatchTransformTranslators.getTranslator(transform);
-    if (translator == null) {
-      LOG.info(node.getTransform().getClass().toString());
-      throw new UnsupportedOperationException("The transform " + transform
-          + " is currently not supported.");
-    }
-    applyBatchTransform(transform, node, translator);
-  }
-
-  private <T extends PTransform<?, ?>> void applyBatchTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      BatchTransformTranslator<?> translator) {
-
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-
-    @SuppressWarnings("unchecked")
-    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
-
-    // create the applied PTransform on the batchContext
-    batchContext.setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, batchContext);
-  }
-
-  /**
-   * A translator of a {@link PTransform}.
-   */
-  public interface BatchTransformTranslator<TransformT extends PTransform> {
-    void translateNode(TransformT transform, FlinkBatchTranslationContext context);
-  }
-
-  /**
-   * Returns a translator for the given node, if it is possible, otherwise null.
-   */
-  private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
-    PTransform<?, ?> transform = node.getTransform();
-
-    // Root of the graph is null
-    if (transform == null) {
-      return null;
-    }
-
-    return FlinkBatchTransformTranslators.getTranslator(transform);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
deleted file mode 100644
index ff9521c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ /dev/null
@@ -1,723 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
-import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.join.UnionCoder;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.GroupCombineOperator;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.util.Collector;
-
-/**
- * Translators for transforming {@link PTransform PTransforms} to
- * Flink {@link DataSet DataSets}.
- */
-class FlinkBatchTransformTranslators {
-
-  // --------------------------------------------------------------------------------------------
-  //  Transform Translator Registry
-  // --------------------------------------------------------------------------------------------
-
-  @SuppressWarnings("rawtypes")
-  private static final Map<
-      Class<? extends PTransform>,
-      FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
-
-  static {
-    TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
-
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
-
-    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
-
-    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
-
-    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
-
-    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
-  }
-
-
-  static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
-      PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
-  }
-
-  private static class ReadSourceTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
-    @Override
-    public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
-      String name = transform.getName();
-      BoundedSource<T> source = transform.getSource();
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
-
-      DataSource<WindowedValue<T>> dataSource = new DataSource<>(
-          context.getExecutionEnvironment(),
-          new SourceInputFormat<>(source, context.getPipelineOptions()),
-          typeInformation,
-          name);
-
-      context.setOutputDataSet(output, dataSource);
-    }
-  }
-
-  private static class WindowAssignTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
-
-    @Override
-    public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) {
-      PValue input = context.getInput(transform);
-
-      TypeInformation<WindowedValue<T>> resultTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input);
-
-      @SuppressWarnings("unchecked")
-      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
-          (WindowingStrategy<T, ? extends BoundedWindow>)
-              context.getOutput(transform).getWindowingStrategy();
-
-      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
-      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
-          new FlinkAssignWindows<>(windowFn);
-
-      DataSet<WindowedValue<T>> resultDataSet = inputDataSet
-          .flatMap(assignWindowsFunction)
-          .name(context.getOutput(transform).getName())
-          .returns(resultTypeInfo);
-
-      context.setOutputDataSet(context.getOutput(transform), resultDataSet);
-    }
-  }
-
-  private static class GroupByKeyTranslatorBatch<K, InputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        GroupByKey<K, InputT> transform,
-        FlinkBatchTranslationContext context) {
-
-      // for now, this is copied from the Combine.PerKey translater. Once we have the new runner API
-      // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn
-
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn =
-          new Concatenate<InputT>().asKeyedFn();
-
-      KvCoder<K, InputT> inputCoder =
-          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
-      Coder<List<InputT>> accumulatorCoder;
-
-      try {
-        accumulatorCoder =
-            combineFn.getAccumulatorCoder(
-                context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder());
-      } catch (CannotProvideCoderException e) {
-        throw new RuntimeException(e);
-      }
-
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
-          new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-                  windowingStrategy.getWindowFn().windowCoder()));
-
-
-      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-      FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction;
-      FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction;
-
-      if (windowingStrategy.getWindowFn().isNonMerging()) {
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkPartialReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkReduceFunction<>(
-            combineFn,
-            boundedStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-      } else {
-        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not supported.");
-        }
-
-        @SuppressWarnings("unchecked")
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        partialReduceFunction = new FlinkMergingPartialReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-
-        reduceFunction = new FlinkMergingReduceFunction<>(
-            combineFn,
-            intervalStrategy,
-            Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
-            context.getPipelineOptions());
-      }
-
-      // Partially GroupReduce the values into the intermediate format AccumT (combine)
-      GroupCombineOperator<
-          WindowedValue<KV<K, InputT>>,
-          WindowedValue<KV<K, List<InputT>>>> groupCombine =
-          new GroupCombineOperator<>(
-              inputGrouping,
-              partialReduceTypeInfo,
-              partialReduceFunction,
-              "GroupCombine: " + transform.getName());
-
-      Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
-          groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));
-
-      // Fully reduce the values and create output format VO
-      GroupReduceOperator<
-          WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
-          new GroupReduceOperator<>(
-              intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());
-
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
-    }
-
-  }
-
-  private static class ReshuffleTranslatorBatch<K, InputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
-
-    @Override
-    public void translateNode(
-        Reshuffle<K, InputT> transform,
-        FlinkBatchTranslationContext context) {
-
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
-
-    }
-
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this
-   * is expected to crash!
-   *
-   * <p>This is copied from the dataflow runner code.
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-
-  private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Combine.PerKey<K, InputT, OutputT>> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void translateNode(
-        Combine.PerKey<K, InputT, OutputT> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn =
-          (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn();
-
-      KvCoder<K, InputT> inputCoder =
-          (KvCoder<K, InputT>) context.getInput(transform).getCoder();
-
-      Coder<AccumT> accumulatorCoder;
-
-      try {
-        accumulatorCoder =
-            combineFn.getAccumulatorCoder(
-                context.getInput(transform).getPipeline().getCoderRegistry(),
-                inputCoder.getKeyCoder(),
-                inputCoder.getValueCoder());
-      } catch (CannotProvideCoderException e) {
-        throw new RuntimeException(e);
-      }
-
-      WindowingStrategy<?, ?> windowingStrategy =
-          context.getInput(transform).getWindowingStrategy();
-
-      TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo =
-          context.getTypeInfo(
-              KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
-              windowingStrategy);
-
-      Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
-          inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-      // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      for (PCollectionView<?> sideInput: transform.getSideInputs()) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
-      if (windowingStrategy.getWindowFn().isNonMerging()) {
-        WindowingStrategy<?, BoundedWindow> boundedStrategy =
-            (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
-
-        FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction =
-            new FlinkPartialReduceFunction<>(
-                combineFn,
-                boundedStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction =
-            new FlinkReduceFunction<>(
-                combineFn,
-                boundedStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        // Partially GroupReduce the values into the intermediate format AccumT (combine)
-        GroupCombineOperator<
-            WindowedValue<KV<K, InputT>>,
-            WindowedValue<KV<K, AccumT>>> groupCombine =
-            new GroupCombineOperator<>(
-                inputGrouping,
-                partialReduceTypeInfo,
-                partialReduceFunction,
-                "GroupCombine: " + transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), groupCombine, context);
-
-        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
-            context.getTypeInfo(context.getOutput(transform));
-
-        Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping =
-            groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder()));
-
-        // Fully reduce the values and create output format OutputT
-        GroupReduceOperator<
-            WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
-            new GroupReduceOperator<>(
-                intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-
-      } else {
-        if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) {
-          throw new UnsupportedOperationException(
-              "Merging WindowFn with windows other than IntervalWindow are not supported.");
-        }
-
-        // for merging windows we can't to a pre-shuffle combine step since
-        // elements would not be in their correct windows for side-input access
-
-        WindowingStrategy<?, IntervalWindow> intervalStrategy =
-            (WindowingStrategy<?, IntervalWindow>) windowingStrategy;
-
-        FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction =
-            new FlinkMergingNonShuffleReduceFunction<>(
-                combineFn,
-                intervalStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions());
-
-        TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo =
-            context.getTypeInfo(context.getOutput(transform));
-
-        Grouping<WindowedValue<KV<K, InputT>>> grouping =
-            inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
-
-        // Fully reduce the values and create output format OutputT
-        GroupReduceOperator<
-            WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> outputDataSet =
-            new GroupReduceOperator<>(
-                grouping, reduceTypeInfo, reduceFunction, transform.getName());
-
-        transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
-        context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-      }
-
-
-    }
-  }
-
-  private static void rejectSplittable(DoFn<?, ?> doFn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-    if (signature.processElement().isSplittable()) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support splittable DoFn: %s",
-              FlinkRunner.class.getSimpleName(), doFn));
-    }
-  }
-
-  private static class ParDoTranslatorBatch<InputT, OutputT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-      ParDo.MultiOutput<InputT, OutputT>> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void translateNode(
-        ParDo.MultiOutput<InputT, OutputT> transform,
-        FlinkBatchTranslationContext context) {
-      DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectSplittable(doFn);
-      DataSet<WindowedValue<InputT>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
-
-      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
-      // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
-      outputMap.put(transform.getMainOutputTag(), 0);
-      int count = 1;
-      for (TupleTag<?> tag : outputs.keySet()) {
-        if (!outputMap.containsKey(tag)) {
-          outputMap.put(tag, count++);
-        }
-      }
-
-      // assume that the windowing strategy is the same for all outputs
-      WindowingStrategy<?, ?> windowingStrategy = null;
-
-      // collect all output Coders and create a UnionCoder for our tagged outputs
-      List<Coder<?>> outputCoders = Lists.newArrayList();
-      for (PValue taggedValue : outputs.values()) {
-        checkState(
-            taggedValue instanceof PCollection,
-            "Within ParDo, got a non-PCollection output %s of type %s",
-            taggedValue,
-            taggedValue.getClass().getSimpleName());
-        PCollection<?> coll = (PCollection<?>) taggedValue;
-        outputCoders.add(coll.getCoder());
-        windowingStrategy = coll.getWindowingStrategy();
-      }
-
-      if (windowingStrategy == null) {
-        throw new IllegalStateException("No outputs defined.");
-      }
-
-      UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
-      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
-          new CoderTypeInformation<>(
-              WindowedValue.getFullCoder(
-                  unionCoder,
-                  windowingStrategy.getWindowFn().windowCoder()));
-
-      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
-
-      // construct a map from side input to WindowingStrategy so that
-      // the DoFn runner can map main-input windows to side input windows
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
-      for (PCollectionView<?> sideInput: sideInputs) {
-        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
-      }
-
-      SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
-      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
-      if (signature.stateDeclarations().size() > 0
-          || signature.timerDeclarations().size() > 0) {
-
-        // Based on the fact that the signature is stateful, DoFnSignatures ensures
-        // that it is also keyed
-        KvCoder<?, InputT> inputCoder =
-            (KvCoder<?, InputT>) context.getInput(transform).getCoder();
-
-        FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new FlinkStatefulDoFnFunction<>(
-            (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(),
-            outputMap, transform.getMainOutputTag()
-        );
-
-        Grouping<WindowedValue<InputT>> grouping =
-            inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
-
-        outputDataSet =
-            new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName());
-
-      } else {
-        FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper =
-            new FlinkDoFnFunction(
-                doFn,
-                windowingStrategy,
-                sideInputStrategies,
-                context.getPipelineOptions(),
-                outputMap,
-                transform.getMainOutputTag());
-
-        outputDataSet = new MapPartitionOperator<>(
-            inputDataSet, typeInformation,
-            doFnWrapper, transform.getName());
-
-      }
-
-      transformSideInputs(sideInputs, outputDataSet, context);
-
-      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
-        pruneOutput(
-            outputDataSet,
-            context,
-            outputMap.get(output.getKey()),
-            (PCollection) output.getValue());
-      }
-
-    }
-
-    private <T> void pruneOutput(
-        DataSet<WindowedValue<RawUnionValue>> taggedDataSet,
-        FlinkBatchTranslationContext context,
-        int integerTag,
-        PCollection<T> collection) {
-      TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
-
-      FlinkMultiOutputPruningFunction<T> pruningFunction =
-          new FlinkMultiOutputPruningFunction<>(integerTag);
-
-      FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator =
-          new FlatMapOperator<>(
-              taggedDataSet,
-              outputType,
-              pruningFunction,
-              collection.getName());
-
-      context.setOutputDataSet(collection, pruningOperator);
-    }
-  }
-
-  private static class FlattenPCollectionTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-      Flatten.PCollections<T>> {
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void translateNode(
-        Flatten.PCollections<T> transform,
-        FlinkBatchTranslationContext context) {
-
-      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
-      DataSet<WindowedValue<T>> result = null;
-
-      if (allInputs.isEmpty()) {
-
-        // create an empty dummy source to satisfy downstream operations
-        // we cannot create an empty source in Flink, therefore we have to
-        // add the flatMap that simply never forwards the single element
-        DataSource<String> dummySource =
-            context.getExecutionEnvironment().fromElements("dummy");
-        result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() {
-          @Override
-          public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception {
-            // never return anything
-          }
-        }).returns(
-            new CoderTypeInformation<>(
-                WindowedValue.getFullCoder(
-                    (Coder<T>) VoidCoder.of(),
-                    GlobalWindow.Coder.INSTANCE)));
-      } else {
-        for (PValue taggedPc : allInputs.values()) {
-          checkArgument(
-              taggedPc instanceof PCollection,
-              "Got non-PCollection input to flatten: %s of type %s",
-              taggedPc,
-              taggedPc.getClass().getSimpleName());
-          PCollection<T> collection = (PCollection<T>) taggedPc;
-          DataSet<WindowedValue<T>> current = context.getInputDataSet(collection);
-          if (result == null) {
-            result = current;
-          } else {
-            result = result.union(current);
-          }
-        }
-      }
-
-      // insert a dummy filter, there seems to be a bug in Flink
-      // that produces duplicate elements after the union in some cases
-      // if we don't
-      result = result.filter(new FilterFunction<WindowedValue<T>>() {
-        @Override
-        public boolean filter(WindowedValue<T> tWindowedValue) throws Exception {
-          return true;
-        }
-      }).name("UnionFixFilter");
-      context.setOutputDataSet(context.getOutput(transform), result);
-    }
-  }
-
-  private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          View.CreatePCollectionView<ElemT, ViewT>> {
-
-    @Override
-    public void translateNode(
-        View.CreatePCollectionView<ElemT, ViewT> transform,
-        FlinkBatchTranslationContext context) {
-      DataSet<WindowedValue<ElemT>> inputDataSet =
-          context.getInputDataSet(context.getInput(transform));
-
-      PCollectionView<ViewT> input = transform.getView();
-
-      context.setSideInputDataSet(input, inputDataSet);
-    }
-  }
-
-  private static void transformSideInputs(
-      List<PCollectionView<?>> sideInputs,
-      SingleInputUdfOperator<?, ?, ?> outputDataSet,
-      FlinkBatchTranslationContext context) {
-    // get corresponding Flink broadcast DataSets
-    for (PCollectionView<?> input : sideInputs) {
-      DataSet<?> broadcastSet = context.getSideInputDataSet(input);
-      outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
-    }
-  }
-
-  private FlinkBatchTransformTranslators() {}
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
deleted file mode 100644
index 98dd0fb..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Helper for {@link FlinkBatchPipelineTranslator} and translators in
- * {@link FlinkBatchTransformTranslators}.
- */
-class FlinkBatchTranslationContext {
-
-  private final Map<PValue, DataSet<?>> dataSets;
-  private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
-
-  /**
-   * For keeping track about which DataSets don't have a successor. We
-   * need to terminate these with a discarding sink because the Beam
-   * model allows dangling operations.
-   */
-  private final Map<PValue, DataSet<?>> danglingDataSets;
-
-  private final ExecutionEnvironment env;
-  private final PipelineOptions options;
-
-  private AppliedPTransform<?, ?, ?> currentTransform;
-
-  // ------------------------------------------------------------------------
-
-  public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
-    this.env = env;
-    this.options = options;
-    this.dataSets = new HashMap<>();
-    this.broadcastDataSets = new HashMap<>();
-
-    this.danglingDataSets = new HashMap<>();
-  }
-
-  // ------------------------------------------------------------------------
-
-  public Map<PValue, DataSet<?>> getDanglingDataSets() {
-    return danglingDataSets;
-  }
-
-  public ExecutionEnvironment getExecutionEnvironment() {
-    return env;
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
-    // assume that the DataSet is used as an input if retrieved here
-    danglingDataSets.remove(value);
-    return (DataSet<WindowedValue<T>>) dataSets.get(value);
-  }
-
-  public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
-    if (!dataSets.containsKey(value)) {
-      dataSets.put(value, set);
-      danglingDataSets.put(value, set);
-    }
-  }
-
-  /**
-   * Sets the AppliedPTransform which carries input/output.
-   * @param currentTransform
-   */
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-    return (DataSet<T>) broadcastDataSets.get(value);
-  }
-
-  public <ViewT, ElemT> void setSideInputDataSet(
-      PCollectionView<ViewT> value,
-      DataSet<WindowedValue<ElemT>> set) {
-    if (!broadcastDataSets.containsKey(value)) {
-      broadcastDataSets.put(value, set);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
-    return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy());
-  }
-
-  @SuppressWarnings("unchecked")
-  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(
-      Coder<T> coder,
-      WindowingStrategy<?, ?> windowingStrategy) {
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-        WindowedValue.getFullCoder(
-            coder,
-            windowingStrategy.getWindowFn().windowCoder());
-
-    return new CoderTypeInformation<>(windowedValueCoder);
-  }
-
-  Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) {
-    return currentTransform.getInputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
-  }
-
-  Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {
-    return currentTransform.getOutputs();
-  }
-
-  @SuppressWarnings("unchecked")
-  <T extends PValue> T getOutput(PTransform<?, T> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
deleted file mode 100644
index bf4395f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.io.IOException;
-
-import org.apache.beam.sdk.AggregatorRetrievalException;
-import org.apache.beam.sdk.AggregatorValues;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.joda.time.Duration;
-
-
-/**
- * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink.
- * In detached execution, results and job execution are currently unavailable.
- */
-public class FlinkDetachedRunnerResult implements PipelineResult {
-
-  FlinkDetachedRunnerResult() {}
-
-  @Override
-  public State getState() {
-    return State.UNKNOWN;
-  }
-
-  @Override
-  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator)
-      throws AggregatorRetrievalException {
-    throw new AggregatorRetrievalException(
-        "Accumulators can't be retrieved for detached Job executions.",
-        new UnsupportedOperationException());
-  }
-
-  @Override
-  public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
-  }
-
-  @Override
-  public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
-  }
-
-  @Override
-  public State waitUntilFinish() {
-    return State.UNKNOWN;
-  }
-
-  @Override
-  public State waitUntilFinish(Duration duration) {
-    return State.UNKNOWN;
-  }
-
-  @Override
-  public String toString() {
-    return "FlinkDetachedRunnerResult{}";
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
deleted file mode 100644
index ba00036..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The class that instantiates and manages the execution of a given job.
- * Depending on whether the job is a streaming or a batch processing one, it creates
- * the appropriate execution environment ({@link ExecutionEnvironment}
- * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
- * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
- * transform the Beam job into a Flink one, and executes the (translated) job.
- */
-class FlinkPipelineExecutionEnvironment {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
-
-  private final FlinkPipelineOptions options;
-
-  /**
-   * The Flink Batch execution environment. This is instantiated to either a
-   * {@link org.apache.flink.api.java.CollectionEnvironment},
-   * a {@link org.apache.flink.api.java.LocalEnvironment} or
-   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
-   * options.
-   */
-  private ExecutionEnvironment flinkBatchEnv;
-
-  /**
-   * The Flink Streaming execution environment. This is instantiated to either a
-   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
-   * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
-   * on the configuration options, and more specifically, the url of the master.
-   */
-  private StreamExecutionEnvironment flinkStreamEnv;
-
-  /**
-   * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
-   * provided {@link FlinkPipelineOptions}.
-   *
-   * @param options the user-defined pipeline options.
-   * */
-  FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
-    this.options = checkNotNull(options);
-  }
-
-  /**
-   * Depending on whether the job is a streaming or a batch one, this method creates
-   * the necessary execution environment and pipeline translator, and translates
-   * the {@link org.apache.beam.sdk.values.PCollection} program into
-   * a {@link org.apache.flink.api.java.DataSet}
-   * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
-   * */
-  public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
-    this.flinkBatchEnv = null;
-    this.flinkStreamEnv = null;
-
-    PipelineTranslationOptimizer optimizer =
-        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
-
-    optimizer.translate(pipeline);
-    TranslationMode translationMode = optimizer.getTranslationMode();
-
-    FlinkPipelineTranslator translator;
-    if (translationMode == TranslationMode.STREAMING) {
-      this.flinkStreamEnv = createStreamExecutionEnvironment();
-      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
-    } else {
-      this.flinkBatchEnv = createBatchExecutionEnvironment();
-      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
-    }
-
-    translator.translate(pipeline);
-  }
-
-  /**
-   * Launches the program execution.
-   * */
-  public JobExecutionResult executePipeline() throws Exception {
-    final String jobName = options.getJobName();
-
-    if (flinkBatchEnv != null) {
-      return flinkBatchEnv.execute(jobName);
-    } else if (flinkStreamEnv != null) {
-      return flinkStreamEnv.execute(jobName);
-    } else {
-      throw new IllegalStateException("The Pipeline has not yet been translated.");
-    }
-  }
-
-  /**
-   * If the submitted job is a batch processing job, this method creates the adequate
-   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
-   * on the user-specified options.
-   */
-  private ExecutionEnvironment createBatchExecutionEnvironment() {
-
-    LOG.info("Creating the required Batch Execution Environment.");
-
-    String masterUrl = options.getFlinkMaster();
-    ExecutionEnvironment flinkBatchEnv;
-
-    // depending on the master, create the right environment.
-    if (masterUrl.equals("[local]")) {
-      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
-    } else if (masterUrl.equals("[collection]")) {
-      flinkBatchEnv = new CollectionEnvironment();
-    } else if (masterUrl.equals("[auto]")) {
-      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-    } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
-      List<String> stagingFiles = options.getFilesToStage();
-      flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
-          Integer.parseInt(parts[1]),
-          stagingFiles.toArray(new String[stagingFiles.size()]));
-    } else {
-      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    // set the correct parallelism.
-    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
-      flinkBatchEnv.setParallelism(options.getParallelism());
-    }
-
-    // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkBatchEnv.getParallelism());
-
-    if (options.getObjectReuse()) {
-      flinkBatchEnv.getConfig().enableObjectReuse();
-    } else {
-      flinkBatchEnv.getConfig().disableObjectReuse();
-    }
-
-    return flinkBatchEnv;
-  }
-
-  /**
-   * If the submitted job is a stream processing job, this method creates the adequate
-   * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
-   * on the user-specified options.
-   */
-  private StreamExecutionEnvironment createStreamExecutionEnvironment() {
-
-    LOG.info("Creating the required Streaming Environment.");
-
-    String masterUrl = options.getFlinkMaster();
-    StreamExecutionEnvironment flinkStreamEnv = null;
-
-    // depending on the master, create the right environment.
-    if (masterUrl.equals("[local]")) {
-      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
-    } else if (masterUrl.equals("[auto]")) {
-      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
-      List<String> stagingFiles = options.getFilesToStage();
-      flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
-          Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
-    } else {
-      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    // set the correct parallelism.
-    if (options.getParallelism() != -1) {
-      flinkStreamEnv.setParallelism(options.getParallelism());
-    }
-
-    // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkStreamEnv.getParallelism());
-
-    if (options.getObjectReuse()) {
-      flinkStreamEnv.getConfig().enableObjectReuse();
-    } else {
-      flinkStreamEnv.getConfig().disableObjectReuse();
-    }
-
-    // default to event time
-    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-    // for the following 2 parameters, a value of -1 means that Flink will use
-    // the default values as specified in the configuration.
-    int numRetries = options.getNumberOfExecutionRetries();
-    if (numRetries != -1) {
-      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
-    }
-    long retryDelay = options.getExecutionRetryDelay();
-    if (retryDelay != -1) {
-      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
-    }
-
-    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
-    // If the value is not -1, then the validity checks are applied.
-    // By default, checkpointing is disabled.
-    long checkpointInterval = options.getCheckpointingInterval();
-    if (checkpointInterval != -1) {
-      if (checkpointInterval < 1) {
-        throw new IllegalArgumentException("The checkpoint interval must be positive");
-      }
-      flinkStreamEnv.enableCheckpointing(checkpointInterval);
-    }
-
-    // State backend
-    final AbstractStateBackend stateBackend = options.getStateBackend();
-    if (stateBackend != null) {
-      flinkStreamEnv.setStateBackend(stateBackend);
-    }
-
-    return flinkStreamEnv;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
deleted file mode 100644
index ef9afea..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.List;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-/**
- * Options which can be used to configure a Flink PipelineRunner.
- */
-public interface FlinkPipelineOptions
-    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
-
-  /**
-   * List of local files to make available to workers.
-   *
-   * <p>Jars are placed on the worker's classpath.
-   *
-   * <p>The default value is the list of jars from the main program's classpath.
-   */
-  @Description("Jar-Files to send to all workers and put on the classpath. "
-      + "The default value is all files from the classpath.")
-  @JsonIgnore
-  List<String> getFilesToStage();
-  void setFilesToStage(List<String> value);
-
-  /**
-   * The URL of the Flink JobManager on which to execute pipelines. This can either be
-   * the address of a cluster JobManager, in the form "host:port", or one of the special
-   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
-   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
-   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
-   */
-  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
-      + " either be of the form \"host:port\" or one of the special values [local], "
-      + "[collection] or [auto].")
-  String getFlinkMaster();
-  void setFlinkMaster(String value);
-
-  @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
-  Integer getParallelism();
-  void setParallelism(Integer value);
-
-  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current"
-      + "pipeline state used for fault tolerance).")
-  @Default.Long(-1L)
-  Long getCheckpointingInterval();
-  void setCheckpointingInterval(Long interval);
-
-  @Description("Sets the number of times that failed tasks are re-executed. "
-      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
-      + "that the system default value (as defined in the configuration) should be used.")
-  @Default.Integer(-1)
-  Integer getNumberOfExecutionRetries();
-  void setNumberOfExecutionRetries(Integer retries);
-
-  @Description("Sets the delay between executions. A value of {@code -1} "
-      + "indicates that the default value should be used.")
-  @Default.Long(-1L)
-  Long getExecutionRetryDelay();
-  void setExecutionRetryDelay(Long delay);
-
-  @Description("Sets the behavior of reusing objects.")
-  @Default.Boolean(false)
-  Boolean getObjectReuse();
-  void setObjectReuse(Boolean reuse);
-
-  /**
-   * State backend to store Beam's state during computation.
-   * Note: Only applicable when executing in streaming mode.
-   */
-  @Description("Sets the state backend to use in streaming mode. "
-      + "Otherwise the default is read from the Flink config.")
-  @JsonIgnore
-  AbstractStateBackend getStateBackend();
-  void setStateBackend(AbstractStateBackend stateBackend);
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
deleted file mode 100644
index 65f416d..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. If we have a streaming job, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch) one.
- */
-abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-  /**
-   * Translates the pipeline by passing this class as a visitor.
-   * @param pipeline The pipeline to be translated
-   */
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-
-  /**
-   * Utility formatting method that builds an indentation prefix.
-   * @param n number of indentation levels to generate
-   * @return String consisting of n repetitions of "|   " (empty when n is not positive)
-   */
-  protected static String genSpaces(int n) {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      builder.append("|   ");
-    }
-    return builder.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
deleted file mode 100644
index 096f030..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.common.base.Joiner;
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.program.DetachedEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to a Flink Plan and then executing them either locally
- * or on a Flink cluster, depending on the configuration.
- */
-public class FlinkRunner extends PipelineRunner<PipelineResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
-
-  /**
-   * Provided options.
-   */
-  private final FlinkPipelineOptions options;
-
-  /**
-   * Construct a runner from the provided options.
-   *
-   * @param options Properties which configure the runner.
-   * @return The newly created runner.
-   */
-  public static FlinkRunner fromOptions(PipelineOptions options) {
-    FlinkPipelineOptions flinkOptions =
-        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-    ArrayList<String> missing = new ArrayList<>();
-
-    if (flinkOptions.getAppName() == null) {
-      missing.add("appName");
-    }
-    if (missing.size() > 0) {
-      throw new IllegalArgumentException(
-          "Missing required values: " + Joiner.on(',').join(missing));
-    }
-
-    if (flinkOptions.getFilesToStage() == null) {
-      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
-          FlinkRunner.class.getClassLoader()));
-      LOG.info("PipelineOptions.filesToStage was not specified. "
-              + "Defaulting to files from the classpath: will stage {} files. "
-              + "Enable logging at DEBUG level to see which files will be staged.",
-          flinkOptions.getFilesToStage().size());
-      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
-    }
-
-    // Set Flink Master to [auto] if no option was specified.
-    if (flinkOptions.getFlinkMaster() == null) {
-      flinkOptions.setFlinkMaster("[auto]");
-    }
-
-    return new FlinkRunner(flinkOptions);
-  }
-
-  private FlinkRunner(FlinkPipelineOptions options) {
-    this.options = options;
-    this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
-  }
-
-  @Override
-  public PipelineResult run(Pipeline pipeline) {
-    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
-
-    LOG.info("Executing pipeline using FlinkRunner.");
-
-    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
-
-    LOG.info("Translating pipeline to Flink program.");
-    env.translate(this, pipeline);
-
-    JobExecutionResult result;
-    try {
-      LOG.info("Starting execution of Flink program.");
-      result = env.executePipeline();
-    } catch (Exception e) {
-      LOG.error("Pipeline execution failed", e);
-      throw new RuntimeException("Pipeline execution failed", e);
-    }
-
-    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
-      LOG.info("Pipeline submitted in Detached mode");
-      return new FlinkDetachedRunnerResult();
-    } else {
-      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-      Map<String, Object> accumulators = result.getAllAccumulatorResults();
-      if (accumulators != null && !accumulators.isEmpty()) {
-        LOG.info("Final aggregator values:");
-
-        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
-          LOG.info("{} : {}", entry.getKey(), entry.getValue());
-        }
-      }
-
-      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
-    }
-  }
-
-  /**
-   * For testing.
-   */
-  public FlinkPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public String toString() {
-    return "FlinkRunner#" + hashCode();
-  }
-
-  /**
-   * Attempts to detect all the resources the class loader has access to. This does not recurse
-   * to class loader parents, stopping it from pulling in resources from the system class loader.
-   *
-   * @param classLoader The URLClassLoader to use to detect resources to stage.
-   * @return A list of absolute paths to the resources the class loader uses.
-   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
-   *   of the resources the class loader exposes is not a file resource.
-   */
-  protected static List<String> detectClassPathResourcesToStage(
-      ClassLoader classLoader) {
-    if (!(classLoader instanceof URLClassLoader)) {
-      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
-          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
-
-    List<String> files = new ArrayList<>();
-    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-      try {
-        files.add(new File(url.toURI()).getAbsolutePath());
-      } catch (IllegalArgumentException | URISyntaxException e) {
-        String message = String.format("Unable to convert url (%s) to file.", url);
-        LOG.error(message);
-        throw new IllegalArgumentException(message, e);
-      }
-    }
-    return files;
-  }
-
-  /** A set of {@link View}s with non-deterministic key coders. */
-  Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
-
-  /**
-   * Records that the {@link PTransform} requires a deterministic key coder.
-   */
-  void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
-    ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
-  }
-
-  /** Outputs a warning about PCollection views without deterministic key coders. */
-  private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
-    // We need to wait until this point to determine the names of the transforms, since only
-    // at this time do we know the hierarchy of the transforms; otherwise we could
-    // have just recorded the full names during apply time.
-    if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
-      final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
-      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
-        @Override
-        public void visitValue(PValue value, TransformHierarchy.Node producer) {
-        }
-
-        @Override
-        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-        }
-
-        @Override
-        public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-          return CompositeBehavior.ENTER_TRANSFORM;
-        }
-
-        @Override
-        public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        }
-      });
-
-      LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
-          + "because the key coder is not deterministic. Falling back to singleton implementation "
-          + "which may cause memory and/or performance problems. Future major versions of "
-          + "the Flink runner will require deterministic key coders.",
-          ptransformViewNamesWithNonDeterministicKeyCoders);
-    }
-  }
-}