You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2024/02/04 01:47:56 UTC
(kyuubi) branch master updated: [KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 576379c67 [KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows
576379c67 is described below
commit 576379c677107b21cd3e9fbc212654de9f56cbc7
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Sat Feb 3 17:47:48 2024 -0800
[KYUUBI #5377][FOLLOWUP] Get limit from more spark plan and regard result max rows
# :mag: Description
Follow-up to #5591.
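Detect an existing limit from more query plan shapes by reading a top-level `GlobalLimit` from the optimized logical plan (previously only a top-level `CollectLimitExec` in the executed plan was recognized), and cap that limit by the result max rows when the latter is set (greater than 0).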
## Issue References :link:
This pull request fixes #5377
## Describe Your Solution :wrench:
Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan :test_tube:
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist :memo:
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #5963 from turboFei/incremental_save.
Closes #5377
223d51034 [Fei Wang] use optimized plan
ecefc2a9d [Fei Wang] use spark plan
57091e5f9 [Fei Wang] minor
209614492 [Fei Wang] for logical plan
0f734eea7 [Fei Wang] ut
fdc115541 [Fei Wang] save
f8e405af8 [Fei Wang] math.min
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../spark/sql/kyuubi/SparkDatasetHelper.scala | 21 ++++++----
.../spark/sql/kyuubi/SparkDatasetHelperSuite.scala | 45 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 7 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 16f597cdb..e8f90ecee 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -25,9 +25,9 @@ import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.HiveResult
+import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -295,16 +295,23 @@ object SparkDatasetHelper extends Logging {
SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
}
+ private[kyuubi] def optimizedPlanLimit(queryExecution: QueryExecution): Option[Long] =
+ queryExecution.optimizedPlan match {
+ case globalLimit: GlobalLimit => globalLimit.maxRows
+ case _ => None
+ }
+
def shouldSaveResultToFs(resultMaxRows: Int, minSize: Long, result: DataFrame): Boolean = {
if (isCommandExec(result.queryExecution.executedPlan.nodeName)) {
return false
}
- lazy val limit = result.queryExecution.executedPlan match {
- case collectLimit: CollectLimitExec => collectLimit.limit
- case _ => resultMaxRows
+ val finalLimit = optimizedPlanLimit(result.queryExecution) match {
+ case Some(limit) if resultMaxRows > 0 => math.min(limit, resultMaxRows)
+ case Some(limit) => limit
+ case None => resultMaxRows
}
- lazy val stats = if (limit > 0) {
- limit * EstimationUtils.getSizePerRow(
+ lazy val stats = if (finalLimit > 0) {
+ finalLimit * EstimationUtils.getSizePerRow(
result.queryExecution.executedPlan.output)
} else {
result.queryExecution.optimizedPlan.stats.sizeInBytes
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
new file mode 100644
index 000000000..8ac00e602
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelperSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kyuubi
+
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+
+class SparkDatasetHelperSuite extends WithSparkSQLEngine {
+ override def withKyuubiConf: Map[String, String] = Map.empty
+
+ test("get limit from spark plan") {
+ Seq(true, false).foreach { aqe =>
+ val topKThreshold = 3
+ spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, aqe)
+ spark.sessionState.conf.setConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, topKThreshold)
+ spark.sql("CREATE OR REPLACE TEMPORARY VIEW tv AS" +
+ " SELECT * FROM VALUES(1),(2),(3),(4) AS t(id)")
+
+ val topKStatement = s"SELECT * FROM(SELECT * FROM tv ORDER BY id LIMIT ${topKThreshold - 1})"
+ assert(SparkDatasetHelper.optimizedPlanLimit(
+ spark.sql(topKStatement).queryExecution) === Option(topKThreshold - 1))
+
+ val collectLimitStatement =
+ s"SELECT * FROM (SELECT * FROM tv ORDER BY id LIMIT $topKThreshold)"
+ assert(SparkDatasetHelper.optimizedPlanLimit(
+ spark.sql(collectLimitStatement).queryExecution) === Option(topKThreshold))
+ }
+ }
+}