Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/15 23:54:46 UTC

[GitHub] [spark] minyyy opened a new pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

minyyy opened a new pull request #35864:
URL: https://github.com/apache/spark/pull/35864


   ### What changes were proposed in this pull request?
   
   The "prune unrequired references" branch has the condition:
   
   `case p @ Project(_, g: Generate) if p.references != g.outputSet => `
   
   This is wrong: with generators like Inline, the plan always enters this branch as long as the project does not use all of the generator's output.
   
   Example:
   
   input: <col1: array<struct<a: struct<a: int, b: int>, b: int>>>
   
   Project(a.a as x)
   \- Generate(Inline(col1), ..., a, b)
   
   p.references is [a]
   g.outputSet is [a, b]
   
   Because of this bug, we never reach the GeneratorNestedColumnAliasing branch below and therefore miss some optimization opportunities. This PR changes the condition to check whether there is a child output attribute that is not used by the project and is either not used by the generator or not already recorded in unrequiredChildIndex.
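
The difference between the two guards can be sketched with plain Scala collections. This is only a toy model under stated assumptions: attributes are strings rather than Catalyst `Attribute` objects, and `GuardSketch`, `oldGuard` and `newGuard` are hypothetical names that do not exist in Spark.

```scala
// Toy model only: attributes are plain strings, not Catalyst Attribute objects.
object GuardSketch {
  // Old guard: true whenever the project does not reference exactly the Generate output.
  def oldGuard(projectRefs: Set[String], generateOutput: Set[String]): Boolean =
    projectRefs != generateOutput

  // New guard as described above: some child column is unused by the project and is
  // either unused by the generator or not yet recorded in unrequiredChildIndex.
  def newGuard(
      childOutput: Seq[String],
      projectRefs: Set[String],
      generatorRefs: Set[String],
      unrequiredChildIndex: Seq[Int]): Boolean = {
    val alreadyMarked = unrequiredChildIndex.toSet
    childOutput.zipWithIndex.exists { case (attr, idx) =>
      !projectRefs.contains(attr) &&
        (!generatorRefs.contains(attr) || !alreadyMarked.contains(idx))
    }
  }
}
```

With the example above (project references `a`, Generate output `a, b`), `oldGuard` stays true on every iteration, while `newGuard` becomes false once `col1` has been recorded in `unrequiredChildIndex`, so later case branches get a chance to match.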
   
   ### Why are the changes needed?
   The wrong condition prevents plans with generators such as Inline and PosExplode from being optimized by the rules that follow it. Before this PR, the test query added here is not optimized because the optimization rule cannot be applied to it. After this PR, the rule is applied to the query correctly.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r828505854



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       Does it work if we switch the order of the case branches? Since the rule runs in a fixed-point batch, it can hit `GeneratorNestedColumnAliasing` in the first iteration and `Project(_, g: Generate)` in the next?
   
   ```
       // prune unrequired nested fields from `Generate`.
       // prioritize this case branch over  Project(_, g: Generate) for SPARK-38531
       case GeneratorNestedColumnAliasing(rewrittenPlan) => rewrittenPlan
       
       // prune unrequired references
       case p @ Project(_, g: Generate) if p.references != g.outputSet => ...
   ```






[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838264253



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       Some thoughts here:
   - the condition sounds right to me;
   - however, maybe it's just me, but it seems fragile to maintain two versions of the same logic going forward: one in the `if` guard and one in the branch body;
   - `unrequiredChildIndex` is a `Seq` rather than a set. Can it result in quadratic behavior (probably only in extreme cases where both `g.child.output` and `g.generator.references` are large)?
   
   Does it make sense to use an extractor pattern?
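
For reference, the extractor idea can be sketched on a miniature plan type. Everything here is hypothetical (`Plan`, `Leaf`, `Keep`, `PruneUnused` are not Catalyst classes); the point is only that the decision and the rewrite live in one `unapply`, and that returning `None` lets matching fall through to later cases.

```scala
object ExtractorSketch {
  // Hypothetical miniature plan types; these are not Catalyst classes.
  sealed trait Plan
  case class Leaf(columns: Seq[String]) extends Plan
  case class Keep(columns: Seq[String], child: Plan) extends Plan

  // The extractor decides whether pruning applies and produces the rewritten plan.
  // Returning None means "nothing to prune", so matching falls through to later cases.
  object PruneUnused {
    def unapply(plan: Plan): Option[Plan] = plan match {
      case Keep(cols, Leaf(available)) =>
        val needed = available.filter(c => cols.contains(c))
        if (needed.size < available.size) Some(Keep(cols, Leaf(needed))) else None
      case _ => None
    }
  }

  def rewrite(plan: Plan): Plan = plan match {
    case PruneUnused(rewritten) => rewritten
    case other => other // reached when the extractor returned None
  }
}
```

This keeps a single copy of the logic, at the cost of computing the rewritten plan even when the caller only wants to know whether the case applies.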






[GitHub] [spark] AmplabJenkins commented on pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35864:
URL: https://github.com/apache/spark/pull/35864#issuecomment-1071270701


   Can one of the admins verify this patch?




[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838835663



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -312,6 +312,32 @@ object NestedColumnAliasing {
   }
 }
 
+object GeneratorUnrequiredChildrenPruning {
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+    case p@Project(_, g: Generate) =>
+      val requiredAttrs = p.references ++ g.generator.references
+      var pruned = false
+      val newChild = if (!g.child.outputSet.subsetOf(requiredAttrs)) {
+        pruned = true
+        Project(g.child.output.filter(requiredAttrs.contains), g.child)
+      } else {
+        g.child
+      }
+      val unrequired = g.generator.references -- p.references
+      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
+        .map(_._2)
+      if (unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
+        pruned = true
+      }
+      if (pruned) {
+        Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
+      } else {
+        None
+      }

Review comment:
       There seems to be some logic change in the new code: https://www.diffchecker.com/wEi4OAYD, e.g., how `requiredAttrs` is obtained.
   
   Is it possible to keep the change "mechanical", e.g.,
   
   ```
   case p @ Project(_, g: Generate) if p.references != g.outputSet =>
     val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
     val newChild = prunedChild(g.child, requiredAttrs)
     val unrequired = g.generator.references -- p.references
     val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
       .map(_._2)
     // Return None when nothing changed so that later case branches can still match.
     if (!newChild.fastEq(g.child) || unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
       Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
     } else {
       None
     }
   ```
   
   The only difference is the final `if`/`else`, so that we can still enter the `GeneratorNestedColumnAliasing` branch if no rewrite happens?








[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838835663



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -312,6 +312,32 @@ object NestedColumnAliasing {
   }
 }
 
+object GeneratorUnrequiredChildrenPruning {
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+    case p@Project(_, g: Generate) =>
+      val requiredAttrs = p.references ++ g.generator.references
+      var pruned = false
+      val newChild = if (!g.child.outputSet.subsetOf(requiredAttrs)) {
+        pruned = true
+        Project(g.child.output.filter(requiredAttrs.contains), g.child)
+      } else {
+        g.child
+      }
+      val unrequired = g.generator.references -- p.references
+      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
+        .map(_._2)
+      if (unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
+        pruned = true
+      }
+      if (pruned) {
+        Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
+      } else {
+        None
+      }

Review comment:
       There seems to be some logic change in the new code: https://www.diffchecker.com/wEi4OAYD, e.g., how `requiredAttrs` is obtained.
   
   Is it possible to keep the change "mechanical", e.g.,
   
   ```
   case p @ Project(_, g: Generate) if p.references != g.outputSet =>
     val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
     val newChild = prunedChild(g.child, requiredAttrs)
     val unrequired = g.generator.references -- p.references
     val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
       .map(_._2)
     // Return None when nothing changed so that later case branches can still match.
     if (newChild != g.child || unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
       Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
     } else {
       None
     }
   ```
   
   The only difference is the final `if`/`else`, so that we can still enter the `GeneratorNestedColumnAliasing` branch if no rewrite happens?








[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838825676



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       > I changed to the extractor pattern. What part did you refer to for the quadratic behaviors?
   
   `g.unrequiredChildIndex.contains(pair._2)` is O(n), where n is the size of `g.unrequiredChildIndex`, so the whole check is O(m * n), where m is `g.child.output.size`.
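
A minimal sketch of the two lookup strategies, using plain integers instead of a real `Generate` node (`IndexLookupSketch` and its methods are hypothetical names, not Spark APIs):

```scala
object IndexLookupSketch {
  // Linear scan per element: O(m * n) overall for m child columns and n recorded indices.
  def unmarkedLinear(childSize: Int, unrequired: Seq[Int]): Seq[Int] =
    (0 until childSize).filterNot(i => unrequired.contains(i))

  // Build a Set once, then do hashed lookups: expected O(m + n) overall.
  def unmarkedHashed(childSize: Int, unrequired: Seq[Int]): Seq[Int] = {
    val marked = unrequired.toSet
    (0 until childSize).filterNot(i => marked.contains(i))
  }
}
```

Both variants return the same indices; converting the `Seq` to a `Set` up front only changes the cost of each membership test, which matters mainly when both sequences are large.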






[GitHub] [spark] viirya commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r827542268



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       Hmm, this change affects ColumnPruning, not only NestedColumnAliasing. Is it possible to add a test in `ColumnPruningSuite` too?






[GitHub] [spark] minyyy commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
minyyy commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r828412078



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       Done. But the two tests are the same; if you want, I can remove one of them.

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
##########
@@ -313,6 +313,25 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     comparePlans(optimized, expected)
   }
 
+  test("Nested field pruning for Project and PosExplode") {

Review comment:
       Done.






[GitHub] [spark] minyyy commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
minyyy commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r837987367



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       Even if we switch the order, we still have to fix the condition; otherwise this rule prevents the following rules from being executed.






[GitHub] [spark] sigmod commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838835663



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -312,6 +312,32 @@ object NestedColumnAliasing {
   }
 }
 
+object GeneratorUnrequiredChildrenPruning {
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+    case p@Project(_, g: Generate) =>
+      val requiredAttrs = p.references ++ g.generator.references
+      var pruned = false
+      val newChild = if (!g.child.outputSet.subsetOf(requiredAttrs)) {
+        pruned = true
+        Project(g.child.output.filter(requiredAttrs.contains), g.child)
+      } else {
+        g.child
+      }
+      val unrequired = g.generator.references -- p.references
+      val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
+        .map(_._2)
+      if (unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
+        pruned = true
+      }
+      if (pruned) {
+        Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
+      } else {
+        None
+      }

Review comment:
       There seems to be some logic change in the new code: https://www.diffchecker.com/wEi4OAYD, e.g., how `requiredAttrs` is obtained.
   
   Is it possible to keep the change "mechanical", e.g.,
   
   ```
   case p @ Project(_, g: Generate) if p.references != g.outputSet =>
     val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
     val newChild = prunedChild(g.child, requiredAttrs)
     val unrequired = g.generator.references -- p.references
     val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
       .map(_._2)
     // newP is None when nothing changed so that later case branches can still match.
     val newP = if (newChild != g.child || unrequiredIndices.toSet != g.unrequiredChildIndex.toSet) {
       Some(p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices)))
     } else {
       None
     }
   ```
   
   The only difference is the final `if`/`else`, so that we can still enter the `GeneratorNestedColumnAliasing` branch if no rewrite happens?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
##########
@@ -312,6 +312,32 @@ object NestedColumnAliasing {
   }
 }
 
+object GeneratorUnrequiredChildrenPruning {
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+    case p@Project(_, g: Generate) =>

Review comment:
       Nit: spaces around `@`?






[GitHub] [spark] viirya commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r827541259



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
##########
@@ -313,6 +313,25 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     comparePlans(optimized, expected)
   }
 
+  test("Nested field pruning for Project and PosExplode") {

Review comment:
       Oh, BTW, this rule is not for NestedColumnAliasing actually. Can you move it to `ColumnPruningSuite`?






[GitHub] [spark] viirya commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r827540821



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
##########
@@ -313,6 +313,25 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
     comparePlans(optimized, expected)
   }
 
+  test("Nested field pruning for Project and PosExplode") {

Review comment:
       Can you add the JIRA ticket to the test name as usual? Thanks.






[GitHub] [spark] AmplabJenkins commented on pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35864:
URL: https://github.com/apache/spark/pull/35864#issuecomment-1071270701


   Can one of the admins verify this patch?




[GitHub] [spark] minyyy commented on a change in pull request #35864: [SPARK-38531][SQL] Fix the condition of "Prune unrequired child index" branch of ColumnPruning

Posted by GitBox <gi...@apache.org>.
minyyy commented on a change in pull request #35864:
URL: https://github.com/apache/spark/pull/35864#discussion_r838823323



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -831,8 +831,16 @@ object ColumnPruning extends Rule[LogicalPlan] {
       e.copy(child = prunedChild(child, e.references))
 
     // prune unrequired references
-    case p @ Project(_, g: Generate) if p.references != g.outputSet =>
-      val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+    // There are 2 types of pruning here:
+    // 1. For attributes in g.child.outputSet that is not used by the generator nor the project,
+    //    we directly remove it from the output list of g.child.
+    // 2. For attributes that is not used by the project but it is used by the generator, we put
+    //    it in g.unrequiredChildIndex to save memory usage.
+    case p @ Project(_, g: Generate) if g.child.output.zipWithIndex.exists(
+      pair =>
+      !p.references.contains(pair._1) &&
+        (!g.generator.references.contains(pair._1) || !g.unrequiredChildIndex.contains(pair._2))) =>

Review comment:
       I switched to the extractor pattern. Which part were you referring to with `quadratic behaviors`?
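
For reference, the guard in the hunk above can be modelled on plain collections. This is only a sketch under stated assumptions: attributes are represented as strings rather than Catalyst `Attribute`s, `needsPruning` is a hypothetical helper name, and `unrequiredChildIndex` is taken as a `Set[Int]` here for constant-time lookups (in the diff it is used via `contains` on the plan's own field).

```scala
object PruneConditionSketch {
  // Toy model of the guard: true if some child column is unused by the project
  // and is either unused by the generator or not yet marked as unrequired.
  def needsPruning(
      childOutput: Seq[String],
      projectRefs: Set[String],
      generatorRefs: Set[String],
      unrequiredChildIndex: Set[Int]): Boolean =
    childOutput.zipWithIndex.exists { case (attr, idx) =>
      !projectRefs.contains(attr) &&
        (!generatorRefs.contains(attr) || !unrequiredChildIndex.contains(idx))
    }
}
```

With the example from the PR description (child output `[col1]`, project references `[a]`, generator references `[col1]`), the guard holds while `unrequiredChildIndex` is empty and stops holding once index 0 has been recorded, so the branch is not re-entered on the next pass.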



