You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:27 UTC

[beam] 05/50: Add global pipeline translation structure

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit abf4b46a8a547a343b240af8ad895f0ec6975423
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 09:36:32 2018 +0100

    Add global pipeline translation structure
---
 .../runners/spark/structuredstreaming/SparkRunner.java   |  9 ++++-----
 .../translation/PipelineTranslator.java                  | 16 ++++++++++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 62cd7d3..59c08f7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -2,15 +2,14 @@ package org.apache.beam.runners.spark.structuredstreaming;
 
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
-import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,10 +98,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.detectTranslationMode(pipeline, options);
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
-    PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator()
+    PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
     //init translator with subclass based on mode and env
     translator.translate(pipeline);
   }
-  public void executePipeline(Pipeline pipeline) {}
+  private void executePipeline(Pipeline pipeline) {}
 
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index f0ce1e5..99621f6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -3,6 +3,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
 import org.apache.beam.runners.core.construction.PipelineResources;
 import org.apache.beam.runners.spark.SparkTransformOverrides;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.values.PCollection;
@@ -11,7 +13,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Does all the translation work: mode detection, nodes translation.
+ * The role of this class is to detect the pipeline mode and to translate the Beam operators to
+ * their Spark counterparts. If we have a streaming job, this is instantiated as a
+ * {@link StreamingPipelineTranslator}. Otherwise, i.e. for a batch job, a
+ * {@link BatchPipelineTranslator} is created.
  */
 
 public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
@@ -41,11 +46,18 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
     TranslationModeDetector detector = new TranslationModeDetector();
     pipeline.traverseTopologically(detector);
     if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
-      // set streaming mode if it's a streaming pipeline
       options.setStreaming(true);
     }
   }
 
+  /**
+   * Translates the pipeline by traversing it topologically with this translator as the visitor.
+   *
+   * @param pipeline The pipeline to be translated
+   */
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }