Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/15 17:07:07 UTC

[12/13] incubator-beam git commit: [flink] restructure and cleanup Maven layout

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c8c5d84..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -1,261 +1,167 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
 -->
 <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
-        xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.beam</groupId>
-        <artifactId>runners</artifactId>
-        <version>1.6.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>flink-runner</artifactId>
-    <version>0.3-SNAPSHOT</version>
-
-    <name>Flink Beam Runner</name>
-    <packaging>jar</packaging>
-
-    <inceptionYear>2015</inceptionYear>
-
-    <licenses>
-        <license>
-            <name>The Apache Software License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-            <distribution>repo</distribution>
-        </license>
-    </licenses>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <flink.version>1.0.0</flink.version>
-        <beam.version>1.6.0-SNAPSHOT</beam.version>
-        <scala.major.version>2.10</scala.major.version>
-        <!-- Default parameters for mvn exec:exec -->
-        <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
-        <input>kinglear.txt</input>
-        <output>wordcounts.txt</output>
-        <parallelism>1</parallelism>
-    </properties>
-
-    <repositories>
-        <repository>
-            <id>apache.snapshots</id>
-            <name>Apache Development Snapshot Repository</name>
-            <url>https://repository.apache.org/content/repositories/snapshots/</url>
-            <releases>
-                <enabled>false</enabled>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-            </snapshots>
-        </repository>
-    </repositories>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka-0.8_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro_${scala.major.version}</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.cloud.dataflow</groupId>
-            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
-            <version>${beam.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-jdk14</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <version>1.9.5</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-
-            <!-- JAR Packaging -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.6</version><!--$NO-MVN-MAN-VER$-->
-                <configuration>
-                    <archive>
-                        <manifest>
-                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
-                            <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
-                        </manifest>
-                    </archive>
-                </configuration>
-            </plugin>
-
-            <!-- Java compiler -->
-            <plugin>
-
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version><!--$NO-MVN-MAN-VER$-->
-                <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
-                </configuration>
-            </plugin>
-
-            <!-- Integration Tests -->
-            <plugin>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.17</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>integration-test</goal>
-                            <goal>verify</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <forkCount>1</forkCount>
-                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-                </configuration>
-            </plugin>
-
-            <!-- Unit Tests -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.17</version>
-                <configuration>
-                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-                </configuration>
-            </plugin>
-
-            <!-- Eclipse Integration -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-eclipse-plugin</artifactId>
-                <version>2.8</version>
-                <configuration>
-                    <classpathContainers>
-                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                    </classpathContainers>
-                    <downloadSources>true</downloadSources>
-                    <downloadJavadocs>true</downloadJavadocs>
-                </configuration>
-            </plugin>
-
-            <!-- Maven minimum version check -->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-enforcer-plugin</artifactId>
-                <version>1.3.1</version><!--$NO-MVN-MAN-VER$-->
-                <executions>
-                    <execution>
-                        <id>enforce-maven</id>
-                        <goals>
-                            <goal>enforce</goal>
-                        </goals>
-                        <configuration>
-                            <rules>
-                                <requireJavaVersion>
-                                    <version>[1.7,)</version>
-                                </requireJavaVersion>
-                                <requireMavenVersion>
-                                    <!-- enforce at least mvn version 3.0.3 -->
-                                    <version>[3.0.3,)</version>
-                                </requireMavenVersion>
-                            </rules>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>exec-maven-plugin</artifactId>
-                <version>1.2.1</version>
-                <executions>
-                    <execution>
-                        <phase>none</phase>
-                    </execution>
-                </executions>
-                <configuration>
-                    <executable>java</executable>
-                    <arguments>
-                        <argument>-classpath</argument>
-                        <classpath/>
-                        <argument>${clazz}</argument>
-                        <argument>--input=${input}</argument>
-                        <argument>--output=${output}</argument>
-                        <argument>--parallelism=${parallelism}</argument>
-                    </arguments>
-                </configuration>
-            </plugin>
-
-        </plugins>
-
-    </build>
+     xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>runners-parent</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flink-runner-parent</artifactId>
+  <version>0.4-SNAPSHOT</version>
+
+  <name>Flink Beam Runner</name>
+  <packaging>pom</packaging>
+
+  <inceptionYear>2015</inceptionYear>
+
+  <modules>
+    <module>runner</module>
+    <module>examples</module>
+  </modules>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <flink.version>1.0.0</flink.version>
+    <beam.version>1.6.0-SNAPSHOT</beam.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+
+    <pluginManagement>
+
+      <plugins>
+
+        <!-- Integration Tests -->
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-failsafe-plugin</artifactId>
+          <version>2.17</version>
+          <executions>
+            <execution>
+              <goals>
+                <goal>integration-test</goal>
+                <goal>verify</goal>
+              </goals>
+            </execution>
+          </executions>
+          <configuration>
+            <forkCount>1</forkCount>
+            <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+          </configuration>
+        </plugin>
+
+        <!-- Unit Tests -->
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.17</version>
+          <configuration>
+            <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+          </configuration>
+        </plugin>
+
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>exec-maven-plugin</artifactId>
+          <version>1.2.1</version>
+          <executions>
+            <execution>
+              <phase>none</phase>
+            </execution>
+          </executions>
+        </plugin>
+
+      </plugins>
+
+    </pluginManagement>
+
+
+    <plugins>
+      <!-- Java compiler -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+
+      <!-- Maven minimum version check -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>
+        <executions>
+          <execution>
+            <id>enforce-maven</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireJavaVersion>
+                  <version>[1.7,)</version>
+                </requireJavaVersion>
+                <requireMavenVersion>
+                  <!-- enforce at least mvn version 3.0.3 -->
+                  <version>[3.0.3,)</version>
+                </requireMavenVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+
+
+  </build>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
new file mode 100644
index 0000000..212b973
--- /dev/null
+++ b/runners/flink/runner/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+         xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>flink-runner-parent</artifactId>
+    <version>0.4-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>flink-runner_2.10</artifactId>
+  <version>0.4-SNAPSHOT</version>
+
+  <name>Flink Beam Runner Core</name>
+  <packaging>jar</packaging>
+
+  <inceptionYear>2015</inceptionYear>
+
+  <licenses>
+    <license>
+      <name>The Apache Software License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <dependencies>
+    <!-- Flink dependencies -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <!-- Libraries not part of Flink which need to be included by the user (see flink-examples) -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro_2.10</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <!-- Dataflow -->
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>${beam.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+    </dependency>
+    <!-- Test scoped -->
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-examples-all</artifactId>
+      <version>${beam.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_2.10</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <!-- Integration Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+
+      <!-- Unit Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+      </plugin>
+
+    </plugins>
+
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..8825ed3
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on whether the job is a Streaming or a Batch one, it creates
+ * the appropriate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
+ * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
+ * executes the (translated) job.
+ */
+public class FlinkPipelineExecutionEnvironment {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+  private final FlinkPipelineOptions options;
+
+  /**
+   * The Flink Batch execution environment. This is instantiated to either a
+   * {@link org.apache.flink.api.java.CollectionEnvironment},
+   * a {@link org.apache.flink.api.java.LocalEnvironment} or
+   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+   * options.
+   */
+  private ExecutionEnvironment flinkBatchEnv;
+
+
+  /**
+   * The Flink Streaming execution environment. This is instantiated to either a
+   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+   * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+   * on the configuration options, and more specifically, the url of the master.
+   */
+  private StreamExecutionEnvironment flinkStreamEnv;
+
+  /**
+   * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
+   * their Flink counterparts. Based on the options provided by the user, for a streaming job
+   * this is instantiated as a {@link FlinkStreamingPipelineTranslator}; otherwise, i.e. for a batch job,
+   * a {@link FlinkBatchPipelineTranslator} is created.
+   */
+  private FlinkPipelineTranslator flinkPipelineTranslator;
+
+  /**
+   * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+   * provided {@link FlinkPipelineOptions}.
+   *
+   * @param options the user-defined pipeline options.
+   */
+  public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+    this.options = Preconditions.checkNotNull(options);
+    this.createPipelineExecutionEnvironment();
+    this.createPipelineTranslator();
+  }
+
+  /**
+   * Depending on the type of job (Streaming or Batch) and the user-specified options,
+   * this method creates the appropriate ExecutionEnvironment.
+   */
+  private void createPipelineExecutionEnvironment() {
+    if (options.isStreaming()) {
+      createStreamExecutionEnvironment();
+    } else {
+      createBatchExecutionEnvironment();
+    }
+  }
+
+  /**
+   * Depending on the type of job (Streaming or Batch), this method creates the appropriate job graph
+   * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+   * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+   */
+  private void createPipelineTranslator() {
+    checkInitializationState();
+    if (this.flinkPipelineTranslator != null) {
+      throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+    }
+
+    this.flinkPipelineTranslator = options.isStreaming() ?
+        new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+        new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+  }
+
+  /**
+   * Depending on whether the job is a Streaming or a Batch one, this method creates
+   * the necessary execution environment and pipeline translator, and translates
+   * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+   * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
+   * one.
+   */
+  public void translate(Pipeline pipeline) {
+    checkInitializationState();
+    if (this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+      createPipelineExecutionEnvironment();
+    }
+    if (this.flinkPipelineTranslator == null) {
+      createPipelineTranslator();
+    }
+    this.flinkPipelineTranslator.translate(pipeline);
+  }
+
+  /**
+   * Launches the program execution.
+   */
+  public JobExecutionResult executePipeline() throws Exception {
+    if (options.isStreaming()) {
+      if (this.flinkStreamEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkStreamEnv.execute();
+    } else {
+      if (this.flinkBatchEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkBatchEnv.execute();
+    }
+  }
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the appropriate
+   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createBatchExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+    }
+
+    LOG.info("Creating the required Batch Execution Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    this.flinkStreamEnv = null;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[collection]")) {
+      this.flinkBatchEnv = new CollectionEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]),
+          stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+      this.flinkBatchEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkBatchEnv.getParallelism());
+  }
+
+  /**
+   * If the submitted job is a stream processing job, this method creates the appropriate
+   * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createStreamExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+    }
+
+    LOG.info("Creating the required Streaming Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    this.flinkBatchEnv = null;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+      this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      this.flinkStreamEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkStreamEnv.getParallelism());
+
+    // default to event time
+    this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // for the following 2 parameters, a value of -1 means that Flink will use
+    // the default values as specified in the configuration.
+    int numRetries = options.getNumberOfExecutionRetries();
+    if (numRetries != -1) {
+      this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+    }
+    long retryDelay = options.getExecutionRetryDelay();
+    if (retryDelay != -1) {
+      this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+    }
+
+    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+    // If the value is not -1, then the validity checks are applied.
+    // By default, checkpointing is disabled.
+    long checkpointInterval = options.getCheckpointingInterval();
+    if (checkpointInterval != -1) {
+      if (checkpointInterval < 1) {
+        throw new IllegalArgumentException("The checkpoint interval must be positive");
+      }
+      this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+    }
+  }
+
+  private void checkInitializationState() {
+    if (options.isStreaming() && this.flinkBatchEnv != null) {
+      throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+    } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+      throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+    }
+  }
+}
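
For orientation, a minimal sketch of the life cycle this class implements: build options, construct the environment, translate, execute. The wrapper class, the option values and the empty pipeline below are illustrative only, not part of this commit.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.flink.api.common.JobExecutionResult;

    public class EnvironmentSketch {
      public static void main(String[] args) throws Exception {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setFlinkMaster("[local]"); // or "host:port", "[collection]", "[auto]"
        options.setStreaming(false);       // batch mode -> ExecutionEnvironment

        Pipeline p = Pipeline.create(options); // transforms omitted for brevity

        FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
        env.translate(p);                  // Beam pipeline -> Flink DataSet program
        JobExecutionResult result = env.executePipeline();
        System.out.println("Runtime: " + result.getNetRuntime() + " ms");
      }
    }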

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 0000000..2f4b3ea
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.StreamingOptions;
+
+import java.util.List;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+  /**
+   * List of local files to make available to workers.
+   * <p>
+   * Jars are placed on the worker's classpath.
+   * <p>
+   * The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. " +
+      "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * The job name is used to identify jobs running on a Flink cluster.
+   */
+  @Description("Dataflow job name, to uniquely identify active jobs. "
+      + "Defaults to using the ApplicationName-UserName-Date.")
+  @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
+
+  /**
+   * The url of the Flink JobManager on which to execute pipelines. This can either be
+   * the address of a cluster JobManager, in the form "host:port" or one of the special
+   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+   */
+  @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
+      " either be of the form \"host:port\" or one of the special values [local], " +
+      "[collection] or [auto].")
+  String getFlinkMaster();
+  void setFlinkMaster(String value);
+
+  @Description("The degree of parallelism to be used when distributing operations onto workers.")
+  @Default.Integer(-1)
+  Integer getParallelism();
+  void setParallelism(Integer value);
+
+  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
+      "fault tolerance).")
+  @Default.Long(-1L)
+  Long getCheckpointingInterval();
+  void setCheckpointingInterval(Long interval);
+
+  @Description("Sets the number of times that failed tasks are re-executed. " +
+      "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
+      "that the system default value (as defined in the configuration) should be used.")
+  @Default.Integer(-1)
+  Integer getNumberOfExecutionRetries();
+  void setNumberOfExecutionRetries(Integer retries);
+
+  @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+  @Default.Long(-1L)
+  Long getExecutionRetryDelay();
+  void setExecutionRetryDelay(Long delay);
+}
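
As a usage sketch: since these are ordinary PipelineOptions properties, they can be populated from command-line flags derived from the getter names. The flag names below follow the standard PipelineOptionsFactory convention; the wrapper class is illustrative.

    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;

    public class OptionsSketch {
      public static void main(String[] args) {
        // e.g. --flinkMaster=[local] --parallelism=4 --numberOfExecutionRetries=3
        FlinkPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        System.out.println(options.getFlinkMaster() + " / " + options.getParallelism());
      }
    }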

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
new file mode 100644
index 0000000..fe773d9
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ * <p>
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
+ */
+public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
+
+  /**
+   * Provided options.
+   */
+  private final FlinkPipelineOptions options;
+
+  private final FlinkPipelineExecutionEnvironment flinkJobEnv;
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    ArrayList<String> missing = new ArrayList<>();
+
+    if (flinkOptions.getAppName() == null) {
+      missing.add("appName");
+    }
+    if (missing.size() > 0) {
+      throw new IllegalArgumentException(
+          "Missing required values: " + Joiner.on(',').join(missing));
+    }
+
+    if (flinkOptions.getFilesToStage() == null) {
+      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          DataflowPipelineRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          flinkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+    }
+
+    // Verify jobName according to service requirements.
+    String jobName = flinkOptions.getJobName().toLowerCase();
+    Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"),
+        "JobName invalid; the name must consist of only the characters [-a-z0-9], " +
+        "starting with a letter and ending with a letter or number");
+    Preconditions.checkArgument(jobName.length() <= 40,
+        "JobName too long; must be no more than 40 characters in length");
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    return new FlinkPipelineRunner(flinkOptions);
+  }
+
+  private FlinkPipelineRunner(FlinkPipelineOptions options) {
+    this.options = options;
+    this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    LOG.info("Executing pipeline using FlinkPipelineRunner.");
+
+    LOG.info("Translating pipeline to Flink program.");
+
+    this.flinkJobEnv.translate(pipeline);
+
+    LOG.info("Starting execution of Flink program.");
+    
+    JobExecutionResult result;
+    try {
+      result = this.flinkJobEnv.executePipeline();
+    } catch (Exception e) {
+      LOG.error("Pipeline execution failed", e);
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+
+    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+    Map<String, Object> accumulators = result.getAllAccumulatorResults();
+    if (accumulators != null && !accumulators.isEmpty()) {
+      LOG.info("Final aggregator values:");
+
+      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+  }
+
+  /**
+   * For testing.
+   */
+  public FlinkPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Constructs a runner with default properties for testing.
+   *
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner createForTest(boolean streaming) {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    // we use [auto] for testing since this will make it pick up the Testing
+    // ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    options.setStreaming(streaming);
+    return new FlinkPipelineRunner(options);
+  }
+
+  @Override
+  public <Output extends POutput, Input extends PInput> Output apply(
+      PTransform<Input, Output> transform, Input input) {
+    return super.apply(transform, input);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String toString() {
+    return "DataflowPipelineRunner#" + hashCode();
+  }
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This does not recurse
+   * to class loader parents, which stops it from pulling in resources from the system class loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @return A list of absolute paths to the resources the class loader uses.
+   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+   *                                  of the resources the class loader exposes is not a file resource.
+   */
+  protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", url);
+        LOG.error(message);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
+}
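
Putting the runner to work looks roughly like this; a sketch only, where the appName value and the wrapper class are illustrative:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class RunnerSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkPipelineRunner.class); // select this runner
        options.setAppName("flinkrunnersketch");      // appName is required by fromOptions()
        options.setFlinkMaster("[auto]");             // let the environment decide

        Pipeline p = Pipeline.create(options);
        // ... apply transforms ...
        p.run(); // dispatches to FlinkPipelineRunner#run(Pipeline)
      }
    }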

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
new file mode 100644
index 0000000..8fd08ec
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
+import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This
+ * has methods to query the job runtime and the final values of
+ * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s.
+ */
+public class FlinkRunnerResult implements PipelineResult {
+  
+  private final Map<String, Object> aggregators;
+  
+  private final long runtime;
+  
+  public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+    this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
+        Collections.<String, Object>emptyMap() :
+        Collections.unmodifiableMap(aggregators);
+    
+    this.runtime = runtime;
+  }
+
+  @Override
+  public State getState() {
+    return null;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+    // TODO provide a list of all accumulator step values
+    Object value = aggregators.get(aggregator.getName());
+    if (value != null) {
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return (Map<String, T>) aggregators;
+        }
+      };
+    } else {
+      throw new AggregatorRetrievalException("Accumulator results not found.",
+          new RuntimeException("Accumulator does not exist."));
+    }
+  }
+}
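
A sketch of consuming the result; the helper below is illustrative and only uses the accessors defined above:

    import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
    import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
    import com.google.cloud.dataflow.sdk.transforms.Aggregator;
    import org.apache.beam.runners.flink.FlinkRunnerResult;

    import java.util.Map;

    public class ResultSketch {
      // Prints the final value(s) recorded for one aggregator.
      static <T> void printAggregator(FlinkRunnerResult result, Aggregator<?, T> aggregator) {
        try {
          AggregatorValues<T> values = result.getAggregatorValues(aggregator);
          for (Map.Entry<String, T> e : values.getValuesAtSteps().entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
          }
        } catch (AggregatorRetrievalException e) {
          System.err.println("No value recorded for " + aggregator.getName());
        }
      }
    }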

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
new file mode 100644
index 0000000..71e3b54
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}
+ * to standard output.
+ *
+ * This is Flink-specific and will only work when executed using the
+ * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
+ */
+public class ConsoleIO {
+
+  /**
+   * A PTransform that writes a PCollection to standard output.
+   */
+  public static class Write {
+
+    /**
+     * Returns a ConsoleIO.Write PTransform with a default step name.
+     */
+    public static Bound create() {
+      return new Bound();
+    }
+
+    /**
+     * Returns a ConsoleIO.Write PTransform with the given step name.
+     */
+    public static Bound named(String name) {
+      return new Bound().named(name);
+    }
+
+    /**
+     * A PTransform that writes a bounded PCollection to standard output.
+     */
+    public static class Bound extends PTransform<PCollection<?>, PDone> {
+      private static final long serialVersionUID = 0;
+
+      Bound() {
+        super("ConsoleIO.Write");
+      }
+
+      Bound(String name) {
+        super(name);
+      }
+
+      /**
+       * Returns a new ConsoleIO.Write PTransform that's like this one but with the
+       * given step name.
+       * Does not modify this object.
+       */
+      public Bound named(String name) {
+        return new Bound(name);
+      }
+
+      @Override
+      public PDone apply(PCollection<?> input) {
+        return PDone.in(input.getPipeline());
+      }
+    }
+  }
+}
+
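
Usage is a one-liner at the end of a pipeline. A sketch, where the sample data is illustrative and the sink only takes effect when running on the Flink runner:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import org.apache.beam.runners.flink.io.ConsoleIO;

    public class ConsoleIOSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<String> words = p.apply(Create.of("hello", "world"));
        words.apply(ConsoleIO.Write.named("PrintWords")); // Flink-specific sink
        p.run();
      }
    }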

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..28a10b7
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
+  /**
+   * The necessary context in the case of a batch job.
+   */
+  private final FlinkBatchTranslationContext batchContext;
+
+  private int depth = 0;
+
+  /**
+   * Composite transform that we want to translate before proceeding with other transforms.
+   */
+  private PTransform<?, ?> currentCompositeTransform;
+
+  public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+    this.batchContext = new FlinkBatchTranslationContext(env, options);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
+
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == null) {
+
+      BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        currentCompositeTransform = transform;
+        if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+          // we can only optimize CoGroupByKey for input size 2
+          currentCompositeTransform = null;
+        }
+      }
+    }
+    this.depth++;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == transform) {
+
+      BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        applyBatchTransform(transform, node, translator);
+        currentCompositeTransform = null;
+      } else {
+        throw new IllegalStateException("Attempted to translate composite transform " +
+            "but no translator was found: " + currentCompositeTransform);
+      }
+    }
+    this.depth--;
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+  }
+
+  @Override
+  public void visitTransform(TransformTreeNode node) {
+    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+    if (currentCompositeTransform != null) {
+      // ignore it
+      return;
+    }
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Flink alternative.
+
+    PTransform<?, ?> transform = node.getTransform();
+    BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+    if (translator == null) {
+      LOG.info(node.getTransform().getClass().toString());
+      throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+    }
+    applyBatchTransform(transform, node, translator);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    // do nothing here
+  }
+
+  private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+    // create the applied PTransform on the batchContext
+    batchContext.setCurrentTransform(AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+    typedTranslator.translateNode(typedTransform, batchContext);
+  }
+
+  /**
+   * A translator of a {@link PTransform}.
+   */
+  public interface BatchTransformTranslator<Type extends PTransform> {
+    void translateNode(Type transform, FlinkBatchTranslationContext context);
+  }
+
+  private static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
+
+  private static String formatNodeName(TransformTreeNode node) {
+    return node.toString().split("@")[1] + node.getTransform();
+  }
+}
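
For context, this translator is driven by FlinkPipelineExecutionEnvironment (see createPipelineTranslator() earlier in this commit). A condensed sketch of that wiring, where the method wrapper is illustrative:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class TranslatorSketch {
      static void translateBatch(Pipeline pipeline, PipelineOptions options) {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        FlinkBatchPipelineTranslator translator = new FlinkBatchPipelineTranslator(env, options);
        // translate(Pipeline) is inherited from FlinkPipelineTranslator and invokes
        // the enterCompositeTransform/visitTransform callbacks defined above.
        translator.translate(pipeline);
      }
    }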