You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/10 14:26:23 UTC

[beam] 02/04: Fix pipeline triggering: use a spark action instead of writing the dataset

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3cc76b86b00861737764c3233a44868e4a255e9c
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Jan 10 11:09:28 2019 +0100

    Fix pipeline triggering: use a spark action instead of writing the dataset
---
 .../spark/structuredstreaming/translation/TranslationContext.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index acc49f4..e40bb85 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -167,7 +167,8 @@ public class TranslationContext {
         if (options.isStreaming()) {
           dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
         } else {
-          dataset.write();
+          // apply a no-op foreachPartition action just to trigger the pipeline run in Spark
+          dataset.foreachPartition(t -> {});
         }
       }
     } catch (StreamingQueryException e) {