You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/01/04 10:38:37 UTC
[beam] 15/50: Move common translation context components to superclass
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4777e22592bbc8a92eccae30250bd9998bfd11da
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Thu Nov 22 12:01:28 2018 +0100
Move common translation context components to superclass
---
.../translation/TranslationContext.java | 29 +++++++++++++++++++++-
.../translation/batch/BatchTranslationContext.java | 20 ++-------------
.../streaming/StreamingTranslationContext.java | 6 ++++-
3 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 3dacde4..e651e70 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -1,17 +1,44 @@
package org.apache.beam.runners.spark.structuredstreaming.translation;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
/**
- * Base class that gives a context for {@link PTransform} translation.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the datasets,
+ * the {@link SparkSession}, the current transform being translated.
*/
public class TranslationContext {
private AppliedPTransform<?, ?, ?> currentTransform;
+ private final Map<PValue, Dataset<?>> datasets;
+ private SparkSession sparkSession;
+ private final SparkPipelineOptions options;
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
this.currentTransform = currentTransform;
}
+ public TranslationContext(SparkPipelineOptions options) {
+ SparkConf sparkConf = new SparkConf();
+ sparkConf.setMaster(options.getSparkMaster());
+ sparkConf.setAppName(options.getAppName());
+ if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
+ sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
+ }
+
+ this.sparkSession = SparkSession
+ .builder()
+ .config(sparkConf)
+ .getOrCreate();
+ this.options = options;
+ this.datasets = new HashMap<>();
+ }
+
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
index f08e33c..02aad71 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/BatchTranslationContext.java
@@ -11,10 +11,9 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
/**
- * Keeps track of context of the translation.
+ * This class contains only batch specific context components.
*/
public class BatchTranslationContext extends TranslationContext {
- private final Map<PValue, Dataset<?>> datasets;
/**
* For keeping track about which DataSets don't have a successor. We need to terminate these with
@@ -22,23 +21,8 @@ public class BatchTranslationContext extends TranslationContext {
*/
private final Map<PValue, Dataset<?>> danglingDataSets;
- private SparkSession sparkSession;
- private final SparkPipelineOptions options;
-
public BatchTranslationContext(SparkPipelineOptions options) {
- SparkConf sparkConf = new SparkConf();
- sparkConf.setMaster(options.getSparkMaster());
- sparkConf.setAppName(options.getAppName());
- if (options.getFilesToStage() != null && !options.getFilesToStage().isEmpty()) {
- sparkConf.setJars(options.getFilesToStage().toArray(new String[0]));
- }
-
- this.sparkSession = SparkSession
- .builder()
- .config(sparkConf)
- .getOrCreate();
- this.options = options;
- this.datasets = new HashMap<>();
+ super(options);
this.danglingDataSets = new HashMap<>();
}
}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
index f2ee34b..ebccfa7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/StreamingTranslationContext.java
@@ -1,10 +1,14 @@
package org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
/**
- * * Keeps track of context of the translation.
+ * This class contains only streaming specific context components.
*/
public class StreamingTranslationContext extends TranslationContext {
+ public StreamingTranslationContext(SparkPipelineOptions options) {
+ super(options);
+ }
}