You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:08 UTC

[13/50] [abbrv] incubator-beam git commit: [tests] add streaming mode to TestPipeline

[tests] add streaming mode to TestPipeline


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0be42cbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0be42cbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0be42cbf

Branch: refs/heads/master
Commit: 0be42cbf39ca0740e7f8e5f8faf38aa9126e8cf6
Parents: f0cb5f0
Author: Max <ma...@posteo.de>
Authored: Mon Feb 22 16:31:23 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../flink/dataflow/FlinkPipelineRunner.java     |  3 +-
 .../flink/dataflow/FlinkTestPipeline.java       | 33 ++++++++++++++++++--
 2 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index f57fed2..ebd2691 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -150,11 +150,12 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 	 *
 	 * @return The newly created runner.
 	 */
-	public static FlinkPipelineRunner createForTest() {
+	public static FlinkPipelineRunner createForTest(boolean streaming) {
 		FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
 		// we use [auto] for testing since this will make it pick up the Testing
 		// ExecutionEnvironment
 		options.setFlinkMaster("[auto]");
+		options.setStreaming(streaming);
 		return new FlinkPipelineRunner(options);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0be42cbf/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index a1f66c7..109b1ff 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -27,14 +27,41 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 public class FlinkTestPipeline extends Pipeline {
 
 	/**
-	 * Creates and returns a new test pipeline.
+	 * Creates and returns a new test pipeline for batch execution.
 	 *
 	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
 	 * {@link Pipeline#run} to execute the pipeline and check the tests.
 	 */
 	public static FlinkTestPipeline create() {
-		FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest();
-		return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+		return create(false);
+	}
+
+	/**
+	 * Creates and returns a new test pipeline for streaming execution.
+	 *
+	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+	 * {@link Pipeline#run} to execute the pipeline and check the tests.
+	 *
+	 * @return the newly created test pipeline, configured for streaming execution
+	 */
+	public static FlinkTestPipeline createStreaming() {
+		return create(true);
+	}
+
+	/**
+	 * Creates and returns a new test pipeline for streaming or batch execution.
+	 *
+	 * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call
+	 * {@link Pipeline#run} to execute the pipeline and check the tests.
+	 *
+	 * @param streaming {@code true} for streaming mode, {@code false} for batch mode
+	 * @return the newly created test pipeline
+	 */
+	public static FlinkTestPipeline create(boolean streaming) {
+		FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming);
+		FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions();
+		pipelineOptions.setStreaming(streaming);
+		return new FlinkTestPipeline(flinkRunner, pipelineOptions);
 	}
 
 	private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions