You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:20 UTC
[beam] 10/20: Initialise BatchTranslationContext
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 53b2e71e9e93db052aa3fdc88bd3ad9afcb04274
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed Nov 21 12:13:21 2018 +0100
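Initialise BatchTranslationContext
BatchTranslationContext now builds its own SparkSession from the SparkPipelineOptions (Spark master, application name and, when present, the files to stage) instead of receiving one as a constructor argument. SparkRunner passes the options to the BatchPipelineTranslator and StreamingPipelineTranslator constructors.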
---
.../runners/spark/structuredstreaming/SparkRunner.java | 2 +-
.../translation/batch/BatchPipelineTranslator.java | 7 ++++++-
.../translation/batch/BatchTranslationContext.java | 17 ++++++++++++++---
.../streaming/StreamingPipelineTranslator.java | 5 ++++-
4 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
index ab2215b..de20133 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkRunner.java
@@ -98,7 +98,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
PipelineTranslator.detectTranslationMode(pipeline, options);
PipelineTranslator.replaceTransforms(pipeline, options);
PipelineTranslator.prepareFilesToStageForRemoteClusterExecution(options);
- PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator() : new BatchPipelineTranslator();
+ PipelineTranslator pipelineTranslator = options.isStreaming() ? new StreamingPipelineTranslator(options) : new BatchPipelineTranslator(options);
pipelineTranslator.translate(pipeline);
}
private void executePipeline(Pipeline pipeline) {}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
index 2459372..1bf660f 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchPipelineTranslator.java
@@ -4,10 +4,13 @@ import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +48,9 @@ public class BatchPipelineTranslator extends PipelineTranslator {
}
private static final Logger LOG = LoggerFactory.getLogger(BatchPipelineTranslator.class);
-
+ public BatchPipelineTranslator(SparkPipelineOptions options) {
+ translationContext = new BatchTranslationContext(options);
+ }
/** Returns a translator for the given node, if it is possible, otherwise null. */
private static BatchTransformTranslator<?> getTransformTranslator(TransformHierarchy.Node node) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index 1d991f1..b53aa19 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -5,6 +5,7 @@ import java.util.Map;
import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
@@ -20,14 +21,24 @@ public class BatchTranslationContext {
*/
private final Map<PValue, Dataset<?>> danglingDataSets;
- private final SparkSession sparkSession;
+ private SparkSession sparkSession;
private final SparkPipelineOptions options;
private AppliedPTransform<?, ?, ?> currentTransform;
- public BatchTranslationContext(SparkSession sparkSession, SparkPipelineOptions options) {
- this.sparkSession = sparkSession;
+ public BatchTranslationContext(SparkPipelineOptions options) {
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.setMaster(options.getSparkMaster());
+ sparkConf.setAppName(options.getAppName());
+ if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
+ sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
+ }
+
+ SparkSession sparkSession = SparkSession
+ .builder()
+ .config(sparkConf)
+ .getOrCreate();
this.options = options;
this.datasets = new HashMap<>();
this.danglingDataSets = new HashMap<>();
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
index 547083c..7bed930 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingPipelineTranslator.java
@@ -1,7 +1,10 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
public class StreamingPipelineTranslator extends PipelineTranslator {
-//TODO impl
+
+ public StreamingPipelineTranslator(SparkPipelineOptions options) {
+ }
}