You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/05/20 07:15:25 UTC
[07/14] incubator-beam git commit: Add TestFlinkPipelineRunner to
FlinkRunnerRegistrar
Add TestFlinkPipelineRunner to FlinkRunnerRegistrar
This makes the runner available for selection by integration tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58d66a34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58d66a34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58d66a34
Branch: refs/heads/master
Commit: 58d66a344985eecc9cc3f43c0ecd5dbc7b4fb2e6
Parents: dc98211
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 2 13:11:12 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200
----------------------------------------------------------------------
.../beam/runners/flink/FlinkPipelineRunner.java | 16 +----
.../runners/flink/FlinkRunnerRegistrar.java | 4 +-
.../runners/flink/TestFlinkPipelineRunner.java | 66 ++++++++++++++++++++
.../beam/runners/flink/FlinkTestPipeline.java | 2 +-
4 files changed, 71 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 3edf6f3..b5ffced 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -108,7 +108,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
this.flinkJobEnv.translate(pipeline);
LOG.info("Starting execution of Flink program.");
-
+
JobExecutionResult result;
try {
result = this.flinkJobEnv.executePipeline();
@@ -138,20 +138,6 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
return options;
}
- /**
- * Constructs a runner with default properties for testing.
- *
- * @return The newly created runner.
- */
- public static FlinkPipelineRunner createForTest(boolean streaming) {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- // we use [auto] for testing since this will make it pick up the Testing
- // ExecutionEnvironment
- options.setFlinkMaster("[auto]");
- options.setStreaming(streaming);
- return new FlinkPipelineRunner(options);
- }
-
@Override
public <Output extends POutput, Input extends PInput> Output apply(
PTransform<Input, Output> transform, Input input) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
index cd99f4e..ec61805 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java
@@ -41,7 +41,9 @@ public class FlinkRunnerRegistrar {
public static class Runner implements PipelineRunnerRegistrar {
@Override
public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkPipelineRunner.class);
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+ FlinkPipelineRunner.class,
+ TestFlinkPipelineRunner.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
new file mode 100644
index 0000000..24883c8
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkPipelineRunner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+public class TestFlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+ private FlinkPipelineRunner delegate;
+
+ private TestFlinkPipelineRunner(FlinkPipelineOptions options) {
+ // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
+ options.setFlinkMaster("[auto]");
+ this.delegate = FlinkPipelineRunner.fromOptions(options);
+ }
+
+ public static TestFlinkPipelineRunner fromOptions(PipelineOptions options) {
+ FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+ return new TestFlinkPipelineRunner(flinkOptions);
+ }
+
+ public static TestFlinkPipelineRunner create(boolean streaming) {
+ FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ flinkOptions.setStreaming(streaming);
+ return TestFlinkPipelineRunner.fromOptions(flinkOptions);
+ }
+
+ @Override
+ public <OutputT extends POutput, InputT extends PInput>
+ OutputT apply(PTransform<InputT,OutputT> transform, InputT input) {
+ return delegate.apply(transform, input);
+ }
+
+ @Override
+ public FlinkRunnerResult run(Pipeline pipeline) {
+ return delegate.run(pipeline);
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return delegate.getPipelineOptions();
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58d66a34/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
index f015a66..edde925 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -60,7 +60,7 @@ public class FlinkTestPipeline extends Pipeline {
* @return The Test Pipeline.
*/
private static FlinkTestPipeline create(boolean streaming) {
- FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+ TestFlinkPipelineRunner flinkRunner = TestFlinkPipelineRunner.create(streaming);
return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
}