You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/07 06:20:55 UTC

[spark] branch master updated: [SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 427fbee4c00 [SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering
427fbee4c00 is described below

commit 427fbee4c009d8d49fdb80a2e2532723eff84150
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Jul 7 14:20:29 2022 +0800

    [SPARK-39679][SQL] TakeOrderedAndProjectExec should respect child output ordering
    
    ### What changes were proposed in this pull request?
    
    Skip local sort in `TakeOrderedAndProjectExec` if child output ordering satisfies the required.
    
    ### Why are the changes needed?
    
    TakeOrderedAndProjectExec should respect child output ordering to avoid unnecessary sort.
    For example:  TakeOrderedAndProjectExec on the top of SortMergeJoin.
    ```SQL
    SELECT * FROM t1 JOIN t2 ON t1.c1 = t2.c2 ORDER BY t1.c1 LIMIT 100;
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    no, only improve performance
    
    ### How was this patch tested?
    
    Add benchmark test:
    ```sql
    val row = 10 * 1000
    val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1")
    val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")
    df1.join(df2, col("c1") === col("c2"))
              .orderBy(col("c1"))
              .limit(100)
              .noop()
    ```
    
    Before:
    ```
    ================================================================================================
    TakeOrderedAndProject
    ================================================================================================
    
    OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8370C CPU  2.80GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                3356           3414          61          0.0      335569.5       1.0X
    TakeOrderedAndProject with SMJ for executeCollect           3331           3370          47          0.0      333118.0       1.0X
    
    OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                3745           3766          24          0.0      374477.3       1.0X
    TakeOrderedAndProject with SMJ for executeCollect           3657           3680          38          0.0      365703.4       1.0X
    
    OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                2499           2554          47          0.0      249945.5       1.0X
    TakeOrderedAndProject with SMJ for executeCollect           2510           2515           8          0.0      250956.9       1.0X
    ```
    
    After:
    ```
    ================================================================================================
    TakeOrderedAndProject
    ================================================================================================
    
    OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8171M CPU  2.60GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                 287            337          43          0.0       28734.9       1.0X
    TakeOrderedAndProject with SMJ for executeCollect            150            170          30          0.1       15037.8       1.9X
    
    OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8272CL CPU  2.60GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                 521            537          14          0.0       52122.1       1.0X
    TakeOrderedAndProject with SMJ for executeCollect            247            300          48          0.0       24737.7       2.1X
    
    OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
    Intel(R) Xeon(R) Platinum 8370C CPU  2.80GHz
    TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------------
    TakeOrderedAndProject with SMJ for doExecute                 339            364          30          0.0       33873.3       1.0X
    TakeOrderedAndProject with SMJ for executeCollect            129            146          22          0.1       12949.3       2.6X
    ```
    
    Closes #37085 from ulysses-you/topn-ordering.
    
    Lead-authored-by: ulysses-you <ul...@gmail.com>
    Co-authored-by: ulysses <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 ...akeOrderedAndProjectBenchmark-jdk11-results.txt | 12 ++++
 ...akeOrderedAndProjectBenchmark-jdk17-results.txt | 12 ++++
 .../TakeOrderedAndProjectBenchmark-results.txt     | 12 ++++
 .../org/apache/spark/sql/execution/limit.scala     | 16 ++++-
 .../sql/execution/TakeOrderedAndProjectSuite.scala | 34 ++++++++++
 .../benchmark/TakeOrderedAndProjectBenchmark.scala | 76 ++++++++++++++++++++++
 6 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt
new file mode 100644
index 00000000000..60c67b2f6a3
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 521            537          14          0.0       52122.1       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            247            300          48          0.0       24737.7       2.1X
+
+
diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt
new file mode 100644
index 00000000000..21697d54449
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-jdk17-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 339            364          30          0.0       33873.3       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            129            146          22          0.1       12949.3       2.6X
+
+
diff --git a/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt
new file mode 100644
index 00000000000..9cfe6ceb36a
--- /dev/null
+++ b/sql/core/benchmarks/TakeOrderedAndProjectBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+TakeOrderedAndProject
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+TakeOrderedAndProject with SMJ:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+---------------------------------------------------------------------------------------------------------------------------------
+TakeOrderedAndProject with SMJ for doExecute                 287            337          43          0.0       28734.9       1.0X
+TakeOrderedAndProject with SMJ for executeCollect            150            170          30          0.1       15037.8       1.9X
+
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index dbba19002c5..49f703fddb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -283,8 +283,13 @@ case class TakeOrderedAndProjectExec(
   }
 
   override def executeCollect(): Array[InternalRow] = {
+    val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
-    val limited = child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord)
+    val limited = if (orderingSatisfies) {
+      child.execute().mapPartitionsInternal(_.map(_.copy()).take(limit)).takeOrdered(limit)(ord)
+    } else {
+      child.execute().mapPartitionsInternal(_.map(_.copy())).takeOrdered(limit)(ord)
+    }
     val data = if (offset > 0) limited.drop(offset) else limited
     if (projectList != child.output) {
       val proj = UnsafeProjection.create(projectList, child.output)
@@ -303,6 +308,7 @@ case class TakeOrderedAndProjectExec(
   override lazy val metrics = readMetrics ++ writeMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val childRDD = child.execute()
     if (childRDD.getNumPartitions == 0) {
@@ -311,8 +317,12 @@ case class TakeOrderedAndProjectExec(
       val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
         childRDD
       } else {
-        val localTopK = childRDD.mapPartitionsInternal { iter =>
-          Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
+        val localTopK = if (orderingSatisfies) {
+          childRDD.mapPartitionsInternal(_.map(_.copy()).take(limit))
+        } else {
+          childRDD.mapPartitionsInternal { iter =>
+            Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
+          }
         }
 
         new ShuffledRowRDD(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 766b8238bb8..647d46f8fbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -93,4 +93,38 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSparkSession {
       }
     }
   }
+
+  test("TakeOrderedAndProject.doExecute with local sort") {
+    withClue(s"seed = $seed") {
+      val expected = (input: SparkPlan) => {
+        GlobalLimitExec(limit,
+          LocalLimitExec(limit,
+            ProjectExec(Seq(input.output.last),
+              SortExec(sortOrder, true, input))))
+      }
+
+      // test doExecute
+      Seq((10000, 10), (200, 10)).foreach { case (n, m) =>
+        checkThatPlansAgree(
+          generateRandomInputData(n, m),
+          input =>
+            noOpFilter(
+              TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last),
+                SortExec(sortOrder, false, input))),
+          input => expected(input),
+          sortAnswers = false)
+      }
+
+      // test executeCollect
+      Seq((10000, 10), (200, 10)).foreach { case (n, m) =>
+        checkThatPlansAgree(
+          generateRandomInputData(n, m),
+          input =>
+            TakeOrderedAndProjectExec(limit, sortOrder, Seq(input.output.last),
+              SortExec(sortOrder, false, input)),
+          input => expected(input),
+          sortAnswers = false)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala
new file mode 100644
index 00000000000..88cdfebbb17
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TakeOrderedAndProjectBenchmark.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TakeOrderedAndProject benchmark.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class>
+ *        --jars <spark core test jar>,<spark catalyst test jar> <sql core test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/TakeOrderedAndProjectBenchmark-results.txt".
+ * }}}
+ */
+object TakeOrderedAndProjectBenchmark extends SqlBasedBenchmark {
+
+  private def takeOrderedAndProjectWithSMJ(): Unit = {
+    val row = 10 * 1000
+
+    val df1 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c1")
+    val df2 = spark.range(0, row, 1, 2).selectExpr("id % 3 as c2")
+
+    val benchmark = new Benchmark("TakeOrderedAndProject with SMJ", row, output = output)
+
+    benchmark.addCase("TakeOrderedAndProject with SMJ for doExecute", 3) { _ =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        df1.join(df2, col("c1") === col("c2"))
+          .orderBy(col("c1"))
+          .limit(100)
+          .noop()
+      }
+    }
+
+    benchmark.addCase("TakeOrderedAndProject with SMJ for executeCollect", 3) { _ =>
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        df1.join(df2, col("c1") === col("c2"))
+          .orderBy(col("c1"))
+          .limit(100)
+          .collect()
+      }
+    }
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("TakeOrderedAndProject") {
+      takeOrderedAndProjectWithSMJ()
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org