You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:40 UTC

[47/50] [abbrv] incubator-beam git commit: Rename FlinkPipelineRunner to FlinkRunner

Rename FlinkPipelineRunner to FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b417680
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b417680
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b417680

Branch: refs/heads/python-sdk
Commit: 4b417680d09da9f9f1c990a4f235572689efdf93
Parents: d285e67
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 09:57:25 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:31 2016 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +-
 runners/flink/README.md                         |   4 +-
 .../beam/runners/flink/examples/TFIDF.java      |   4 +-
 .../beam/runners/flink/examples/WordCount.java  |   4 +-
 .../flink/examples/streaming/AutoComplete.java  |   4 +-
 .../flink/examples/streaming/JoinExamples.java  |   4 +-
 .../examples/streaming/KafkaIOExamples.java     |   4 +-
 .../KafkaWindowedWordCountExample.java          |   4 +-
 .../examples/streaming/WindowedWordCount.java   |   4 +-
 runners/flink/runner/pom.xml                    |   4 +-
 .../beam/runners/flink/FlinkPipelineRunner.java | 180 -------------------
 .../apache/beam/runners/flink/FlinkRunner.java  | 179 ++++++++++++++++++
 .../runners/flink/FlinkRunnerRegistrar.java     |   4 +-
 .../runners/flink/TestFlinkPipelineRunner.java  |  81 ---------
 .../beam/runners/flink/TestFlinkRunner.java     |  81 +++++++++
 .../streaming/io/UnboundedFlinkSource.java      |   6 +-
 .../streaming/state/FlinkStateInternals.java    |   2 +-
 .../runners/flink/FlinkRunnerRegistrarTest.java |   8 +-
 .../beam/runners/flink/FlinkTestPipeline.java   |   4 +-
 19 files changed, 291 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index b34d933..c4a9155 100644
--- a/README.md
+++ b/README.md
@@ -68,7 +68,7 @@ Beam supports executing programs on multiple distributed processing backends thr
 
 - The `DirectRunner` runs the pipeline on your local machine.
 - The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
-- The `FlinkPipelineRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
+- The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
 - The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
 
 Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/README.md
----------------------------------------------------------------------
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 69e2abb..3348119 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -107,14 +107,14 @@ Flink-Runner is now installed in your local maven repository.
 
 Next, let's run the classic WordCount example. It's semantically identically to
 the example provided with Apache Beam. Only this time, we chose the
-`FlinkPipelineRunner` to execute the WordCount on top of Flink.
+`FlinkRunner` to execute the WordCount on top of Flink.
 
 Here's an excerpt from the WordCount class file:
 
 ```java
 Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
 // yes, we want to run WordCount with Flink
-options.setRunner(FlinkPipelineRunner.class);
+options.setRunner(FlinkRunner.class);
 
 Pipeline p = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 876ecde..af920aa 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.examples;
 
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -439,7 +439,7 @@ public class TFIDF {
   public static void main(String[] args) throws Exception {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     Pipeline pipeline = Pipeline.create(options);
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 702fb63..2817622 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.examples;
 
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Default;
@@ -105,7 +105,7 @@ public class WordCount {
 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation()
         .as(Options.class);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     Pipeline p = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index d83e662..9299955 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.examples.streaming;
 
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -378,7 +378,7 @@ public class AutoComplete {
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);
     options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     PTransform<? super PBegin, PCollection<String>> readSource =
             Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index d3e963d..b447a20 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.examples.streaming;
 
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -133,7 +133,7 @@ public class JoinExamples {
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);
     options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     PTransform<? super PBegin, PCollection<String>> readSourceA =
         Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index af6bb35..8756abe 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.examples.streaming;
 
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.sdk.Pipeline;
@@ -289,7 +289,7 @@ public class KafkaIOExamples {
         PipelineOptionsFactory.fromArgs(args).as(KafkaOptions.class);
 
     options.setStreaming(true);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index abb9fea..b14c5ae 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.examples.streaming;
 
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -115,7 +115,7 @@ public class KafkaWindowedWordCountExample {
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);
     options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index e803e6e..f72b705 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink.examples.streaming;
 
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -111,7 +111,7 @@ public class WindowedWordCount {
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);
     options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkPipelineRunner.class);
+    options.setRunner(FlinkRunner.class);
 
     LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
         " sec. and a slide of " + options.getSlide());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 33c13bf..dd32063 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -194,7 +194,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=org.apache.beam.runners.flink.TestFlinkPipelineRunner",
+                    "--runner=TestFlinkRunner",
                     "--streaming=false"
                   ]
                 </beamTestPipelineOptions>
@@ -218,7 +218,7 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=org.apache.beam.runners.flink.TestFlinkPipelineRunner",
+                    "--runner=TestFlinkRunner",
                     "--streaming=true"
                   ]
                 </beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
deleted file mode 100644
index 46a4fce..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.base.Joiner;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to a Flink Plan and then executing them either locally
- * or on a Flink cluster, depending on the configuration.
- * <p>
- */
-public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
-
-  /**
-   * Provided options.
-   */
-  private final FlinkPipelineOptions options;
-
-  /**
-   * Construct a runner from the provided options.
-   *
-   * @param options Properties which configure the runner.
-   * @return The newly created runner.
-   */
-  public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
-    FlinkPipelineOptions flinkOptions =
-        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-    ArrayList<String> missing = new ArrayList<>();
-
-    if (flinkOptions.getAppName() == null) {
-      missing.add("appName");
-    }
-    if (missing.size() > 0) {
-      throw new IllegalArgumentException(
-          "Missing required values: " + Joiner.on(',').join(missing));
-    }
-
-    if (flinkOptions.getFilesToStage() == null) {
-      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
-          FlinkPipelineRunner.class.getClassLoader()));
-      LOG.info("PipelineOptions.filesToStage was not specified. "
-              + "Defaulting to files from the classpath: will stage {} files. "
-              + "Enable logging at DEBUG level to see which files will be staged.",
-          flinkOptions.getFilesToStage().size());
-      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
-    }
-
-    // Set Flink Master to [auto] if no option was specified.
-    if (flinkOptions.getFlinkMaster() == null) {
-      flinkOptions.setFlinkMaster("[auto]");
-    }
-
-    return new FlinkPipelineRunner(flinkOptions);
-  }
-
-  private FlinkPipelineRunner(FlinkPipelineOptions options) {
-    this.options = options;
-  }
-
-  @Override
-  public FlinkRunnerResult run(Pipeline pipeline) {
-    LOG.info("Executing pipeline using FlinkPipelineRunner.");
-
-    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
-
-    LOG.info("Translating pipeline to Flink program.");
-    env.translate(pipeline);
-
-    JobExecutionResult result;
-    try {
-      LOG.info("Starting execution of Flink program.");
-      result = env.executePipeline();
-    } catch (Exception e) {
-      LOG.error("Pipeline execution failed", e);
-      throw new RuntimeException("Pipeline execution failed", e);
-    }
-
-    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
-    Map<String, Object> accumulators = result.getAllAccumulatorResults();
-    if (accumulators != null && !accumulators.isEmpty()) {
-      LOG.info("Final aggregator values:");
-
-      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
-        LOG.info("{} : {}", entry.getKey(), entry.getValue());
-      }
-    }
-
-    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
-  }
-
-  /**
-   * For testing.
-   */
-  public FlinkPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public <Output extends POutput, Input extends PInput> Output apply(
-      PTransform<Input, Output> transform, Input input) {
-    return super.apply(transform, input);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Override
-  public String toString() {
-    return "DataflowRunner#" + hashCode();
-  }
-
-  /**
-   * Attempts to detect all the resources the class loader has access to. This does not recurse
-   * to class loader parents stopping it from pulling in resources from the system class loader.
-   *
-   * @param classLoader The URLClassLoader to use to detect resources to stage.
-   * @return A list of absolute paths to the resources the class loader uses.
-   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
-   *                                  of the resources the class loader exposes is not a file resource.
-   */
-  protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
-    if (!(classLoader instanceof URLClassLoader)) {
-      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
-          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
-
-    List<String> files = new ArrayList<>();
-    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-      try {
-        files.add(new File(url.toURI()).getAbsolutePath());
-      } catch (IllegalArgumentException | URISyntaxException e) {
-        String message = String.format("Unable to convert url (%s) to file.", url);
-        LOG.error(message);
-        throw new IllegalArgumentException(message, e);
-      }
-    }
-    return files;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
new file mode 100644
index 0000000..d8c5c12
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.common.base.Joiner;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ * <p>
+ */
+public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);
+
+  /**
+   * Provided options.
+   */
+  private final FlinkPipelineOptions options;
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static FlinkRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    ArrayList<String> missing = new ArrayList<>();
+
+    if (flinkOptions.getAppName() == null) {
+      missing.add("appName");
+    }
+    if (missing.size() > 0) {
+      throw new IllegalArgumentException(
+          "Missing required values: " + Joiner.on(',').join(missing));
+    }
+
+    if (flinkOptions.getFilesToStage() == null) {
+      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          FlinkRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be staged.",
+          flinkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+    }
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    return new FlinkRunner(flinkOptions);
+  }
+
+  private FlinkRunner(FlinkPipelineOptions options) {
+    this.options = options;
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    LOG.info("Executing pipeline using FlinkRunner.");
+
+    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
+
+    LOG.info("Translating pipeline to Flink program.");
+    env.translate(pipeline);
+
+    JobExecutionResult result;
+    try {
+      LOG.info("Starting execution of Flink program.");
+      result = env.executePipeline();
+    } catch (Exception e) {
+      LOG.error("Pipeline execution failed", e);
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+
+    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+    Map<String, Object> accumulators = result.getAllAccumulatorResults();
+    if (accumulators != null && !accumulators.isEmpty()) {
+      LOG.info("Final aggregator values:");
+
+      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+  }
+
+  /**
+   * For testing.
+   */
+  public FlinkPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @Override
+  public <Output extends POutput, Input extends PInput> Output apply(
+      PTransform<Input, Output> transform, Input input) {
+    return super.apply(transform, input);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String toString() {
+    return "DataflowRunner#" + hashCode();
+  }
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This does not recurse
+   * to class loader parents stopping it from pulling in resources from the system class loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @return A list of absolute paths to the resources the class loader uses.
+   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+   *                                  of the resources the class loader exposes is not a file resource.
+   */
+  protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", url);
+        LOG.error(message);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
index ec61805..52b2a8d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -42,8 +42,8 @@ public class FlinkRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
       return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
-          FlinkPipelineRunner.class,
-          TestFlinkPipelineRunner.class);
+          FlinkRunner.class,
+          TestFlinkRunner.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
deleted file mode 100644
index a55acb7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import org.apache.flink.runtime.client.JobExecutionException;
-
-public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
-
-  private FlinkPipelineRunner delegate;
-
-  private TestFlinkPipelineRunner(FlinkPipelineOptions options) {
-    // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
-    options.setFlinkMaster("[auto]");
-    this.delegate = FlinkPipelineRunner.fromOptions(options);
-  }
-
-  public static TestFlinkPipelineRunner fromOptions(PipelineOptions options) {
-    FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
-    return new TestFlinkPipelineRunner(flinkOptions);
-  }
-
-  public static TestFlinkPipelineRunner create(boolean streaming) {
-    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    flinkOptions.setRunner(TestFlinkPipelineRunner.class);
-    flinkOptions.setStreaming(streaming);
-    return TestFlinkPipelineRunner.fromOptions(flinkOptions);
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
-    return delegate.apply(transform, input);
-  }
-
-  @Override
-  public FlinkRunnerResult run(Pipeline pipeline) {
-    try {
-      return delegate.run(pipeline);
-    } catch (RuntimeException e) {
-      // Special case hack to pull out assertion errors from PAssert; instead there should
-      // probably be a better story along the lines of UserCodeException.
-      if (e.getCause() != null
-          && e.getCause() instanceof JobExecutionException
-          && e.getCause().getCause() instanceof AssertionError) {
-          throw (AssertionError) e.getCause().getCause();
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    return delegate.getPipelineOptions();
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
new file mode 100644
index 0000000..460933f
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.flink.runtime.client.JobExecutionException;
+
+public class TestFlinkRunner extends PipelineRunner<FlinkRunnerResult> {
+
+  private FlinkRunner delegate;
+
+  private TestFlinkRunner(FlinkPipelineOptions options) {
+    // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    this.delegate = FlinkRunner.fromOptions(options);
+  }
+
+  public static TestFlinkRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    return new TestFlinkRunner(flinkOptions);
+  }
+
+  public static TestFlinkRunner create(boolean streaming) {
+    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    flinkOptions.setRunner(TestFlinkRunner.class);
+    flinkOptions.setStreaming(streaming);
+    return TestFlinkRunner.fromOptions(flinkOptions);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    try {
+      return delegate.run(pipeline);
+    } catch (RuntimeException e) {
+      // Special case hack to pull out assertion errors from PAssert; instead there should
+      // probably be a better story along the lines of UserCodeException.
+      if (e.getCause() != null
+          && e.getCause() instanceof JobExecutionException
+          && e.getCause().getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause().getCause();
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return delegate.getPipelineOptions();
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index a157b46..b636036 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -51,18 +51,18 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
 
   @Override
   public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
   }
 
   @Override
   public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
   }
 
   @Nullable
   @Override
   public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index 352c550..18d4c3c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -86,7 +86,7 @@ public class FlinkStateInternals<K> implements StateInternals<K> {
 
   /**
    * This is the interface state has to implement in order for it to be fault tolerant when
-   * executed by the FlinkPipelineRunner.
+   * executed by the FlinkRunner.
    */
   private interface CheckpointableIF {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
index bd149c7..ff1025f 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -33,17 +33,17 @@ public class FlinkRunnerRegistrarTest {
   @Test
   public void testFullName() {
     String[] args =
-        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getName())};
+        new String[] {String.format("--runner=%s", FlinkRunner.class.getName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+    assertEquals(opts.getRunner(), FlinkRunner.class);
   }
 
   @Test
   public void testClassName() {
     String[] args =
-        new String[] {String.format("--runner=%s", FlinkPipelineRunner.class.getSimpleName())};
+        new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), FlinkPipelineRunner.class);
+    assertEquals(opts.getRunner(), FlinkRunner.class);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b417680/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
index edde925..9f7bc00 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 
 /**
  * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
+ * {@link FlinkRunner}.
  */
 public class FlinkTestPipeline extends Pipeline {
 
@@ -60,7 +60,7 @@ public class FlinkTestPipeline extends Pipeline {
    * @return The Test Pipeline.
    */
   private static FlinkTestPipeline create(boolean streaming) {
-    TestFlinkPipelineRunner flinkRunner = TestFlinkPipelineRunner.create(streaming);
+    TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
     return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
   }