You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/15 17:07:07 UTC
[12/13] incubator-beam git commit: [flink] restructure and cleanup
Maven layout
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c8c5d84..31713cd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -1,261 +1,167 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>runners</artifactId>
- <version>1.6.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>flink-runner</artifactId>
- <version>0.3-SNAPSHOT</version>
-
- <name>Flink Beam Runner</name>
- <packaging>jar</packaging>
-
- <inceptionYear>2015</inceptionYear>
-
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <flink.version>1.0.0</flink.version>
- <beam.version>1.6.0-SNAPSHOT</beam.version>
- <scala.major.version>2.10</scala.major.version>
- <!-- Default parameters for mvn exec:exec -->
- <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
- <input>kinglear.txt</input>
- <output>wordcounts.txt</output>
- <parallelism>1</parallelism>
- </properties>
-
- <repositories>
- <repository>
- <id>apache.snapshots</id>
- <name>Apache Development Snapshot Repository</name>
- <url>https://repository.apache.org/content/repositories/snapshots/</url>
- <releases>
- <enabled>false</enabled>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka-0.8_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro_${scala.major.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.cloud.dataflow</groupId>
- <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
- <version>${beam.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.9.5</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
-
- <!-- JAR Packaging -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.6</version><!--$NO-MVN-MAN-VER$-->
- <configuration>
- <archive>
- <manifest>
- <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
- <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
- </manifest>
- </archive>
- </configuration>
- </plugin>
-
- <!-- Java compiler -->
- <plugin>
-
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version><!--$NO-MVN-MAN-VER$-->
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
-
- <!-- Integration Tests -->
- <plugin>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>2.17</version>
- <executions>
- <execution>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <forkCount>1</forkCount>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
- </plugin>
-
- <!-- Unit Tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.17</version>
- <configuration>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
- </plugin>
-
- <!-- Eclipse Integration -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-eclipse-plugin</artifactId>
- <version>2.8</version>
- <configuration>
- <classpathContainers>
- <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
- </classpathContainers>
- <downloadSources>true</downloadSources>
- <downloadJavadocs>true</downloadJavadocs>
- </configuration>
- </plugin>
-
- <!-- Maven minimum version check -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <version>1.3.1</version><!--$NO-MVN-MAN-VER$-->
- <executions>
- <execution>
- <id>enforce-maven</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <configuration>
- <rules>
- <requireJavaVersion>
- <version>[1.7,)</version>
- </requireJavaVersion>
- <requireMavenVersion>
- <!-- enforce at least mvn version 3.0.3 -->
- <version>[3.0.3,)</version>
- </requireMavenVersion>
- </rules>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.2.1</version>
- <executions>
- <execution>
- <phase>none</phase>
- </execution>
- </executions>
- <configuration>
- <executable>java</executable>
- <arguments>
- <argument>-classpath</argument>
- <classpath/>
- <argument>${clazz}</argument>
- <argument>--input=${input}</argument>
- <argument>--output=${output}</argument>
- <argument>--parallelism=${parallelism}</argument>
- </arguments>
- </configuration>
- </plugin>
-
- </plugins>
-
- </build>
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>runners-parent</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-runner-parent</artifactId>
+ <version>0.4-SNAPSHOT</version>
+
+ <name>Flink Beam Runner</name>
+ <packaging>pom</packaging>
+
+ <inceptionYear>2015</inceptionYear>
+
+ <modules>
+ <module>runner</module>
+ <module>examples</module>
+ </modules>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <flink.version>1.0.0</flink.version>
+ <beam.version>1.6.0-SNAPSHOT</beam.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <build>
+
+ <pluginManagement>
+
+ <plugins>
+
+ <!-- Integration Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <forkCount>1</forkCount>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </plugin>
+
+ <!-- Unit Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ </pluginManagement>
+
+
+ <plugins>
+ <!-- Java compiler -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+
+ <!-- Maven minimum version check -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-maven</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <version>[1.7,)</version>
+ </requireJavaVersion>
+ <requireMavenVersion>
+ <!-- enforce at least mvn version 3.0.3 -->
+ <version>[3.0.3,)</version>
+ </requireMavenVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
new file mode 100644
index 0000000..212b973
--- /dev/null
+++ b/runners/flink/runner/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>flink-runner-parent</artifactId>
+ <version>0.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-runner_2.10</artifactId>
+ <version>0.4-SNAPSHOT</version>
+
+ <name>Flink Beam Runner Core</name>
+ <packaging>jar</packaging>
+
+ <inceptionYear>2015</inceptionYear>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <dependencies>
+ <!-- Flink dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <!-- Libraries not part of Flink which need to be included by the user (see flink-examples) -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro_2.10</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <!--- Dataflow -->
+ <dependency>
+ <groupId>com.google.cloud.dataflow</groupId>
+ <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+ <version>${beam.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>1.0-rc2</version>
+ </dependency>
+ <!-- Test scoped -->
+ <dependency>
+ <groupId>com.google.cloud.dataflow</groupId>
+ <artifactId>google-cloud-dataflow-java-examples-all</artifactId>
+ <version>${beam.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_2.10</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- Integration Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+
+ <!-- Unit Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+
+ </plugins>
+
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..8825ed3
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on if the job is a Streaming or Batch processing one, it creates
+ * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
+ * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
+ * executes the (translated) job.
+ */
+public class FlinkPipelineExecutionEnvironment {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+ private final FlinkPipelineOptions options;
+
+ /**
+ * The Flink Batch execution environment. This is instantiated to either a
+ * {@link org.apache.flink.api.java.CollectionEnvironment},
+ * a {@link org.apache.flink.api.java.LocalEnvironment} or
+ * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+ * options.
+ */
+ private ExecutionEnvironment flinkBatchEnv;
+
+
+ /**
+ * The Flink Streaming execution environment. This is instantiated to either a
+ * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+ * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+ * on the configuration options, and more specifically, the url of the master.
+ */
+ private StreamExecutionEnvironment flinkStreamEnv;
+
+ /**
+ * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
+ * their Flink counterparts. Based on the options provided by the user, if we have a streaming job,
+ * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job,
+ * a {@link FlinkBatchPipelineTranslator} is created.
+ */
+ private FlinkPipelineTranslator flinkPipelineTranslator;
+
+ /**
+ * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+ * provided {@link FlinkPipelineOptions}.
+ *
+ * @param options the user-defined pipeline options.
+ * */
+ public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+ this.options = Preconditions.checkNotNull(options);
+ this.createPipelineExecutionEnvironment();
+ this.createPipelineTranslator();
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch) and the user-specified options,
+ * this method creates the adequate ExecutionEnvironment.
+ */
+ private void createPipelineExecutionEnvironment() {
+ if (options.isStreaming()) {
+ createStreamExecutionEnvironment();
+ } else {
+ createBatchExecutionEnvironment();
+ }
+ }
+
+ /**
+ * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
+ * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+ * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ */
+ private void createPipelineTranslator() {
+ checkInitializationState();
+ if (this.flinkPipelineTranslator != null) {
+ throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+ }
+
+ this.flinkPipelineTranslator = options.isStreaming() ?
+ new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+ new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+ }
+
+ /**
+ * Depending on if the job is a Streaming or a Batch one, this method creates
+ * the necessary execution environment and pipeline translator, and translates
+ * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+ * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
+ * one.
+ * */
+ public void translate(Pipeline pipeline) {
+ checkInitializationState();
+ if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+ createPipelineExecutionEnvironment();
+ }
+ if (this.flinkPipelineTranslator == null) {
+ createPipelineTranslator();
+ }
+ this.flinkPipelineTranslator.translate(pipeline);
+ }
+
+ /**
+ * Launches the program execution.
+ * */
+ public JobExecutionResult executePipeline() throws Exception {
+ if (options.isStreaming()) {
+ if (this.flinkStreamEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkStreamEnv.execute();
+ } else {
+ if (this.flinkBatchEnv == null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+ }
+ if (this.flinkPipelineTranslator == null) {
+ throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+ }
+ return this.flinkBatchEnv.execute();
+ }
+ }
+
+ /**
+ * Creates the Flink {@link org.apache.flink.api.java.ExecutionEnvironment} for a batch job.
+ * The Flink master option selects it: "[local]", "[collection]", "[auto]" or "host:port";
+ * any other value is logged as unrecognized and handled like "[auto]".
+ */
+ private void createBatchExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Batch Execution Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkStreamEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[collection]")) {
+ this.flinkBatchEnv = new CollectionEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches("[^:]+:\\d+")) { // one non-empty host, one numeric port: split yields exactly 2 parts
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]),
+ stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
+ this.flinkBatchEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkBatchEnv.getParallelism());
+ }
+
+ /**
+ * Creates the Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}
+ * for a streaming job. The Flink master option selects it: "[local]", "[auto]" or "host:port"; any
+ * other value is logged as unrecognized and handled like "[auto]". Option values of -1 keep Flink defaults.
+ */
+ private void createStreamExecutionEnvironment() {
+ if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+ throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+ }
+
+ LOG.info("Creating the required Streaming Environment.");
+
+ String masterUrl = options.getFlinkMaster();
+ this.flinkBatchEnv = null;
+
+ // depending on the master, create the right environment.
+ if (masterUrl.equals("[local]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+ } else if (masterUrl.equals("[auto]")) {
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ } else if (masterUrl.matches("[^:]+:\\d+")) { // one non-empty host, one numeric port: split yields exactly 2 parts
+ String[] parts = masterUrl.split(":");
+ List<String> stagingFiles = options.getFilesToStage();
+ this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+ Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
+ } else {
+ LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
+ this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ }
+
+ // set the correct parallelism.
+ if (options.getParallelism() != -1) {
+ this.flinkStreamEnv.setParallelism(options.getParallelism());
+ }
+
+ // set parallelism in the options (required by some execution code)
+ options.setParallelism(flinkStreamEnv.getParallelism());
+
+ // default to event time
+ this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ // for the following 2 parameters, a value of -1 means that Flink will use
+ // the default values as specified in the configuration.
+ int numRetries = options.getNumberOfExecutionRetries();
+ if (numRetries != -1) {
+ this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+ }
+ long retryDelay = options.getExecutionRetryDelay();
+ if (retryDelay != -1) {
+ this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+ }
+
+ // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+ // If the value is not -1, then the validity checks are applied.
+ // By default, checkpointing is disabled.
+ long checkpointInterval = options.getCheckpointingInterval();
+ if(checkpointInterval != -1) {
+ if (checkpointInterval < 1) {
+ throw new IllegalArgumentException("The checkpoint interval must be positive");
+ }
+ this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+ }
+ }
+
+ private void checkInitializationState() {
+ if (options.isStreaming() && this.flinkBatchEnv != null) {
+ throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
+ } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+ throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 0000000..2f4b3ea
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.StreamingOptions;
+
+import java.util.List;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+ /**
+ * List of local files to make available to workers.
+ * <p>
+ * Jars are placed on the worker's classpath.
+ * <p>
+ * The default value is the list of jars from the main program's classpath.
+ */
+ @Description("Jar-Files to send to all workers and put on the classpath. " +
+ "The default value is all files from the classpath.")
+ @JsonIgnore
+ List<String> getFilesToStage();
+ void setFilesToStage(List<String> value);
+
+ /**
+ * The job name is used to identify jobs running on a Flink cluster.
+ */
+ @Description("Dataflow job name, to uniquely identify active jobs. "
+ + "Defaults to using the ApplicationName-UserName-Date.")
+ @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+ String getJobName();
+ void setJobName(String value);
+
+ /**
+ * The url of the Flink JobManager on which to execute pipelines. This can either be
+ * the address of a cluster JobManager, in the form "host:port" or one of the special
+ * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+ * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+ * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+ */
+ @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
+ " either be of the form \"host:port\" or one of the special values [local], " +
+ "[collection] or [auto].")
+ String getFlinkMaster();
+ void setFlinkMaster(String value);
+
+ @Description("The degree of parallelism to be used when distributing operations onto workers.")
+ @Default.Integer(-1)
+ Integer getParallelism();
+ void setParallelism(Integer value);
+
+ @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " +
+ "fault tolerance).")
+ @Default.Long(-1L)
+ Long getCheckpointingInterval();
+ void setCheckpointingInterval(Long interval);
+
+ @Description("Sets the number of times that failed tasks are re-executed. " +
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates " +
+ "that the system default value (as defined in the configuration) should be used.")
+ @Default.Integer(-1)
+ Integer getNumberOfExecutionRetries();
+ void setNumberOfExecutionRetries(Integer retries);
+
+ @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.")
+ @Default.Long(-1L)
+ Long getExecutionRetryDelay();
+ void setExecutionRetryDelay(Long delay);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
new file mode 100644
index 0000000..fe773d9
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ * <p>
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
+ */
+public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
+
+ /**
+ * Provided options.
+ */
+ private final FlinkPipelineOptions options;
+
+ private final FlinkPipelineExecutionEnvironment flinkJobEnv;
+
+ /**
+ * Construct a runner from the provided options.
+ *
+ * @param options Properties which configure the runner.
+ * @return The newly created runner.
+ */
+ public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
+ FlinkPipelineOptions flinkOptions =
+ PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+ ArrayList<String> missing = new ArrayList<>();
+
+ if (flinkOptions.getAppName() == null) {
+ missing.add("appName");
+ }
+ if (missing.size() > 0) {
+ throw new IllegalArgumentException(
+ "Missing required values: " + Joiner.on(',').join(missing));
+ }
+
+ if (flinkOptions.getFilesToStage() == null) {
+ flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+ DataflowPipelineRunner.class.getClassLoader()));
+ LOG.info("PipelineOptions.filesToStage was not specified. "
+ + "Defaulting to files from the classpath: will stage {} files. "
+ + "Enable logging at DEBUG level to see which files will be staged.",
+ flinkOptions.getFilesToStage().size());
+ LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+ }
+
+ // Verify jobName according to service requirements.
+ String jobName = flinkOptions.getJobName().toLowerCase();
+ Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " +
+ "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " +
+ "and ending with a letter " + "or number");
+ Preconditions.checkArgument(jobName.length() <= 40,
+ "JobName too long; must be no more than 40 characters in length");
+
+ // Set Flink Master to [auto] if no option was specified.
+ if (flinkOptions.getFlinkMaster() == null) {
+ flinkOptions.setFlinkMaster("[auto]");
+ }
+
+ return new FlinkPipelineRunner(flinkOptions);
+ }
+
+ private FlinkPipelineRunner(FlinkPipelineOptions options) {
+ this.options = options;
+ this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
+ }
+
+ @Override
+ public FlinkRunnerResult run(Pipeline pipeline) {
+ LOG.info("Executing pipeline using FlinkPipelineRunner.");
+
+ LOG.info("Translating pipeline to Flink program.");
+
+ this.flinkJobEnv.translate(pipeline);
+
+ LOG.info("Starting execution of Flink program.");
+
+ JobExecutionResult result;
+ try {
+ result = this.flinkJobEnv.executePipeline();
+ } catch (Exception e) {
+ LOG.error("Pipeline execution failed", e);
+ throw new RuntimeException("Pipeline execution failed", e);
+ }
+
+ LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+ Map<String, Object> accumulators = result.getAllAccumulatorResults();
+ if (accumulators != null && !accumulators.isEmpty()) {
+ LOG.info("Final aggregator values:");
+
+ for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+ LOG.info("{} : {}", entry.getKey(), entry.getValue());
+ }
+ }
+
+ return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+ }
+
+ /**
+ * For testing.
+ */
+ public FlinkPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Constructs a runner with default properties for testing.
+ *
+ * @return The newly created runner.
+ */
+ public static FlinkPipelineRunner createForTest(boolean streaming) {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ // we use [auto] for testing since this will make it pick up the Testing
+ // ExecutionEnvironment
+ options.setFlinkMaster("[auto]");
+ options.setStreaming(streaming);
+ return new FlinkPipelineRunner(options);
+ }
+
+ @Override
+ public <Output extends POutput, Input extends PInput> Output apply(
+ PTransform<Input, Output> transform, Input input) {
+ return super.apply(transform, input);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public String toString() {
+ return "DataflowPipelineRunner#" + hashCode();
+ }
+
+ /**
+ * Attempts to detect all the resources the class loader has access to. This does not recurse
+ * to class loader parents, which stops it from pulling in resources from the system class loader.
+ *
+ * @param classLoader The URLClassLoader to use to detect resources to stage.
+ * @return A list of absolute paths to the resources the class loader uses.
+ * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+ * of the resources the class loader exposes is not a file resource.
+ */
+ protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
+ if (!(classLoader instanceof URLClassLoader)) {
+ String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
+ LOG.error(message);
+ throw new IllegalArgumentException(message);
+ }
+
+ List<String> files = new ArrayList<>();
+ for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+ try {
+ files.add(new File(url.toURI()).getAbsolutePath());
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ String message = String.format("Unable to convert url (%s) to file.", url);
+ LOG.error(message);
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+ return files;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
new file mode 100644
index 0000000..8fd08ec
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
+import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This
+ * has methods to query the job runtime and the final values of
+ * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s.
+ */
+public class FlinkRunnerResult implements PipelineResult {
+
+ private final Map<String, Object> aggregators;
+
+ private final long runtime;
+
+ public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+ this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
+ Collections.<String, Object>emptyMap() :
+ Collections.unmodifiableMap(aggregators);
+
+ this.runtime = runtime;
+ }
+
+ @Override
+ public State getState() {
+ return null;
+ }
+
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+ // TODO provide a list of all accumulator step values
+ Object value = aggregators.get(aggregator.getName());
+ if (value != null) {
+ return new AggregatorValues<T>() {
+ @Override
+ public Map<String, T> getValuesAtSteps() {
+ return (Map<String, T>) aggregators;
+ }
+ };
+ } else {
+ throw new AggregatorRetrievalException("Accumulator results not found.",
+ new RuntimeException("Accumulator does not exist."));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
new file mode 100644
index 0000000..71e3b54
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}
+ * to standard output.
+ *
+ * This is Flink-specific and will only work when executed using the
+ * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
+ */
+public class ConsoleIO {
+
+ /**
+ * A PTransform that writes a PCollection to standard output.
+ */
+ public static class Write {
+
+ /**
+ * Returns a ConsoleIO.Write PTransform with a default step name.
+ */
+ public static Bound create() {
+ return new Bound();
+ }
+
+ /**
+ * Returns a ConsoleIO.Write PTransform with the given step name.
+ */
+ public static Bound named(String name) {
+ return new Bound().named(name);
+ }
+
+ /**
+ * A PTransform that writes a bounded PCollection to standard output.
+ */
+ public static class Bound extends PTransform<PCollection<?>, PDone> {
+ private static final long serialVersionUID = 0;
+
+ Bound() {
+ super("ConsoleIO.Write");
+ }
+
+ Bound(String name) {
+ super(name);
+ }
+
+ /**
+ * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
+ * step name.
+ * Does not modify this object.
+ */
+ public Bound named(String name) {
+ return new Bound(name);
+ }
+
+ @Override
+ public PDone apply(PCollection<?> input) {
+ return PDone.in(input.getPipeline());
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
new file mode 100644
index 0000000..28a10b7
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
+ */
+public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
+ /**
+ * The necessary context in the case of a batch job.
+ */
+ private final FlinkBatchTranslationContext batchContext;
+
+ private int depth = 0;
+
+ /**
+ * Composite transform that we want to translate before proceeding with other transforms.
+ */
+ private PTransform<?, ?> currentCompositeTransform;
+
+ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
+ this.batchContext = new FlinkBatchTranslationContext(env, options);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void enterCompositeTransform(TransformTreeNode node) {
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == null) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ currentCompositeTransform = transform;
+ if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
+ // we can only optimize CoGroupByKey for input size 2
+ currentCompositeTransform = null;
+ }
+ }
+ }
+ this.depth++;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == transform) {
+
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ applyBatchTransform(transform, node, translator);
+ currentCompositeTransform = null;
+ } else {
+ throw new IllegalStateException("Attempted to translate composite transform " +
+ "but no translator was found: " + currentCompositeTransform);
+ }
+ }
+ this.depth--;
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ }
+
+ @Override
+ public void visitTransform(TransformTreeNode node) {
+ LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ if (currentCompositeTransform != null) {
+ // ignore it
+ return;
+ }
+
+ // get the transformation corresponding to the node we are
+ // currently visiting and translate it into its Flink alternative.
+
+ PTransform<?, ?> transform = node.getTransform();
+ BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
+ if (translator == null) {
+ LOG.info(node.getTransform().getClass().toString());
+ throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ }
+ applyBatchTransform(transform, node, translator);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ // do nothing here
+ }
+
+ private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the batchContext
+ batchContext.setCurrentTransform(AppliedPTransform.of(
+ node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+ typedTranslator.translateNode(typedTransform, batchContext);
+ }
+
+ /**
+ * A translator of a {@link PTransform}.
+ */
+ public interface BatchTransformTranslator<Type extends PTransform> {
+ void translateNode(Type transform, FlinkBatchTranslationContext context);
+ }
+
+ private static String genSpaces(int n) {
+ String s = "";
+ for (int i = 0; i < n; i++) {
+ s += "| ";
+ }
+ return s;
+ }
+
+ private static String formatNodeName(TransformTreeNode node) {
+ return node.toString().split("@")[1] + node.getTransform();
+ }
+}