You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:20 UTC

[beam] 10/20: Initialise BatchTranslationContext

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 53b2e71e9e93db052aa3fdc88bd3ad9afcb04274
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 12:13:21 2018 +0100

    Initialise BatchTranslationContext
---
 .../runners/spark/structuredstreaming/SparkRunner.java  |  2 +-
 .../translation/batch/BatchPipelineTranslator.java      |  7 ++++++-
 .../translation/batch/BatchTranslationContext.java      | 17 ++++++++++++++---
 .../streaming/StreamingPipelineTranslator.java          |  5 ++++-
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index ab2215b..de20133 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -98,7 +98,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     PipelineTranslator.detectTranslationMode(pipeline, options);
     PipelineTranslator.replaceTransforms(pipeline, options);
     PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
-    PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+    PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator(options) : new BatchPipelineTranslator(options);
     pipelineTranslator.translate(pipeline);
   }
   private void executePipeline(Pipeline pipeline) {}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index 2459372..1bf660f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -4,10 +4,13 @@ import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +48,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
   }
   private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
 
-
+  public BatchPipelineTranslator(SparkPipelineOptions options) {
+    translationContext = new BatchTranslationContext(options);
+  }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
   private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index 1d991f1..b53aa19 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -5,6 +5,7 @@ import java.util.Map;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SparkSession;
 
@@ -20,14 +21,24 @@ public class BatchTranslationContext {
    */
   private final Map<PValue, Dataset<?>> danglingDataSets;
 
-  private final SparkSession sparkSession;
+  private SparkSession sparkSession;
   private final SparkPipelineOptions options;
 
   private AppliedPTransform<?, ?, ?> currentTransform;
 
 
-  public BatchTranslationContext(SparkSession sparkSession, SparkPipelineOptions options) {
-    this.sparkSession = sparkSession;
+  public BatchTranslationContext(SparkPipelineOptions options) {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.setMaster(options.getSparkMaster());
+    sparkConf.setAppName(options.getAppName());
+    if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
+      sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
+    }
+
+    // Assign the field itself: declaring a local named sparkSession here would
+    // shadow the field, so the session built below would never be stored in it.
+    this.sparkSession =
+        SparkSession.builder().config(sparkConf).getOrCreate();
     this.options = options;
     this.datasets = new HashMap<>();
     this.danglingDataSets = new HashMap<>();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
index 547083c..7bed930 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
@@ -1,7 +1,10 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
 
 public class StreamingPipelineTranslator extends PipelineTranslator {
-//TODO impl
+
+  public StreamingPipelineTranslator(SparkPipelineOptions options) {
+  }
 }