Posted to github@beam.apache.org by "aromanenko-dev (via GitHub)" <gi...@apache.org> on 2023/02/01 14:49:24 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #25187: [Spark Dataset runner] Break lineage of dataset to reduce Spark planning overhead in case of large query plans

aromanenko-dev commented on code in PR #25187:
URL: https://github.com/apache/beam/pull/25187#discussion_r1093166881


##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java:
##########
@@ -61,25 +61,25 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
   // https://github.com/seznam/euphoria/blob/master/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java#L106
 
   static {
-    TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch());
-    TRANSFORM_TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch<>());
-    TRANSFORM_TRANSLATORS.put(Combine.Globally.class, new CombineGloballyTranslatorBatch<>());
+    TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch(0));

Review Comment:
   How were all these complexity factors (here and below) estimated?
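
For context on the question above: judging from `estimatePlanComplexity` further down, each transform contributes `(1 + complexityFactor)` and estimates compose multiplicatively along the plan. A minimal sketch of that model (illustrative names and factor values, not the PR's code):

```java
// Sketch of the assumed multiplicative complexity model: a node's estimate is
// (1 + complexityFactor) times the product of its inputs' estimates.
final class PlanComplexityExample {
  static float estimate(float complexityFactor, float... inputEstimates) {
    float complexity = 1 + complexityFactor;
    for (float input : inputEstimates) {
      complexity *= input;
    }
    return complexity;
  }

  public static void main(String[] args) {
    float impulse = estimate(0f);          // factor 0 contributes a neutral 1.0
    float step1 = estimate(0.5f, impulse); // 1.5
    float step2 = estimate(0.5f, step1);   // 2.25
    float step3 = estimate(0.5f, step2);   // 3.375; two more such steps exceed 6
    System.out.printf("%.3f %.3f %.3f %.3f%n", impulse, step1, step2, step3);
  }
}
```

Under this model a factor of 0 (as passed to `ImpulseTranslatorBatch` here) leaves the estimate unchanged, so only transforms that actually enlarge the Catalyst plan push it toward the threshold.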



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -146,13 +165,37 @@ public String name() {
     public @Nullable Dataset<WindowedValue<T>> dataset() {
       return dataset;
     }
+
+    private boolean isLeaf() {
+      return dependentTransforms.isEmpty();
+    }
+
+    private int usages() {
+      return dependentTransforms.size();
+    }
+
+    private void resetPlanComplexity() {
+      planComplexity = 1;
+    }
+
+    /** Estimate complexity of query plan by multiplying complexities of all dependencies. */
+    private float estimatePlanComplexity() {
+      if (planComplexity > 0) {
+        return planComplexity;
+      }
+      float complexity = 1 + complexityFactor;
+      for (TranslationResult<?> res : dependencies) {

Review Comment:
   nit: res -> result
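
Worth noting how the memoization appears to work: `planComplexity == 0` means "not yet computed", while `resetPlanComplexity()` pins it to 1 once a dataset is materialized, so estimates stop compounding across a persisted boundary. A self-contained sketch under that assumption (the surrounding structure is hypothetical; the hunk is truncated before the loop body):

```java
import java.util.List;

// Assumed memo/reset semantics; field and method names mirror the hunk.
final class ResultComplexity {
  private final float complexityFactor;
  private final List<ResultComplexity> dependencies;
  private float planComplexity = 0; // 0 = not yet estimated

  ResultComplexity(float complexityFactor, List<ResultComplexity> dependencies) {
    this.complexityFactor = complexityFactor;
    this.dependencies = dependencies;
  }

  float estimatePlanComplexity() {
    if (planComplexity > 0) {
      return planComplexity; // memoized, or pinned to 1 by a reset
    }
    float complexity = 1 + complexityFactor;
    for (ResultComplexity result : dependencies) { // "result", per the nit above
      complexity *= result.estimatePlanComplexity();
    }
    return planComplexity = complexity;
  }

  void resetPlanComplexity() {
    planComplexity = 1; // dataset was persisted: downstream estimates restart here
  }
}
```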



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -129,12 +138,22 @@ public EvaluationContext translate(
    */
   private static final class TranslationResult<T> implements EvaluationContext.NamedDataset<T> {
     private final String name;
+    private final float complexityFactor;

Review Comment:
   Default value?
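
On the default-value question: the field is `final`, so Java gives it no implicit default and every constructor must set it. One hypothetical way a neutral default could be provided (not the PR's code):

```java
// Hypothetical delegating constructor supplying a neutral default factor.
final class TranslationResultSketch {
  private static final float DEFAULT_COMPLEXITY_FACTOR = 0f;

  private final String name;
  private final float complexityFactor;

  TranslationResultSketch(String name) {
    this(name, DEFAULT_COMPLEXITY_FACTOR);
  }

  TranslationResultSketch(String name, float complexityFactor) {
    this.name = name;
    this.complexityFactor = complexityFactor;
  }
}
```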



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -80,6 +83,12 @@
 public abstract class PipelineTranslator {
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class);
 
+  // Threshold to limit query plan complexity to avoid unnecessary planning overhead. Currently
+  // this is fairly low; Catalyst won't be able to optimize beyond ParDos anyway. Until there's
+  // dedicated support for schema transforms, there's little value in allowing more complex plans
+  // at this point.
+  private static final int PLAN_COMPLEXITY_THRESHOLD = 6;

Review Comment:
   Why is it 6? Should it be configurable?
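
If the threshold were to be made configurable, Beam's usual mechanism would be a pipeline option. A sketch of what that could look like (hypothetical option, not part of the PR):

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical option interface following Beam's standard options pattern.
public interface PlanComplexityOptions extends PipelineOptions {
  @Description(
      "Estimated query plan complexity above which dataset lineage is broken "
          + "to limit Spark planning overhead.")
  @Default.Integer(6)
  int getPlanComplexityThreshold();

  void setPlanComplexityThreshold(int threshold);
}
```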



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -129,12 +138,22 @@ public EvaluationContext translate(
    */
   private static final class TranslationResult<T> implements EvaluationContext.NamedDataset<T> {
     private final String name;
+    private final float complexityFactor;
+    private float planComplexity = 0;
+
     private @MonotonicNonNull Dataset<WindowedValue<T>> dataset = null;
     private @MonotonicNonNull Broadcast<SideInputValues<T>> sideInputBroadcast = null;
+
+    // dependent downstream transforms (if empty this is a leaf)
     private final Set<PTransform<?, ?>> dependentTransforms = new HashSet<>();
+    // upstream dependencies (required inputs)

Review Comment:
   nit: downstream?



##########
runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/PipelineTranslator.java:
##########
@@ -247,20 +292,37 @@ public <T> Dataset<WindowedValue<T>> getDataset(PCollection<T> pCollection) {
     public <T> void putDataset(
         PCollection<T> pCollection, Dataset<WindowedValue<T>> dataset, boolean cache) {
       TranslationResult<T> result = getResult(pCollection);
-      if (cache && result.dependentTransforms.size() > 1) {
-        LOG.info("Dataset {} will be cached.", result.name);
-        result.dataset = dataset.persist(storageLevel); // use NONE to disable
-      } else {
-        result.dataset = dataset;
-        if (result.dependentTransforms.isEmpty()) {
-          leaves.add(result);
+      result.dataset = dataset;
+
+      if (!cache && isMemoryOnly) {
+        result.resetPlanComplexity(); // cached as RDD in memory, which breaks lineage
+      } else if (cache && result.usages() > 1) {
+        if (isMemoryOnly) {
+          // Cache as RDD in memory only; this also helps break lineage of complex query plans.
+          LOG.info("Dataset {} will be cached in-memory as RDD for reuse.", result.name);
+          result.dataset = sparkSession.createDataset(dataset.rdd().persist(), dataset.encoder());
+          result.resetPlanComplexity();
+        } else {
+          LOG.info("Dataset {} will be cached for reuse.", result.name);
+          dataset.persist(storageLevel); // use NONE to disable
         }
       }
+
+      if (result.estimatePlanComplexity() > PLAN_COMPLEXITY_THRESHOLD) {
+        // Break lineage of dataset to limit planning overhead for complex query plans.
+        LOG.debug("Breaking lineage of dataset {} to limit complexity of query plan.", result.name);

Review Comment:
   nit: `info` log level
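
The hunk cuts off before showing how lineage is actually broken once the threshold is exceeded; based on the in-memory branch above, it is presumably the same Dataset-to-RDD round-trip. A sketch of that trick under this assumption (helper name invented):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

// Converting a Dataset to an RDD and back re-enters Catalyst with a fresh
// logical plan, so the planner no longer sees the upstream operators.
final class LineageBreaker {
  static <T> Dataset<T> breakLineage(SparkSession session, Dataset<T> dataset) {
    // dataset.encoder() is used exactly as in the in-memory branch above.
    return session.createDataset(dataset.rdd(), dataset.encoder());
  }
}
```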


