Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/10 14:44:41 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #22446: Improved pipeline translation in SparkStructuredStreamingRunner

aromanenko-dev commented on code in PR #22446:
URL: https://github.com/apache/beam/pull/22446#discussion_r942510378


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java:
##########
@@ -182,10 +184,7 @@ private AbstractTranslationContext translatePipeline(Pipeline pipeline) {
 
     PipelineTranslator.replaceTransforms(pipeline, options);
     prepareFilesToStage(options);
-    PipelineTranslator pipelineTranslator =
-        options.isStreaming()
-            ? new PipelineTranslatorStreaming(options)
-            : new PipelineTranslatorBatch(options);
+    PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch(options);

Review Comment:
   Don't you believe that streaming will be supported in this runner one day? =)
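   
   If streaming translation is being dropped here for now, would it make sense to fail fast instead of silently running a streaming pipeline through the batch translator? Just a sketch (not tested), reusing `options.isStreaming()` and `PipelineTranslatorBatch` from this hunk:
   
   ```java
   // Sketch only: make the current limitation explicit at translation time.
   if (options.isStreaming()) {
     throw new UnsupportedOperationException(
         "The SparkStructuredStreamingRunner currently supports batch pipelines only.");
   }
   PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch(options);
   ```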



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java:
##########
@@ -17,27 +17,132 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import java.util.concurrent.TimeoutException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Subclass of {@link
- * org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext} that
- * address spark breaking changes.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the
+ * datasets, the {@link SparkSession}, the current transform being translated.
  */
-public class TranslationContext extends AbstractTranslationContext {
+@SuppressWarnings({

Review Comment:
   Can we avoid `@SuppressWarnings` here and in the other places where it's used?
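   
   If the class-level suppression can't be dropped entirely, one middle ground (just a sketch, and it assumes the cache can be declared as `Map<Coder<?>, Encoder<?>>` instead of `Map<Coder<?>, ExpressionEncoder<?>>`) would be to confine the unchecked cast to the one place it's needed:
   
   ```java
   // Sketch only: keep the class fully checked and suppress just the single
   // unchecked cast, with a comment explaining why it is safe.
   private final Map<Coder<?>, Encoder<?>> encoders = new HashMap<>();
   
   public <T> Encoder<T> encoderOf(Coder<T> coder, Function<Coder<T>, Encoder<T>> loadFn) {
     @SuppressWarnings("unchecked") // the value cached for a Coder<T> key is always an Encoder<T>
     Encoder<T> encoder = (Encoder<T>) encoders.computeIfAbsent(coder, c -> loadFn.apply(coder));
     return encoder;
   }
   ```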



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java:
##########
@@ -17,27 +17,132 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import java.util.concurrent.TimeoutException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Subclass of {@link
- * org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext} that
- * address spark breaking changes.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the
+ * datasets, the {@link SparkSession}, the current transform being translated.
  */
-public class TranslationContext extends AbstractTranslationContext {
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class TranslationContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+  /** All the datasets of the DAG. */
+  private final Map<PValue, Dataset<?>> datasets;
+  /** datasets that are not used as input to other datasets (leaves of the DAG). */
+  private final Set<Dataset<?>> leaves;
+
+  private final SerializablePipelineOptions serializablePipelineOptions;
+
+  private final SparkSession sparkSession;
+
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
+  private final Map<Coder<?>, ExpressionEncoder<?>> encoders;
 
   public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
-    super(options);
+    this.sparkSession = SparkSessionFactory.getOrCreateSession(options);
+    this.serializablePipelineOptions = new SerializablePipelineOptions(options);
+    this.datasets = new HashMap<>();
+    this.leaves = new HashSet<>();
+    this.broadcastDataSets = new HashMap<>();
+    this.encoders = new HashMap<>();
+  }
+
+  public SparkSession getSparkSession() {
+    return sparkSession;
+  }
+
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializablePipelineOptions;
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Datasets methods
+  // --------------------------------------------------------------------------------------------
+
+  public <T> Encoder<T> encoderOf(Coder<T> coder, Function<Coder<T>, Encoder<T>> loadFn) {
+    return (Encoder<T>) encoders.computeIfAbsent(coder, (Function) loadFn);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
+    Dataset<?> dataset = datasets.get(pCollection);
+    // assume that the Dataset is used as an input if retrieved here. So it is not a leaf anymore
+    leaves.remove(dataset);
+    return (Dataset<WindowedValue<T>>) dataset;
+  }
+
+  public <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset) {
+    if (!datasets.containsKey(pCollection)) {
+      datasets.put(pCollection, dataset);
+      leaves.add(dataset);
+    }
+  }
+
+  public <ViewT, ElemT> void setSideInputDataset(
+      PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> getSideInputDataSet(PCollectionView<?> value) {
+    return (Dataset<T>) broadcastDataSets.get(value);
+  }
+
+  // --------------------------------------------------------------------------------------------

Review Comment:
   The same as above.



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java:
##########
@@ -17,27 +17,132 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import java.util.concurrent.TimeoutException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Subclass of {@link
- * org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext} that
- * address spark breaking changes.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the
+ * datasets, the {@link SparkSession}, the current transform being translated.
  */
-public class TranslationContext extends AbstractTranslationContext {
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class TranslationContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+  /** All the datasets of the DAG. */
+  private final Map<PValue, Dataset<?>> datasets;
+  /** datasets that are not used as input to other datasets (leaves of the DAG). */
+  private final Set<Dataset<?>> leaves;
+
+  private final SerializablePipelineOptions serializablePipelineOptions;
+
+  private final SparkSession sparkSession;
+
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
+  private final Map<Coder<?>, ExpressionEncoder<?>> encoders;
 
   public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
-    super(options);
+    this.sparkSession = SparkSessionFactory.getOrCreateSession(options);
+    this.serializablePipelineOptions = new SerializablePipelineOptions(options);
+    this.datasets = new HashMap<>();
+    this.leaves = new HashSet<>();
+    this.broadcastDataSets = new HashMap<>();
+    this.encoders = new HashMap<>();
+  }
+
+  public SparkSession getSparkSession() {
+    return sparkSession;
+  }
+
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializablePipelineOptions;
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Datasets methods
+  // --------------------------------------------------------------------------------------------
+
+  public <T> Encoder<T> encoderOf(Coder<T> coder, Function<Coder<T>, Encoder<T>> loadFn) {
+    return (Encoder<T>) encoders.computeIfAbsent(coder, (Function) loadFn);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
+    Dataset<?> dataset = datasets.get(pCollection);
+    // assume that the Dataset is used as an input if retrieved here. So it is not a leaf anymore
+    leaves.remove(dataset);
+    return (Dataset<WindowedValue<T>>) dataset;
+  }
+
+  public <T> void putDataset(PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset) {
+    if (!datasets.containsKey(pCollection)) {
+      datasets.put(pCollection, dataset);
+      leaves.add(dataset);
+    }
+  }
+
+  public <ViewT, ElemT> void setSideInputDataset(
+      PCollectionView<ViewT> value, Dataset<WindowedValue<ElemT>> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Dataset<T> getSideInputDataSet(PCollectionView<?> value) {
+    return (Dataset<T>) broadcastDataSets.get(value);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline methods
+  // --------------------------------------------------------------------------------------------
+
+  /** Starts the batch pipeline, streaming is not supported. */

Review Comment:
   It would be helpful to add a comment explaining why streaming is not supported.
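   
   For context, my understanding (inferred from the `Dataset`/`ForeachFunction` imports in this diff, so only a sketch, not necessarily the PR's exact code) is that "starting" the pipeline here just runs a terminal batch action on every leaf Dataset, whereas a streaming pipeline would need the `DataStreamWriter`/query lifecycle that this context no longer manages:
   
   ```java
   // Sketch only: force evaluation of the batch DAG by running a no-op
   // terminal action on every leaf Dataset.
   public void startPipeline() {
     for (Dataset<?> leaf : leaves) {
       evaluate(leaf);
     }
   }
   
   private static <T> void evaluate(Dataset<T> dataset) {
     // foreach is a batch action; it triggers execution of the plan behind the Dataset.
     dataset.foreach((ForeachFunction<T>) value -> {});
   }
   ```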



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java:
##########
@@ -17,27 +17,132 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import java.util.concurrent.TimeoutException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
-import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Subclass of {@link
- * org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext} that
- * address spark breaking changes.
+ * Base class that gives a context for {@link PTransform} translation: keeping track of the
+ * datasets, the {@link SparkSession}, the current transform being translated.
  */
-public class TranslationContext extends AbstractTranslationContext {
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class TranslationContext {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+  /** All the datasets of the DAG. */
+  private final Map<PValue, Dataset<?>> datasets;
+  /** datasets that are not used as input to other datasets (leaves of the DAG). */
+  private final Set<Dataset<?>> leaves;
+
+  private final SerializablePipelineOptions serializablePipelineOptions;
+
+  private final SparkSession sparkSession;
+
+  private final Map<PCollectionView<?>, Dataset<?>> broadcastDataSets;
+
+  private final Map<Coder<?>, ExpressionEncoder<?>> encoders;
 
   public TranslationContext(SparkStructuredStreamingPipelineOptions options) {
-    super(options);
+    this.sparkSession = SparkSessionFactory.getOrCreateSession(options);
+    this.serializablePipelineOptions = new SerializablePipelineOptions(options);
+    this.datasets = new HashMap<>();
+    this.leaves = new HashSet<>();
+    this.broadcastDataSets = new HashMap<>();
+    this.encoders = new HashMap<>();
+  }
+
+  public SparkSession getSparkSession() {
+    return sparkSession;
+  }
+
+  public SerializablePipelineOptions getSerializableOptions() {
+    return serializablePipelineOptions;
+  }
+
+  // --------------------------------------------------------------------------------------------

Review Comment:
   It would be better to remove these separator comments since they don't add any useful information.


