You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2018/11/27 15:39:25 UTC

[beam] 15/20: Move common translation context components to superclass

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 41742be0d1d546db174537df22c365c6d5b9c758
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 22 12:01:28 2018 +0100

    Move common translation context components to superclass
---
 .../translation/TranslationContext.java            | 29 +++++++++++++++++++++-
 .../translation/batch/BatchTranslationContext.java | 20 ++-------------
 .../streaming/StreamingTranslationContext.java     |  6 ++++-
 3 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 3dacde4..e651e70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -1,17 +1,44 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
 
 /**
- * Base class that gives a context for {@link PTransform} translation.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the datasets,
+ * the {@link SparkSession}, and the current transform being translated.
  */
 public class TranslationContext {
 
   private AppliedPTransform<?, ?, ?> currentTransform;
+  private final Map<PValue, Dataset<?>> datasets;
+  private SparkSession sparkSession;
+  private final SparkPipelineOptions options;
 
   public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
     this.currentTransform = currentTransform;
   }
 
+  public TranslationContext(SparkPipelineOptions options) {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.setMaster(options.getSparkMaster());
+    sparkConf.setAppName(options.getAppName());
+    if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
+      sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
+    }
+
+    this.sparkSession = SparkSession
+        .builder()
+        .config(sparkConf)
+        .getOrCreate();
+    this.options = options;
+    this.datasets = new HashMap<>();
+  }
+
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index f08e33c..02aad71 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -11,10 +11,9 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SparkSession;
 
 /**
- * Keeps track of context of the translation.
+ * This class contains only batch specific context components.
  */
 public class BatchTranslationContext extends TranslationContext {
-  private final Map<PValue, Dataset<?>> datasets;
 
   /**
    * For keeping track about which DataSets don't have a successor. We need to terminate these with
@@ -22,23 +21,8 @@ public class BatchTranslationContext extends TranslationContext {
    */
   private final Map<PValue, Dataset<?>> danglingDataSets;
 
-  private SparkSession sparkSession;
-  private final SparkPipelineOptions options;
-
   public BatchTranslationContext(SparkPipelineOptions options) {
-    SparkConf sparkConf = new SparkConf();
-    sparkConf.setMaster(options.getSparkMaster());
-    sparkConf.setAppName(options.getAppName());
-    if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
-      sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
-    }
-
-    this.sparkSession = SparkSession
-        .builder()
-        .config(sparkConf)
-        .getOrCreate();
-    this.options = options;
-    this.datasets = new HashMap<>();
+    super(options);
     this.danglingDataSets = new HashMap<>();
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
index f2ee34b..ebccfa7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
@@ -1,10 +1,14 @@
 package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
 
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 
 /**
- *  * Keeps track of context of the translation.
+ * This class contains only streaming specific context components.
  */
 public class StreamingTranslationContext extends TranslationContext {
 
+  public StreamingTranslationContext(SparkPipelineOptions options) {
+    super(options);
+  }
 }