You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/10 08:00:03 UTC

[GitHub] [spark] ulysses-you opened a new pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

ulysses-you opened a new pull request #35473:
URL: https://github.com/apache/spark/pull/35473


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   - Add a new rule `OptimizeOneMaxRowPlan` in normal Optimizer and AQE Optimizer.
   - Move the similar optimization of `EliminateSorts` into `OptimizeOneMaxRowPlan`, also update its comment and test
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   Optimize the plan if its max row is equal to or less than 1 in these cases:
   
   - if the child of sort max rows less than or equal to 1, remove the sort
   - if the child of local sort max rows per partition less than or equal to 1, remove the local sort
   - if the child of aggregate max rows less than or equal to 1 and it's grouping only (include the rewritten distinct plan), remove the aggregate
   - if the child of aggregate max rows less than or equal to 1, set distinct to false in all aggregate expression
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   no, only change the plan
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   - Add a new test `OptimizeOneMaxRowPlanSuite` for normal optimizer
   - Add test in `AdaptiveQueryExecSuite` for AQE optimizer


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r810853755



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -454,6 +455,40 @@ object EliminateAggregateFilter extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the child of sort max rows less than or equal to 1, remove the sort
+ *   - if the child of local sort max rows per partition less than or equal to 1, remove the
+ *     local sort
+ *   - if the child of aggregate max rows less than or equal to 1 and its output is subset of
+ *     its child and it's grouping only(include the rewritten distinct plan), remove the aggregate
+ *   - if the child of aggregate max rows less than or equal to 1, set distinct to false in all
+ *     aggregate expression
+ */
+object OptimizeOneRowPlan extends Rule[LogicalPlan] {
+  private def maxRowNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRows.exists(_ <= 1L)
+  }
+
+  private def maxRowPerPartitionNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRowsPerPartition.exists(_ <= 1L)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDownWithPruning(_.containsAnyPattern(SORT, AGGREGATE), ruleId) {

Review comment:
       since this rule removes node, I think transform up should be more efficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r810851479



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -454,6 +455,40 @@ object EliminateAggregateFilter extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the child of sort max rows less than or equal to 1, remove the sort
+ *   - if the child of local sort max rows per partition less than or equal to 1, remove the
+ *     local sort
+ *   - if the child of aggregate max rows less than or equal to 1 and its output is subset of
+ *     its child and it's grouping only(include the rewritten distinct plan), remove the aggregate
+ *   - if the child of aggregate max rows less than or equal to 1, set distinct to false in all
+ *     aggregate expression
+ */
+object OptimizeOneRowPlan extends Rule[LogicalPlan] {

Review comment:
       can we put it in a new file?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r810851715



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -454,6 +455,40 @@ object EliminateAggregateFilter extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the child of sort max rows less than or equal to 1, remove the sort

Review comment:
       ```suggestion
    *   - if the max rows of the child of sort less than or equal to 1, remove the sort
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1038536460


   previous test failed caused by bug, now is ok. cc @HyukjinKwon @cloud-fan @maryannxue  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811610092



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowPlan.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the max rows of the child of sort less than or equal to 1, remove the sort

Review comment:
       ```suggestion
    *   - if the max rows of the child of sort is less than or equal to 1, remove the sort
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r810853031



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -454,6 +455,40 @@ object EliminateAggregateFilter extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the child of sort max rows less than or equal to 1, remove the sort
+ *   - if the child of local sort max rows per partition less than or equal to 1, remove the
+ *     local sort
+ *   - if the child of aggregate max rows less than or equal to 1 and its output is subset of
+ *     its child and it's grouping only(include the rewritten distinct plan), remove the aggregate
+ *   - if the child of aggregate max rows less than or equal to 1, set distinct to false in all
+ *     aggregate expression
+ */
+object OptimizeOneRowPlan extends Rule[LogicalPlan] {
+  private def maxRowNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRows.exists(_ <= 1L)
+  }
+
+  private def maxRowPerPartitionNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRowsPerPartition.exists(_ <= 1L)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDownWithPruning(_.containsAnyPattern(SORT, AGGREGATE), ruleId) {
+      case Sort(_, _, child) if maxRowNotLargerThanOne(child) => child
+      case Sort(_, false, child) if maxRowPerPartitionNotLargerThanOne(child) => child
+      case agg @ Aggregate(_, _, child) if agg.groupOnly &&
+        agg.outputSet.subsetOf(child.outputSet) && maxRowNotLargerThanOne(child) => child

Review comment:
       won't this change the query plan output columns? I think a clear idea is: if child outputs at most one row, we can turn group-only aggregate into a project.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1035803911


   cc @maryannxue FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1038536460


   previous test failed caused by bug, now is ok. cc @HyukjinKwon @cloud-fan @maryannxue  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811195794



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
##########
@@ -432,7 +432,7 @@ package object dsl {
       def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): LogicalPlan = {
         val aliasedExprs = aggregateExprs.map {
           case ne: NamedExpression => ne
-          case e => Alias(e, e.toString)()
+          case e => UnresolvedAlias(e)

Review comment:
       it seems a small bug in test, the name will be an unresolved string if there is no alias specified.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan closed pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #35473:
URL: https://github.com/apache/spark/pull/35473


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811610570



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowPlan.scala
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the max rows of the child of sort less than or equal to 1, remove the sort
+ *   - if the max rows per partition of the child of local sort less than or equal to 1,
+ *     remove the local sort
+ *   - if the max rows of the child of aggregate less than or equal to 1 and its child and
+ *     it's grouping only(include the rewritten distinct plan), convert aggregate to project
+ *   - if the max rows of the child of aggregate less than or equal to 1,
+ *     set distinct to false in all aggregate expression
+ */
+object OptimizeOneRowPlan extends Rule[LogicalPlan] {
+  private def maxRowNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRows.exists(_ <= 1L)

Review comment:
       Seems the code itself is simple and clean, we don't need to create a method for it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1039807186


   A weird case is `Sample` with `withReplacement=true`. The underlying `SampleExec` may output more rows than `maxRows`.
   
   see https://github.com/apache/spark/pull/35250


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r810854787



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1407,7 +1441,6 @@ object EliminateSorts extends Rule[LogicalPlan] {
     _.containsPattern(SORT))(applyLocally)
 
   private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
-    case Sort(_, _, child) if child.maxRows.exists(_ <= 1L) => recursiveRemoveSort(child)

Review comment:
       hmm, are you sure the new rule can fully cover this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811192145



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1407,7 +1441,6 @@ object EliminateSorts extends Rule[LogicalPlan] {
     _.containsPattern(SORT))(applyLocally)
 
   private val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
-    case Sort(_, _, child) if child.maxRows.exists(_ <= 1L) => recursiveRemoveSort(child)

Review comment:
       I think it is. The `EliminateLimits` only run `Once` , and the added rule run `fixedPoint`. It's no harmful since we have transformWithPruning




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811809065



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -2484,6 +2491,38 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-38162: Optimize one row plan in AQE Optimizer") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 4).map(i => TestData( i, i.toString)), 2)
+        .toDF("c1", "c2").createOrReplaceTempView("v")
+
+      // remove sort
+      val (origin1, adaptive1) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT * FROM v where c1 = 1 order by c1, c2
+          |""".stripMargin)
+      assert(findTopLevelSort(origin1).size == 1)
+      assert(findTopLevelSort(adaptive1).isEmpty)
+
+      // convert group only aggregate to project
+      val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT distinct c1 FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)

Review comment:
       nothing happens, the aggregate node is inside the logical query stage, so we can not optimize it at logical side:
   
   `LogicalQueryStage(logicalAgg: Aggregate, physicalAgg: BaseAggregateExec)`
   
   And the plan inside physicalAgg:
   ```sql
   BaseAggregateExec final
     ShuffleQueryStage
       Exchange
         BaseAggregateExec partial
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1039807186


   A weird case is `Sample` with `withReplacement=true`. The underlying `SampleExec` may output more rows than `maxRows`.
   
   see https://github.com/apache/spark/pull/35250


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35473:
URL: https://github.com/apache/spark/pull/35473#issuecomment-1048512891


   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811614426



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -2484,6 +2491,38 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-38162: Optimize one row plan in AQE Optimizer") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 4).map(i => TestData( i, i.toString)), 2)
+        .toDF("c1", "c2").createOrReplaceTempView("v")
+
+      // remove sort
+      val (origin1, adaptive1) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT * FROM v where c1 = 1 order by c1, c2
+          |""".stripMargin)
+      assert(findTopLevelSort(origin1).size == 1)
+      assert(findTopLevelSort(adaptive1).isEmpty)
+
+      // convert group only aggregate to project
+      val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT distinct c1 FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)
+          |""".stripMargin)
+      assert(findTopLevelAggregate(origin2).size == 2)
+      assert(findTopLevelAggregate(adaptive2).isEmpty)
+
+      // remove distinct in aggregate
+      val (origin3, adaptive3) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT sum(distinct c1) FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)

Review comment:
       same question




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811614292



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -2484,6 +2491,38 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-38162: Optimize one row plan in AQE Optimizer") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 4).map(i => TestData( i, i.toString)), 2)
+        .toDF("c1", "c2").createOrReplaceTempView("v")
+
+      // remove sort
+      val (origin1, adaptive1) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT * FROM v where c1 = 1 order by c1, c2
+          |""".stripMargin)
+      assert(findTopLevelSort(origin1).size == 1)
+      assert(findTopLevelSort(adaptive1).isEmpty)
+
+      // convert group only aggregate to project
+      val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
+        """
+          |SELECT distinct c1 FROM (SELECT /*+ repartition(c1) */ * FROM v where c1 = 1)

Review comment:
       what happens if there is no `/*+ repartition(c1) */`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811613615



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -2484,6 +2491,38 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-38162: Optimize one row plan in AQE Optimizer") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 4).map(i => TestData( i, i.toString)), 2)

Review comment:
       ```suggestion
           (1 to 4).map(i => TestData(i, i.toString)), 2)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ulysses-you commented on a change in pull request #35473: [SPARK-38162][SQL] Optimize one row plan in normal and AQE Optimizer

Posted by GitBox <gi...@apache.org>.
ulysses-you commented on a change in pull request #35473:
URL: https://github.com/apache/spark/pull/35473#discussion_r811193079



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -454,6 +455,40 @@ object EliminateAggregateFilter extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * The rule is applied both normal and AQE Optimizer. It optimizes plan using max rows:
+ *   - if the child of sort max rows less than or equal to 1, remove the sort
+ *   - if the child of local sort max rows per partition less than or equal to 1, remove the
+ *     local sort
+ *   - if the child of aggregate max rows less than or equal to 1 and its output is subset of
+ *     its child and it's grouping only(include the rewritten distinct plan), remove the aggregate
+ *   - if the child of aggregate max rows less than or equal to 1, set distinct to false in all
+ *     aggregate expression
+ */
+object OptimizeOneRowPlan extends Rule[LogicalPlan] {
+  private def maxRowNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRows.exists(_ <= 1L)
+  }
+
+  private def maxRowPerPartitionNotLargerThanOne(plan: LogicalPlan): Boolean = {
+    plan.maxRowsPerPartition.exists(_ <= 1L)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDownWithPruning(_.containsAnyPattern(SORT, AGGREGATE), ruleId) {
+      case Sort(_, _, child) if maxRowNotLargerThanOne(child) => child
+      case Sort(_, false, child) if maxRowPerPartitionNotLargerThanOne(child) => child
+      case agg @ Aggregate(_, _, child) if agg.groupOnly &&
+        agg.outputSet.subsetOf(child.outputSet) && maxRowNotLargerThanOne(child) => child

Review comment:
       yes, it's more general. updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org