Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/28 13:52:57 UTC

[beam] branch spark-runner_structured-streaming updated: Add basic pipeline execution. Refactor translatePipeline() to return the translationContext on which we can run startPipeline()

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 9061cb0  Add basic pipeline execution. Refactor translatePipeline() to return the translationContext on which we can run startPipeline()
9061cb0 is described below

commit 9061cb02182ef508606aa78bb141e7e913406aa6
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 28 14:52:20 2018 +0100

    Add basic pipeline execution.
    Refactor translatePipeline() to return the translationContext on which we can run startPipeline()
---
 .../spark/structuredstreaming/SparkRunner.java     | 12 +++---
 .../translation/PipelineTranslator.java            |  4 ++
 .../translation/TranslationContext.java            | 50 ++++++++++++++++++----
 3 files changed, 53 insertions(+), 13 deletions(-)
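
In short, this commit replaces the empty executePipeline(Pipeline) stub with real execution: translatePipeline() now returns the TranslationContext it populated, and run() starts the pipeline through that context. A minimal sketch of the new control flow, condensed from the diffs below:

    // Inside SparkRunner.run(), after this commit:
    TranslationContext context = translatePipeline(pipeline); // visitor walk builds the Spark Datasets
    context.startPipeline();                                  // starts a streaming query per leaf dataset
    return new SparkPipelineResult();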

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index e3fd6b4..8e0cf25 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming;
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
 import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
@@ -53,6 +54,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   /** Options used in this pipeline runner. */
   private final SparkPipelineOptions options;
 
+  private TranslationContext translationContext;
+
   /**
    * Creates and returns a new SparkRunner with default options. In particular, against a spark
    * instance running in local mode.
@@ -109,13 +112,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   @Override
   public SparkPipelineResult run(final Pipeline pipeline) {
-    translatePipeline(pipeline);
+    translationContext = translatePipeline(pipeline);
     //TODO initialise other services: checkpointing, metrics system, listeners, ...
-    executePipeline(pipeline);
+    translationContext.startPipeline();
     return new SparkPipelineResult();
   }
 
-  private void translatePipeline(Pipeline pipeline) {
+  private TranslationContext translatePipeline(Pipeline pipeline) {
     PipelineTranslator.detectTranslationMode(pipeline, options);
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
@@ -124,7 +127,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
             ? new StreamingPipelineTranslator(options)
             : new PipelineTranslatorBatch(options);
     pipelineTranslator.translate(pipeline);
+    return pipelineTranslator.getTranslationContext();
   }
-
-  private void executePipeline(Pipeline pipeline) {}
 }
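
Nothing changes for end users of the runner: a pipeline is still built and launched the standard Beam way, and the translate-then-start sequence runs inside run(). A minimal usage sketch, assuming only the classes imported in the diff above (the applied transforms are elided and not part of this commit):

    import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    SparkPipelineOptions options =
        PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... pipeline.apply(...) transforms here ...
    pipeline.run(); // translates the pipeline, then calls TranslationContext.startPipeline()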
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index d64b8b1..e0924e3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -210,4 +210,8 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
     }
     applyTransformTranslator(node, transformTranslator);
   }
+
+  public TranslationContext getTranslationContext() {
+    return translationContext;
+  }
 }
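
The new accessor assumes translate() fills the translator's translationContext field while visiting the transform graph (PipelineTranslator extends Pipeline.PipelineVisitor.Defaults, per the hunk header above). A hedged sketch of the interplay as SparkRunner now uses it; how each per-transform translator records its output in the context is not shown in this diff:

    PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch(options);
    pipelineTranslator.translate(pipeline);       // visitor walk populates the context
    TranslationContext context = pipelineTranslator.getTranslationContext();
    context.startPipeline();                      // see TranslationContext diff below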
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index aa831ed..71ae276 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -19,14 +19,18 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQueryException;
 
 /**
  * Base class that gives a context for {@link PTransform} translation: keeping track of the
@@ -34,20 +38,16 @@ import org.apache.spark.sql.SparkSession;
  */
 public class TranslationContext {
 
+  private final Map<PValue, Dataset<?>> datasets;
+  private final Set<Dataset<?>> leaves;
+  private final SparkPipelineOptions options;
+
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private AppliedPTransform<?, ?, ?> currentTransform;
 
-  private final Map<PValue, Dataset<?>> datasets;
-
   @SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
   private SparkSession sparkSession;
 
-  private final SparkPipelineOptions options;
-
-  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
-    this.currentTransform = currentTransform;
-  }
-
   public TranslationContext(SparkPipelineOptions options) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.setMaster(options.getSparkMaster());
@@ -59,5 +59,39 @@ public class TranslationContext {
     this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
     this.options = options;
     this.datasets = new HashMap<>();
+    this.leaves = new LinkedHashSet<>();
+  }
+
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  public void startPipeline() {
+    try {
+      // to start the pipeline we need to start a DataStreamWriter on each leaf dataset
+      for (Dataset<?> dataset : leaves) {
+        dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
+      }
+    } catch (StreamingQueryException e) {
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+  }
+
+  private static class NoOpForeachWriter<T> extends ForeachWriter<T> {
+
+    @Override
+    public boolean open(long partitionId, long epochId) {
+      return false;
+    }
+
+    @Override
+    public void process(T value) {
+      // do nothing
+    }
+
+    @Override
+    public void close(Throwable errorOrNull) {
+      // do nothing
+    }
   }
 }
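
For context: a structured streaming query is lazy until a sink is attached and start() is called, so the NoOpForeachWriter exists purely to give every leaf Dataset a sink that forces evaluation (its open() returns false, so rows are never handed to process()). Note also that awaitTermination() inside the loop blocks on the first leaf's query before the next one starts. A standalone sketch of the same writeStream().foreach(...).start().awaitTermination() pattern; the local master and the built-in "rate" source are demo assumptions, not from this commit:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.ForeachWriter;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ForeachSinkDemo {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")             // demo assumption: local Spark
            .appName("foreach-sink-demo")
            .getOrCreate();

        // Built-in "rate" source: emits (timestamp, value) rows on a schedule.
        Dataset<Row> stream = spark.readStream().format("rate").load();

        // Attaching a foreach sink and calling start() is what makes Spark
        // evaluate the plan; startPipeline() above does the same per leaf.
        stream.writeStream()
            .foreach(new ForeachWriter<Row>() {
              @Override public boolean open(long partitionId, long epochId) {
                return true; // accept the partition (the commit's no-op writer returns false)
              }
              @Override public void process(Row value) {
                System.out.println(value); // demo side effect
              }
              @Override public void close(Throwable errorOrNull) {
                // nothing to release in this demo
              }
            })
            .start()
            .awaitTermination();
      }
    }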