You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:27 UTC
[beam] 05/50: Add global pipeline translation structure
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit abf4b46a8a547a343b240af8ad895f0ec6975423
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 09:36:32 2018 +0100
Add global pipeline translation structure
---
.../runners/spark/structuredstreaming/SparkRunner.java | 9 ++++-----
.../translation/PipelineTranslator.java | 16 ++++++++++++++--
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index 62cd7d3..59c08f7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -2,15 +2,14 @@ package org.apache.beam.runners.spark.structuredstreaming;
import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
-import org.apache.beam.runners.spark.structuredstreaming.translation.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.StreamingPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,10 +98,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
PipelineTranslator.detectTranslationMode(pipeline, options);
PipelineTranslator.replaceTransforms(pipeline, options);
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
- PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator()
+ PipelineTranslator translator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
//init translator with subclass based on mode and env
translator.translate(pipeline);
}
- public void executePipeline(Pipeline pipeline) {}
+ private void executePipeline(Pipeline pipeline) {}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index f0ce1e5..99621f6 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -3,6 +3,8 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.spark.SparkTransformOverrides;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.BatchPipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PCollection;
@@ -11,7 +13,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Does all the translation work: mode detection, nodes translation.
+ /**
+ * The role of this class is to detect the pipeline mode and to translate the Beam operators to their Spark counterparts. If we have
+ * a streaming job, this is instantiated as a {@link StreamingPipelineTranslator}. Otherwise,
+ * i.e. for a batch job, a {@link BatchPipelineTranslator} is created.
*/
public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
@@ -41,11 +46,18 @@ public class PipelineTranslator extends Pipeline.PipelineVisitor.Defaults{
TranslationModeDetector detector = new TranslationModeDetector();
pipeline.traverseTopologically(detector);
if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
- // set streaming mode if it's a streaming pipeline
options.setStreaming(true);
}
}
+ /**
+ * Translates the pipeline by passing this class as a visitor.
+ *
+ * @param pipeline The pipeline to be translated
+ */
+ public void translate(Pipeline pipeline) {
+ pipeline.traverseTopologically(this);
+ }