You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:08 UTC
[13/50] [abbrv] incubator-beam git commit: [tests] add streaming mode
to TestPipeline
[tests] add streaming mode to TestPipeline
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0be42cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0be42cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0be42cbf
Branch: refs/heads/master
Commit: 0be42cbf39ca0740e7f8e5f8faf38aa9126e8cf6
Parents: f0cb5f0
Author: Max <ma...@posteo.de>
Authored: Mon Feb 22 16:31:23 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../flink/dataflow/FlinkPipelineRunner.java | 3 +-
.../flink/dataflow/FlinkTestPipeline.java | 33 ++++++++++++++++++--
2 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index f57fed2..ebd2691 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -150,11 +150,12 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
*
* @return The newly created runner.
*/
- public static FlinkPipelineRunner createForTest() {
+ public static FlinkPipelineRunner createForTest(boolean streaming) {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// we use [auto] for testing since this will make it pick up the Testing
// ExecutionEnvironment
options.setFlinkMaster("[auto]");
+ options.setStreaming(streaming);
return new FlinkPipelineRunner(options);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index a1f66c7..109b1ff 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -27,14 +27,41 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
public class FlinkTestPipeline extends Pipeline {
/**
- * Creates and returns a new test pipeline.
+ * Creates and returns a new test pipeline for batch execution.
*
* <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
* {@link Pipeline#run} to execute the pipeline and check the tests.
*/
public static FlinkTestPipeline create() {
- FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest();
- return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+ return create(false);
+ }
+
+ /**
+ * Creates and returns a new test pipeline for streaming execution.
+ *
+ * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @return The Test Pipeline
+ */
+ public static FlinkTestPipeline createStreaming() {
+ return create(true);
+ }
+
+ /**
+ * Creates and returns a new test pipeline for streaming or batch execution.
+ *
+ * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+ * {@link Pipeline#run} to execute the pipeline and check the tests.
+ *
+ * @param streaming True for streaming mode, False for batch
+ * @return The Test Pipeline
+ */
+ public static FlinkTestPipeline create(boolean streaming) {
+ FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+ FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
+ pipelineOptions.setStreaming(streaming);
+ return new FlinkTestPipeline(flinkRunner, pipelineOptions);
}
private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions