You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:25 UTC

[07/14] incubator-beam git commit: Add TestFlinkPipelineRunner to FlinkRunnerRegistrar

Add TestFlinkPipelineRunner to FlinkRunnerRegistrar

This makes the runner available for selection by integration tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58d66a34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58d66a34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58d66a34

Branch: refs/heads/master
Commit: 58d66a344985eecc9cc3f43c0ecd5dbc7b4fb2e6
Parents: dc98211
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 2 13:11:12 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineRunner.java | 16 +----
 .../runners/flink/FlinkRunnerRegistrar.java     |  4 +-
 .../runners/flink/TestFlinkPipelineRunner.java  | 66 ++++++++++++++++++++
 .../beam/runners/flink/FlinkTestPipeline.java   |  2 +-
 4 files changed, 71 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 3edf6f3..b5ffced 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -108,7 +108,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
     this.flinkJobEnv.translate(pipeline);
 
     LOG.info("Starting execution of Flink program.");
-    
+
     JobExecutionResult result;
     try {
       result = this.flinkJobEnv.executePipeline();
@@ -138,20 +138,6 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
     return options;
   }
 
-  /**
-   * Constructs a runner with default properties for testing.
-   *
-   * @return The newly created runner.
-   */
-  public static FlinkPipelineRunner createForTest(boolean streaming) {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    // we use [auto] for testing since this will make it pick up the Testing
-    // ExecutionEnvironment
-    options.setFlinkMaster("[auto]");
-    options.setStreaming(streaming);
-    return new FlinkPipelineRunner(options);
-  }
-
   @Override
   public <Output extends POutput, Input extends PInput> Output apply(
       PTransform<Input, Output> transform, Input input) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
index cd99f4e..ec61805 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -41,7 +41,9 @@ public class FlinkRunnerRegistrar {
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
     public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+          FlinkPipelineRunner.class,
+          TestFlinkPipelineRunner.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
new file mode 100644
index 0000000..24883c8
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+  private FlinkPipelineRunner delegate;
+
+  private TestFlinkPipelineRunner(FlinkPipelineOptions options) {
+    // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    this.delegate = FlinkPipelineRunner.fromOptions(options);
+  }
+
+  public static TestFlinkPipelineRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    return new TestFlinkPipelineRunner(flinkOptions);
+  }
+
+  public static TestFlinkPipelineRunner create(boolean streaming) {
+    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    flinkOptions.setStreaming(streaming);
+    return TestFlinkPipelineRunner.fromOptions(flinkOptions);
+  }
+
+  @Override
+  public <OutputT extends POutput, InputT extends PInput>
+      OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+    return delegate.apply(transform, input);
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    return delegate.run(pipeline);
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return delegate.getPipelineOptions();
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
index f015a66..edde925 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -60,7 +60,7 @@ public class FlinkTestPipeline extends Pipeline {
    * @return The Test Pipeline.
    */
   private static FlinkTestPipeline create(boolean streaming) {
-    FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+    TestFlinkPipelineRunner flinkRunner = TestFlinkPipelineRunner.create(streaming);
     return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
   }