You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/11 14:54:43 UTC

[beam] branch spark-runner_structured-streaming updated: Enable test mode

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 2acdf67  Enable test mode
2acdf67 is described below

commit 2acdf672cbbde78e4fb9cd5053b44109c8a07a0f
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Jan 11 12:06:09 2019 +0100

    Enable test mode
---
 .../beam/runners/spark/structuredstreaming/SparkRunner.java   |  2 +-
 .../structuredstreaming/translation/TranslationContext.java   | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 8e0cf25..97aa4d8 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -114,7 +114,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   public SparkPipelineResult run(final Pipeline pipeline) {
     translationContext = translatePipeline(pipeline);
     //TODO initialise other services: checkpointing, metrics system, listeners, ...
-    translationContext.startPipeline();
+    translationContext.startPipeline(true);
     return new SparkPipelineResult();
   }
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index e40bb85..9a3330a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -159,7 +159,7 @@ public class TranslationContext {
   //  Pipeline methods
   // --------------------------------------------------------------------------------------------
 
-  public void startPipeline() {
+  public void startPipeline(boolean testMode) {
     try {
       // to start a pipeline we need a DatastreamWriter to start
       for (Dataset<?> dataset : leaves) {
@@ -167,8 +167,13 @@ public class TranslationContext {
         if (options.isStreaming()) {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
-          // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
-          dataset.foreachPartition(t -> {});
+          if (testMode){
+            dataset.show();
+          } else {
+            // apply a dummy fn just to apply forech action that will trigger the pipeline run in spark
+            dataset.foreachPartition(t -> {
+            });
+          }
         }
       }
     } catch (StreamingQueryException e) {