You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/02/04 01:47:56 UTC

(kyuubi) branch master updated: [KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 576379c67 [KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows
576379c67 is described below

commit 576379c677107b21cd3e9fbc212654de9f56cbc7
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Sat Feb 3 17:47:48 2024 -0800

    [KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows
    
    # :mag: Description
    Followup #5591
    Support getting the existing limit from more plan types, and respect the result max rows.
    ## Issue References 🔗
    
    This pull request fixes #
    
    ## Describe Your Solution 🔧
    
    Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #5963 from turboFei/incremental_save.
    
    Closes #5377
    
    223d51034 [Fei Wang] use optimized plan
    ecefc2a9d [Fei Wang] use spark plan
    57091e5f9 [Fei Wang] minor
    209614492 [Fei Wang] for logical plan
    0f734eea7 [Fei Wang] ut
    fdc115541 [Fei Wang] save
    f8e405af8 [Fei Wang] math.min
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 21 ++++++----
 .../spark/sql/kyuubi/SparkDatasetHelperSuite.scala | 45 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 16f597cdb..e8f90ecee 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -25,9 +25,9 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -295,16 +295,23 @@ object SparkDatasetHelper extends Logging {
     SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
   }
 
+  private[kyuubi] def optimizedPlanLimit(queryExecution: QueryExecution): Option[Long] =
+    queryExecution.optimizedPlan match {
+      case globalLimit: GlobalLimit => globalLimit.maxRows
+      case _ => None
+    }
+
   def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
     if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
       return false
     }
-    lazy val limit = result.queryExecution.executedPlan match {
-      case collectLimit: CollectLimitExec => collectLimit.limit
-      case _ => resultMaxRows
+    val finalLimit = optimizedPlanLimit(result.queryExecution) match {
+      case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
+      case Some(limit) => limit
+      case None => resultMaxRows
     }
-    lazy val stats = if (limit > 0) {
-      limit * EstimationUtils.getSizePerRow(
+    lazy val stats = if (finalLimit > 0) {
+      finalLimit * EstimationUtils.getSizePerRow(
         result.queryExecution.executedPlan.output)
     } else {
       result.queryExecution.optimizedPlan.stats.sizeInBytes
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
new file mode 100644
index 000000000..8ac00e602
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kyuubi
+
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+
+class SparkDatasetHelperSuite extends WithSparkSQLEngine {
+  override def withKyuubiConf: Map[String, String] = Map.empty
+
+  test("get limit from spark plan") {
+    Seq(true, false).foreach { aqe =>
+      val topKThreshold = 3
+      spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, aqe)
+      spark.sessionState.conf.setConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, topKThreshold)
+      spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv AS" +
+        " SELECT * FROM VALUES(1),(2),(3),(4) AS t(id)")
+
+      val topKStatement = s"SELECT * FROM(SELECT * FROM tv ORDER BY id LIMIT ${topKThreshold - 1})"
+      assert(SparkDatasetHelper.optimizedPlanLimit(
+        spark.sql(topKStatement).queryExecution) === Option(topKThreshold - 1))
+
+      val collectLimitStatement =
+        s"SELECT * FROM (SELECT * FROM tv ORDER BY id LIMIT $topKThreshold)"
+      assert(SparkDatasetHelper.optimizedPlanLimit(
+        spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
+    }
+  }
+}