You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 14:54:43 UTC
[beam] branch spark-runner_structured-streaming updated: Enable
test mode
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 2acdf67 Enable test mode
2acdf67 is described below
commit 2acdf672cbbde78e4fb9cd5053b44109c8a07a0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 12:06:09 2019 +0100
Enable test mode
---
.../beam/runners/spark/structuredstreaming/SparkRunner.java | 2 +-
.../structuredstreaming/translation/TranslationContext.java | 11 ++++++++---
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 8e0cf25..97aa4d8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -114,7 +114,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
public SparkPipelineResult run(final Pipeline pipeline) {
translationContext = translatePipeline(pipeline);
//TODO initialise other services: checkpointing, metrics system, listeners, ...
- translationContext.startPipeline();
+ translationContext.startPipeline(true);
return new SparkPipelineResult();
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e40bb85..9a3330a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -159,7 +159,7 @@ public class TranslationContext {
// Pipeline methods
// --------------------------------------------------------------------------------------------
- public void startPipeline() {
+ public void startPipeline(boolean testMode) {
try {
// to start a pipeline we need a DatastreamWriter to start
for (Dataset<?> dataset : leaves) {
@@ -167,8 +167,13 @@ public class TranslationContext {
if (options.isStreaming()) {
dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
} else {
- // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
- dataset.foreachPartition(t -> {});
+ if (testMode){
+ dataset.show();
+ } else {
+ // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
+ dataset.foreachPartition(t -> {
+ });
+ }
}
}
} catch (StreamingQueryException e) {