You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/09 08:52:22 UTC

[spark] branch master updated: [SPARK-36424][SQL] Support eliminate limits in AQE Optimizer

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb6f65a  [SPARK-36424][SQL] Support eliminate limits in AQE Optimizer
bb6f65a is described below

commit bb6f65acca2918a0ceb13b612d210f1b46fa1add
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Aug 9 16:51:51 2021 +0800

    [SPARK-36424][SQL] Support eliminate limits in AQE Optimizer
    
    ### What changes were proposed in this pull request?
    
    * override the maxRows method in `LogicalQueryStage`
    * add rule `EliminateLimits` in `AQEOptimizer`
    
    ### Why are the changes needed?
    
    In Ad-hoc scenario, we always add limit for the query if user have no special limit value, but not all limit is nesessary.
    
    With the power of AQE, we can eliminate limits using running statistics.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    add test
    
    Closes #33651 from ulysses-you/SPARK-36424.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  8 ++---
 .../sql/execution/adaptive/AQEOptimizer.scala      |  5 ++--
 .../sql/execution/adaptive/LogicalQueryStage.scala |  2 ++
 .../adaptive/AdaptiveQueryExecSuite.scala          | 34 +++++++++++++++++++++-
 4 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 8f508d5..9683425 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1654,7 +1654,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * This rule optimizes Limit operators by:
+ * This rule is applied by both normal and AQE Optimizer, and optimizes Limit operators by:
  * 1. Eliminate [[Limit]]/[[GlobalLimit]] operators if it's child max row <= limit.
  * 2. Combines two adjacent [[Limit]] operators into one, merging the
  *    expressions into one single expression.
@@ -1672,11 +1672,11 @@ object EliminateLimits extends Rule[LogicalPlan] {
       child
 
     case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
-      GlobalLimit(Least(Seq(ne, le)), grandChild)
+      GlobalLimit(Literal(Least(Seq(ne, le)).eval().asInstanceOf[Int]), grandChild)
     case LocalLimit(le, LocalLimit(ne, grandChild)) =>
-      LocalLimit(Least(Seq(ne, le)), grandChild)
+      LocalLimit(Literal(Least(Seq(ne, le)).eval().asInstanceOf[Int]), grandChild)
     case Limit(le, Limit(ne, grandChild)) =>
-      Limit(Least(Seq(ne, le)), grandChild)
+      Limit(Literal(Least(Seq(ne, le)).eval().asInstanceOf[Int]), grandChild)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index f8cba90..ea1ab8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
-import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.catalyst.optimizer.{ConvertToLocalRelation, EliminateLimits}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.internal.SQLConf
@@ -39,7 +39,8 @@ class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
       AQEPropagateEmptyRelation,
       ConvertToLocalRelation,
       UpdateAttributeNullability),
-    Batch("Dynamic Join Selection", Once, DynamicJoinSelection)
+    Batch("Dynamic Join Selection", Once, DynamicJoinSelection),
+    Batch("Eliminate Limits", Once, EliminateLimits)
   )
 
   final override protected def batches: Seq[Batch] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
index 8bb3708..f8b7867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala
@@ -54,4 +54,6 @@ case class LogicalQueryStage(
     }
     physicalStats.getOrElse(logicalPlan.stats)
   }
+
+  override def maxRows: Option[Long] = stats.rowCount.map(_.min(Long.MaxValue).toLong)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ca8295e..d38a641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
@@ -126,6 +126,12 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  private def findTopLevelLimit(plan: SparkPlan): Seq[CollectLimitExec] = {
+    collect(plan) {
+      case l: CollectLimitExec => l
+    }
+  }
+
   private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = {
     collectWithSubqueries(plan) {
       case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
@@ -2008,6 +2014,32 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-36424: Support eliminate limits in AQE Optimizer") {
+    withTempView("v") {
+      spark.sparkContext.parallelize(
+        (1 to 10).map(i => TestData(i, if (i > 2) "2" else i.toString)), 2)
+        .toDF("c1", "c2").createOrReplaceTempView("v")
+
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "3") {
+        val (origin1, adaptive1) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT c2, sum(c1) FROM v GROUP BY c2 LIMIT 5
+          """.stripMargin)
+        assert(findTopLevelLimit(origin1).size == 1)
+        assert(findTopLevelLimit(adaptive1).isEmpty)
+
+        // eliminate limit through filter
+        val (origin2, adaptive2) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT c2, sum(c1) FROM v GROUP BY c2 HAVING sum(c1) > 1 LIMIT 5
+          """.stripMargin)
+        assert(findTopLevelLimit(origin2).size == 1)
+        assert(findTopLevelLimit(adaptive2).isEmpty)
+      }
+    }
+  }
 }
 
 /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org