Posted to commits@spark.apache.org by we...@apache.org on 2022/07/07 06:20:55 UTC
[spark] branch master updated: [SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 427fbee4c00 [SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering
427fbee4c00 is described below
commit 427fbee4c009d8d49fdb80a2e2532723eff84150
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Jul 7 14:20:29 2022 +0800
[SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering
### What changes were proposed in this pull request?
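Skip the per-partition (local) sort in `TakeOrderedAndProjectExec` when the child's output ordering already satisfies the required sort order. In that case each partition only needs to take its first `limit` rows, in both `executeCollect` and `doExecute`.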
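The patch decides between the two paths with `SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)`. As we understand it, this is essentially a prefix test: the required ordering is satisfied if it is empty, or if each of its entries is matched, in order, by the child's ordering. The sketch below models that with hypothetical plain case classes (`OrderingModel`, `Order`); it ignores Catalyst's semantic expression equality and equivalent-expression handling, so treat it as an illustration, not Spark's implementation.

```scala
// Hypothetical, simplified model of SortOrder.orderingSatisfies. Real Catalyst
// SortOrders compare expressions semantically; here columns are plain strings.
object OrderingModel {
  sealed trait Direction
  case object Ascending extends Direction
  case object Descending extends Direction

  sealed trait NullOrdering
  case object NullsFirst extends NullOrdering
  case object NullsLast extends NullOrdering

  final case class Order(column: String, direction: Direction, nulls: NullOrdering)

  /** True if rows sorted by `childOrdering` are also sorted by `required`. */
  def orderingSatisfies(childOrdering: Seq[Order], required: Seq[Order]): Boolean =
    if (required.isEmpty) true // no requirement: any ordering satisfies it
    else if (required.length > childOrdering.length) false
    else required.zip(childOrdering).forall { case (r, c) => r == c } // prefix match

  def main(args: Array[String]): Unit = {
    val c1 = Order("c1", Ascending, NullsFirst)
    val c2 = Order("c2", Ascending, NullsFirst)
    // A child sorted by (c1, c2) satisfies ORDER BY c1 ...
    println(orderingSatisfies(Seq(c1, c2), Seq(c1))) // true
    // ... but a child sorted by c1 alone does not satisfy ORDER BY c1, c2.
    println(orderingSatisfies(Seq(c1), Seq(c1, c2))) // false
  }
}
```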
### Why are the changes needed?
`TakeOrderedAndProjectExec` should respect its child's output ordering to avoid an unnecessary sort, for example when it sits on top of a `SortMergeJoin` whose output is already sorted by the join key:
```SQL
SELECT * FROM t1 JOIN t2 ON t1.c1 = t2.c2 ORDER BY t1.c1 LIMIT 100;
```
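Why the sort is redundant here: a sort-merge join walks two inputs that are already sorted by the join key and emits matches in key order, so its output is already ordered by `t1.c1`. As we understand it, Spark's `SortMergeJoinExec` requires both children to be sorted on the join keys and, for an inner join, reports an output ordering on those keys. The toy join below (hypothetical `MiniSortMergeJoin`, not Spark's operator) assumes both inputs are sorted ascending by an `Int` key and shows that the emitted keys never go backwards.

```scala
// Toy inner sort-merge join (hypothetical; not Spark's SortMergeJoinExec).
// Both inputs must already be sorted ascending by their Int key.
object MiniSortMergeJoin {
  def join[L, R](left: IndexedSeq[(Int, L)], right: IndexedSeq[(Int, R)]): Vector[(Int, L, R)] = {
    // End (exclusive) of the run of rows sharing the key at position `from`.
    def runEnd[V](side: IndexedSeq[(Int, V)], from: Int): Int = {
      val key = side(from)._1
      val end = side.indexWhere(_._1 != key, from)
      if (end == -1) side.length else end
    }

    val out = Vector.newBuilder[(Int, L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        val iEnd = runEnd(left, i)
        val jEnd = runEnd(right, j)
        // Emit the cross product of the two equal-key runs; keys only move forward.
        for (a <- i until iEnd; b <- j until jEnd) out += ((lk, left(a)._2, right(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val t1 = Vector(0 -> "a", 1 -> "b", 1 -> "c", 2 -> "d")
    val t2 = Vector(1 -> "x", 2 -> "y", 2 -> "z", 3 -> "w")
    val joined = join(t1, t2)
    println(joined) // Vector((1,b,x), (1,c,x), (2,d,y), (2,d,z))
    // Output keys are non-decreasing, so ORDER BY on the join key is already met.
    println(joined.map(_._1) == joined.map(_._1).sorted) // true
  }
}
```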
### Does this PR introduce _any_ user-facing change?
No, this only improves performance.
### How was this patch tested?
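Added a benchmark, `TakeOrderedAndProjectBenchmark`. It runs the following query with broadcast joins disabled, 5 shuffle partitions and AQE disabled:
```scala
// Runs inside SqlBasedBenchmark, which provides `spark` and the `noop()` helper
import org.apache.spark.sql.functions.col

val row = 10 * 1000
val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1")
val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")
df1.join(df2, col("c1") === col("c2"))
  .orderBy(col("c1"))
  .limit(100)
  .noop()
```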
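For scale: with `id % 3` on both sides, each of the three keys matches roughly 3,333 rows per side, so the join feeds tens of millions of rows into `ORDER BY ... LIMIT 100`. The calculation below (a hypothetical helper, not part of the patch) makes that concrete. Note also that the `Per Row(ns)` column in the results divides the best time by the 10,000 `row` value passed to `Benchmark`, not by the joined row count (for example, 3356 ms / 10,000 ≈ 0.34 ms, i.e. the 335569.5 ns shown).

```scala
// Hypothetical helper: size of the equi-join of two copies of `id % modulus`
// for id in [0, row), i.e. the sum over keys of (rows with that key)^2.
// Assumes row >= modulus.
object BenchmarkJoinSize {
  def joinRows(row: Long, modulus: Int): Long = {
    // Number of ids in [0, row) with id % modulus == k is ceil((row - k) / modulus)
    val perKey = (0 until modulus).map(k => (row - k + modulus - 1) / modulus)
    perKey.map(c => c * c).sum
  }

  def main(args: Array[String]): Unit = {
    // 3334^2 + 3333^2 + 3333^2
    println(joinRows(10 * 1000L, 3)) // 33333334
  }
}
```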
Before:
```
================================================================================================
TakeOrderedAndProject
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                3356           3414          61          0.0      335569.5       1.0X
TakeOrderedAndProject with SMJ for executeCollect           3331           3370          47          0.0      333118.0       1.0X

OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                3745           3766          24          0.0      374477.3       1.0X
TakeOrderedAndProject with SMJ for executeCollect           3657           3680          38          0.0      365703.4       1.0X

OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                2499           2554          47          0.0      249945.5       1.0X
TakeOrderedAndProject with SMJ for executeCollect           2510           2515           8          0.0      250956.9       1.0X
```
After:
```
================================================================================================
TakeOrderedAndProject
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                 287            337          43          0.0       28734.9       1.0X
TakeOrderedAndProject with SMJ for executeCollect            150            170          30          0.1       15037.8       1.9X

OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                 521            537          14          0.0       52122.1       1.0X
TakeOrderedAndProject with SMJ for executeCollect            247            300          48          0.0       24737.7       2.1X

OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
TakeOrderedAndProject with SMJ for doExecute                 339            364          30          0.0       33873.3       1.0X
TakeOrderedAndProject with SMJ for executeCollect            129            146          22          0.1       12949.3       2.6X
```
Closes #37085 from ulysses-you/topn-ordering.
Lead-authored-by: ulysses-you <ul...@gmail.com>
Co-authored-by: ulysses <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
...akeOrderedAndProjectBenchmark-jdk11-results.txt | 12 ++++
...akeOrderedAndProjectBenchmark-jdk17-results.txt | 12 ++++
.../TakeOrderedAndProjectBenchmark-results.txt | 12 ++++
.../org/apache/spark/sql/execution/limit.scala | 16 ++++-
.../sql/execution/TakeOrderedAndProjectSuite.scala | 34 ++++++++++
.../benchmark/TakeOrderedAndProjectBenchmark.scala | 76 ++++++++++++++++++++++
6 files changed, 159 insertions(+), 3 deletions(-)
diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt
new file mode 100644
index 00000000000..60c67b2f6a3
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 521            537          14          0.0       52122.1       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            247            300          48          0.0       24737.7       2.1X
+
+
diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt
new file mode 100644
index 00000000000..21697d54449
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 339            364          30          0.0       33873.3       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            129            146          22          0.1       12949.3       2.6X
+
+
diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt
new file mode 100644
index 00000000000..9cfe6ceb36a
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 287            337          43          0.0       28734.9       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            150            170          30          0.1       15037.8       1.9X
+
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index dbba19002c5..49f703fddb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -283,8 +283,13 @@ case class TakeOrderedAndProjectExec(
   }

   override def executeCollect(): Array[InternalRow] = {
+    val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
-    val limited = child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord)
+    val limited = if (orderingSatisfies) {
+      child.execute().mapPartitionsInternal(_.map(_.copy()).take(limit)).takeOrdered(limit)(ord)
+    } else {
+      child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord)
+    }
     val data = if (offset > 0) limited.drop(offset) else limited
     if (projectList != child.output) {
       val proj = UnsafeProjection.create(projectList, child.output)
@@ -303,6 +308,7 @@ case class TakeOrderedAndProjectExec(
   override lazy val metrics = readMetrics ++ writeMetrics

   protected override def doExecute(): RDD[InternalRow] = {
+    val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val childRDD = child.execute()
     if (childRDD.getNumPartitions == 0) {
@@ -311,8 +317,12 @@ case class TakeOrderedAndProjectExec(
       val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
         childRDD
       } else {
-        val localTopK = childRDD.mapPartitionsInternal { iter =>
-          Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
+        val localTopK = if (orderingSatisfies) {
+          childRDD.mapPartitionsInternal(_.map(_.copy()).take(limit))
+        } else {
+          childRDD.mapPartitionsInternal { iter =>
+            Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
+          }
         }

         new ShuffledRowRDD(
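Why the `take(limit)` branch is cheaper: a per-partition top-K has to read every row of the partition before it knows which `limit` rows are smallest, whereas `Iterator.take(limit)` stops after `limit` rows once the partition is known to be sorted. The sketch below models this with plain Scala iterators. The bounded priority queue is only a stand-in for `Utils.takeOrdered` (whose internals may differ), and `CountingIterator` is a hypothetical helper that counts the rows pulled from a partition.

```scala
import scala.collection.mutable

// Hypothetical model comparing the two local (per-partition) strategies.
object LocalTopKCost {
  // Counts how many rows are pulled from the wrapped iterator.
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    var consumed = 0
    def hasNext: Boolean = underlying.hasNext
    def next(): T = { consumed += 1; underlying.next() }
  }

  // Old path, modelled as a bounded max-heap keeping the k smallest rows.
  // It must drain the whole iterator before it can answer.
  def boundedTopK[T](iter: Iterator[T], k: Int)(implicit ord: Ordering[T]): Seq[T] =
    if (k <= 0) Seq.empty
    else {
      val heap = mutable.PriorityQueue.empty[T](ord) // head is the largest kept row
      iter.foreach { x =>
        if (heap.size < k) heap.enqueue(x)
        else if (ord.lt(x, heap.head)) { heap.dequeue(); heap.enqueue(x) }
      }
      heap.toSeq.sorted
    }

  def main(args: Array[String]): Unit = {
    val sortedPartition = (1 to 100000).toVector
    val viaHeapIter = new CountingIterator(sortedPartition.iterator)
    val viaHeap = boundedTopK(viaHeapIter, 100)
    // New path: the partition is already sorted, so take(limit) is enough.
    val viaTakeIter = new CountingIterator(sortedPartition.iterator)
    val viaTake = viaTakeIter.take(100).toVector
    // top-K read 100000 rows, take read 100 rows
    println(s"top-K read ${viaHeapIter.consumed} rows, take read ${viaTakeIter.consumed} rows")
    println(viaHeap == viaTake) // true
  }
}
```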
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 766b8238bb8..647d46f8fbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -93,4 +93,38 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSparkSession {
       }
     }
   }
+
+  test("TakeOrderedAndProject.doExecute with local sort") {
+    withClue(s"seed = $seed") {
+      val expected = (input: SparkPlan) => {
+        GlobalLimitExec(limit,
+          LocalLimitExec(limit,
+            ProjectExec(Seq(input.output.last),
+              SortExec(sortOrder, true, input))))
+      }
+
+      // test doExecute
+      Seq((10000, 10), (200, 10)).foreach { case (n, m) =>
+        checkThatPlansAgree(
+          generateRandomInputData(n, m),
+          input =>
+            noOpFilter(
+              TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last),
+                SortExec(sortOrder, false, input))),
+          input => expected(input),
+          sortAnswers = false)
+      }
+
+      // test executeCollect
+      Seq((10000, 10), (200, 10)).foreach { case (n, m) =>
+        checkThatPlansAgree(
+          generateRandomInputData(n, m),
+          input =>
+            TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last),
+              SortExec(sortOrder, false, input)),
+          input => expected(input),
+          sortAnswers = false)
+      }
+    }
+  }
 }
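The new test checks that the fast path returns the same rows as a global sort followed by a limit. The same property can be sketched with plain Scala collections standing in for partitions (hypothetical `TopKModel`): when every partition is sorted, the first `limit` rows of each partition contain the global top `limit`, so taking those candidates, ordering them, and then dropping `offset` (mirroring `executeCollect`) matches a full sort. Here `sorted.take` stands in for `RDD.takeOrdered`.

```scala
// Hypothetical model of the executeCollect fast path; Seq[Seq[T]] stands in for
// an RDD whose partitions are each sorted by `ord`.
object TopKModel {
  def topKSorted[T](partitions: Seq[Seq[T]], limit: Int, offset: Int)(
      implicit ord: Ordering[T]): Seq[T] = {
    // Per-partition take(limit): valid only because each partition is sorted.
    val candidates = partitions.flatMap(_.take(limit))
    // Global ordering step; sorted.take stands in for RDD.takeOrdered(limit).
    val limited = candidates.sorted.take(limit)
    if (offset > 0) limited.drop(offset) else limited
  }

  // Reference answer: sort all rows, then apply limit and offset.
  def topKFullSort[T](partitions: Seq[Seq[T]], limit: Int, offset: Int)(
      implicit ord: Ordering[T]): Seq[T] =
    partitions.flatten.sorted.take(limit).drop(offset)

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 4, 7, 9), Seq(2, 3, 8), Seq(0, 5, 6))
    println(topKSorted(parts, limit = 4, offset = 1))   // List(1, 2, 3)
    println(topKFullSort(parts, limit = 4, offset = 1)) // List(1, 2, 3)
  }
}
```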
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala
new file mode 100644
index 00000000000..88cdfebbb17
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TakeOrderedAndProject benchmark.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class>
+ *        --jars <spark core test jar>,<spark catalyst test jar> <sql core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/TakeOrderedAndProjectBenchmark-results.txt".
+ * }}}
+ */
+object TakeOrderedAndProjectBenchmark extends SqlBasedBenchmark {
+
+  private def takeOrderedAndProjectWithSMJ(): Unit = {
+    val row = 10 * 1000
+
+    val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1")
+    val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")
+
+    val benchmark = new Benchmark("TakeOrderedAndProject with SMJ", row, output = output)
+
+    benchmark.addCase("TakeOrderedAndProject with SMJ for doExecute", 3) { _ =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        df1.join(df2, col("c1") === col("c2"))
+          .orderBy(col("c1"))
+          .limit(100)
+          .noop()
+      }
+    }
+
+    benchmark.addCase("TakeOrderedAndProject with SMJ for executeCollect", 3) { _ =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        df1.join(df2, col("c1") === col("c2"))
+          .orderBy(col("c1"))
+          .limit(100)
+          .collect()
+      }
+    }
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("TakeOrderedAndProject") {
+      takeOrderedAndProjectWithSMJ()
+    }
+  }
+}