Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:44 UTC
[beam] 22/50: Add basic pipeline execution. Refactor translatePipeline() to return the translationContext on which we can run startPipeline()
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 866ef13df71f5042e0fbd8e33a13ac1e0308d487
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 28 14:52:20 2018 +0100
Add basic pipeline execution.
Refactor translatePipeline() to return the translationContext on which we can run startPipeline()
---
.../spark/structuredstreaming/SparkRunner.java | 12 +++---
.../translation/PipelineTranslator.java | 4 ++
.../translation/TranslationContext.java | 50 ++++++++++++++++++----
3 files changed, 53 insertions(+), 13 deletions(-)
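
[Editor's note] Not part of the commit: a minimal, hypothetical wiring sketch showing how a user-built pipeline would reach the new execution path. It assumes the usual Beam options plumbing (setSparkMaster as the setter counterpart of the getSparkMaster() used in TranslationContext below); whether a given transform already translates on this branch is a separate question, the sketch only shows how SparkRunner.run() is reached.

import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class StructuredStreamingRunnerSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);   // the runner patched below
    options.setSparkMaster("local[2]");     // assumed setter matching getSparkMaster()
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(Create.of(1, 2, 3));     // placeholder transform
    // Pipeline.run() dispatches to SparkRunner.run(pipeline): translate, then start.
    pipeline.run();
  }
}
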
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index e3fd6b4..8e0cf25 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming;
import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
+import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.StreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
@@ -53,6 +54,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
/** Options used in this pipeline runner. */
private final SparkPipelineOptions options;
+ private TranslationContext translationContext;
+
/**
* Creates and returns a new SparkRunner with default options. In particular, against a spark
* instance running in local mode.
@@ -109,13 +112,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@Override
public SparkPipelineResult run(final Pipeline pipeline) {
- translatePipeline(pipeline);
+ translationContext = translatePipeline(pipeline);
//TODO initialise other services: checkpointing, metrics system, listeners, ...
- executePipeline(pipeline);
+ translationContext.startPipeline();
return new SparkPipelineResult();
}
- private void translatePipeline(Pipeline pipeline) {
+ private TranslationContext translatePipeline(Pipeline pipeline) {
PipelineTranslator.detectTranslationMode(pipeline, options);
PipelineTranslator.replaceTransforms(pipeline, options);
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
@@ -124,7 +127,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
? new StreamingPipelineTranslator(options)
: new PipelineTranslatorBatch(options);
pipelineTranslator.translate(pipeline);
+ return pipelineTranslator.getTranslationContext();
}
-
- private void executePipeline(Pipeline pipeline) {}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
index d64b8b1..e0924e3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java
@@ -210,4 +210,8 @@ public abstract class PipelineTranslator extends Pipeline.PipelineVisitor.Defaul
}
applyTransformTranslator(node, transformTranslator);
}
+
+ public TranslationContext getTranslationContext() {
+ return translationContext;
+ }
}
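
[Editor's note] The new getter is what keeps translation and execution as two separate steps. As a rough illustration (a hypothetical helper, not code from this commit; class and method names are made up, the calls themselves match the diff), the flow that SparkRunner.run() now follows can be written as:

import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.sdk.Pipeline;

class TranslateThenRunSketch {
  static void translateThenRun(Pipeline pipeline, SparkPipelineOptions options) {
    PipelineTranslator translator = new PipelineTranslatorBatch(options);
    translator.translate(pipeline);  // visitor pass: fills the TranslationContext
    TranslationContext context = translator.getTranslationContext();
    // Execution is now an explicit, separate step; skipping this call leaves the
    // pipeline translated but not run.
    context.startPipeline();
  }
}
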
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index aa831ed..71ae276 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -19,14 +19,18 @@ package org.apache.beam.runners.spark.structuredstreaming.translation;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQueryException;
/**
* Base class that gives a context for {@link PTransform} translation: keeping track of the
@@ -34,20 +38,16 @@ import org.apache.spark.sql.SparkSession;
*/
public class TranslationContext {
+ private final Map<PValue, Dataset<?>> datasets;
+ private final Set<Dataset<?>> leaves;
+ private final SparkPipelineOptions options;
+
@SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
private AppliedPTransform<?, ?, ?> currentTransform;
- private final Map<PValue, Dataset<?>> datasets;
-
@SuppressFBWarnings("URF_UNREAD_FIELD") // make findbug happy
private SparkSession sparkSession;
- private final SparkPipelineOptions options;
-
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
- this.currentTransform = currentTransform;
- }
-
public TranslationContext(SparkPipelineOptions options) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster(options.getSparkMaster());
@@ -59,5 +59,39 @@ public class TranslationContext {
this.sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
this.options = options;
this.datasets = new HashMap<>();
+ this.leaves = new LinkedHashSet<>();
+ }
+
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ public void startPipeline() {
+ try {
+ // to start the pipeline we need to start a Spark DataStreamWriter for each leaf dataset
+ for (Dataset<?> dataset : leaves) {
+ dataset.writeStream().foreach(new NoOpForeachWriter<>()).start().awaitTermination();
+ }
+ } catch (StreamingQueryException e) {
+ throw new RuntimeException("Pipeline execution failed", e);
+ }
+ }
+
+ private static class NoOpForeachWriter<T> extends ForeachWriter<T> {
+
+ @Override
+ public boolean open(long partitionId, long epochId) {
+ return false;
+ }
+
+ @Override
+ public void process(T value) {
+ // do nothing
+ }
+
+ @Override
+ public void close(Throwable errorOrNull) {
+ // do nothing
+ }
}
}
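
[Editor's note] startPipeline() leans on the standard Spark Structured Streaming pattern: attach a foreach sink with writeStream().foreach(...), start the query, and block on awaitTermination(). A self-contained sketch of that pattern, independent of Beam (the built-in "rate" source and all names here are illustrative, not part of the commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class ForeachSinkSketch {
  public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark =
        SparkSession.builder().master("local[2]").appName("foreach-sink-sketch").getOrCreate();
    // The "rate" source stands in for a leaf Dataset produced by translation.
    Dataset<Row> rows = spark.readStream().format("rate").option("rowsPerSecond", 1).load();
    rows.writeStream()
        .foreach(
            new ForeachWriter<Row>() {
              @Override
              public boolean open(long partitionId, long epochId) {
                // accept the partition; the commit's NoOpForeachWriter returns false to skip it
                return true;
              }

              @Override
              public void process(Row value) {
                System.out.println(value);
              }

              @Override
              public void close(Throwable errorOrNull) {
                // nothing to release
              }
            })
        .start()             // returns a StreamingQuery
        .awaitTermination(); // blocks, mirroring startPipeline()'s per-leaf loop
  }
}
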