Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/01 07:14:20 UTC

[GitHub] [spark] beliefer opened a new pull request, #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

beliefer opened a new pull request, #38461:
URL: https://github.com/apache/spark/pull/38461

   ### What changes were proposed in this pull request?
   Recently, I read `MergeScalarSubqueries` because it is a feature used to improve performance.
   I found the parameters of `ScalarSubqueryReference` hard to understand, so this PR adds some comments on them.
   
   Additionally, the private method `supportedAggregateMerge` of `MergeScalarSubqueries` looks redundant, so this PR simplifies the code.
   
   
   ### Why are the changes needed?
   Improve the readability and simplify the code for `MergeScalarSubqueries`.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   It only improves the readability and simplifies the code for `MergeScalarSubqueries`.
   
   
   ### How was this patch tested?
   Existing tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1011362750


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,20 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+    val aggregateExpressionsSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))

Review Comment:
   ```suggestion
       val aggregateExpressionsSeq = Seq(newPlan, cachedPlan).map { plan => 
         plan.aggregateExpressions.flatMap(_.collect {
           case a: AggregateExpression => a
         })
       }
   ```





[GitHub] [spark] beliefer commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1011086001


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
-      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
-        newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate ||
-          newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate
-      }
+    val aggregateExprSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))
+    val supportsHashAggregates = aggregateExprSeq.map(aggregateExpressions =>

Review Comment:
   @peter-toth's suggestion keeps the code readable and simplifies it too.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1010607741


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
-      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
-        newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate ||
-          newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate
-      }
+    val aggregateExprSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))
+    val supportsHashAggregates = aggregateExprSeq.map(aggregateExpressions =>

Review Comment:
   I feel like the previous code is more readable... Small code duplication doesn't hurt.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1011363681


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,20 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+    val aggregateExpressionsSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))
+    val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) =
+      aggregateExpressionsSeq.map(aggregateExpressions => Aggregate.supportsHashAggregate(
+        aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)))
+    lazy val Seq(newPlanSupportsObjectHashAggregate, cachedPlanSupportsObjectHashAggregate) =
+      aggregateExpressionsSeq.map(aggregateExpressions =>
+        Aggregate.supportsObjectHashAggregate(aggregateExpressions))
     newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
-      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
-        newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate ||
-          newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate
-      }
+      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate &&

Review Comment:
   we can avoid using lazy val
   ```
   newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
     val Seq(newPlanSupportsObjectHashAggregate, cachedPlanSupportsObjectHashAggregate) = ...
     ...
   }
   ```
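The suggestion above relies on `&&` evaluating its right-hand operand only when the left-hand one is true, so a plain `val` declared inside a block on the right is computed on demand and no `lazy val` is needed. A minimal, self-contained sketch of that shape follows. `ShortCircuitSketch`, `supportsObjectHash`, `sameImplementation`, and the call counter are hypothetical stand-ins that use plain Booleans; they are not Spark APIs and not the merged code.

```scala
object ShortCircuitSketch {
  // Counts how often the stand-in "expensive" check runs.
  var objectHashChecks = 0

  // Hypothetical stand-in for a check such as Aggregate.supportsObjectHashAggregate.
  def supportsObjectHash(flag: Boolean): Boolean = {
    objectHashChecks += 1
    flag
  }

  // Mirrors the shape of the reviewed expression, with Booleans instead of plans.
  def sameImplementation(
      newHash: Boolean,
      cachedHash: Boolean,
      newObjectHash: Boolean,
      cachedObjectHash: Boolean): Boolean = {
    newHash && cachedHash ||
      newHash == cachedHash && {
        // This block runs only if the first operand of || was false
        // and the left-hand side of && is true.
        val Seq(newObj, cachedObj) =
          Seq(newObjectHash, cachedObjectHash).map(supportsObjectHash)
        newObj && cachedObj || newObj == cachedObj
      }
  }
}
```

In this sketch the counter stays at zero whenever both plans support hash aggregation or when their hash support differs, which is the same laziness the `lazy val` version was meant to provide.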





[GitHub] [spark] beliefer commented on pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #38461:
URL: https://github.com/apache/spark/pull/38461#issuecomment-1302842869

   @cloud-fan @peter-toth Thank you!




[GitHub] [spark] beliefer commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1011416561


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,20 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+    val aggregateExpressionsSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))
+    val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) =
+      aggregateExpressionsSeq.map(aggregateExpressions => Aggregate.supportsHashAggregate(
+        aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)))
+    lazy val Seq(newPlanSupportsObjectHashAggregate, cachedPlanSupportsObjectHashAggregate) =
+      aggregateExpressionsSeq.map(aggregateExpressions =>
+        Aggregate.supportsObjectHashAggregate(aggregateExpressions))
     newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
-      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
-        newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate ||
-          newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate
-      }
+      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate &&

Review Comment:
   I got it.





[GitHub] [spark] cloud-fan closed pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries
URL: https://github.com/apache/spark/pull/38461




[GitHub] [spark] peter-toth commented on a diff in pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
peter-toth commented on code in PR #38461:
URL: https://github.com/apache/spark/pull/38461#discussion_r1010548429


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -346,25 +346,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = {
-    val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
-      newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
-        newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate ||
-          newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate
-      }
+    val aggregateExprSeq =
+      Seq(newPlan, cachedPlan).map(plan => plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      }))
+    val supportsHashAggregates = aggregateExprSeq.map(aggregateExpressions =>

Review Comment:
   Thanks @beliefer for the PR. I'm ok with the changes. Just a nit: you could probably use the `val Seq(newPlanSupportsHashAggregates, cachedPlanSupportsHashAggregates) = ...` syntax here to avoid using `.head` and `.last`.
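
To illustrate the nit, here is a small self-contained sketch comparing `.head`/`.last` access with a `Seq(...)` pattern on the left-hand side of a `val`. The names `SeqDestructuringSketch`, `supportsHash`, `withHeadAndLast`, and `withPattern` are hypothetical, and the `Int` arguments are only placeholders for the two plans.

```scala
object SeqDestructuringSketch {
  // Hypothetical stand-in for a per-plan capability check.
  def supportsHash(bufferFieldCount: Int): Boolean = bufferFieldCount <= 2

  // Positional access: the reader has to remember which element is which.
  def withHeadAndLast(newPlan: Int, cachedPlan: Int): (Boolean, Boolean) = {
    val supports = Seq(newPlan, cachedPlan).map(supportsHash)
    (supports.head, supports.last)
  }

  // Pattern on the left-hand side: each element gets a descriptive name.
  def withPattern(newPlan: Int, cachedPlan: Int): (Boolean, Boolean) = {
    val Seq(newSupports, cachedSupports) =
      Seq(newPlan, cachedPlan).map(supportsHash)
    (newSupports, cachedSupports)
  }
}
```

Both variants return the same pair. One trade-off to keep in mind is that the pattern form throws a `MatchError` at runtime if the sequence does not contain exactly two elements, which is not a concern here because the sequence is built from two values on the same line.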





[GitHub] [spark] beliefer commented on pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #38461:
URL: https://github.com/apache/spark/pull/38461#issuecomment-1301951436

   ping @cloud-fan 




[GitHub] [spark] cloud-fan commented on pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38461:
URL: https://github.com/apache/spark/pull/38461#issuecomment-1302062270

   thanks, merging to master!




[GitHub] [spark] beliefer commented on pull request #38461: [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #38461:
URL: https://github.com/apache/spark/pull/38461#issuecomment-1298447704

   ping @peter-toth cc @cloud-fan 

