Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2023/09/20 07:21:14 UTC

[GitHub] [spark] wankunde opened a new pull request, #43009: [SPARK-45230][SQL] Plan sorter for Aggregate after SMJ

wankunde opened a new pull request, #43009:
URL: https://github.com/apache/spark/pull/43009

   ### What changes were proposed in this pull request?
   
   This PR is a possible follow-up to https://github.com/apache/spark/pull/42488 and https://github.com/apache/spark/pull/42557.
   
   If an aggregate operator follows a sort merge join (SMJ) and the grouping expressions of the aggregate contain all the join keys of the streamed side, we can add a sorter on the streamed side of the SMJ so that the aggregate can be converted to a SortAggregate, which will be faster than a HashAggregate.
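   
   The condition above can be sketched in plain Scala. This is only an illustration, not the PR's implementation: `SortPlanSketch` and `planSortOrder` are hypothetical names, expressions are modeled as plain strings, and the chosen order (join keys first, then the remaining grouping expressions) is an assumption taken from the `Sort(t1.b, t1.a)` example in this description.
   
   ```scala
   object SortPlanSketch {
     // Hypothetical helper, not part of Spark's API.
     // Returns the sort order to request on the streamed side of the SMJ,
     // or None when the grouping expressions do not cover every join key.
     def planSortOrder(
         streamedJoinKeys: Seq[String],
         groupingExprs: Seq[String]): Option[Seq[String]] = {
       val coversAllKeys =
         streamedJoinKeys.nonEmpty &&
           streamedJoinKeys.forall(k => groupingExprs.contains(k))
       if (coversAllKeys) {
         // Join keys stay first so the SMJ's own ordering requirement still
         // holds; the remaining grouping expressions are appended after them.
         val rest = groupingExprs.filterNot(g => streamedJoinKeys.contains(g))
         Some(streamedJoinKeys ++ rest)
       } else {
         None
       }
     }
   }
   ```
   
   For a join on `b` followed by `GROUP BY a, b`, the sketch yields the order `b, a`; if the grouping expressions were only `a`, it yields `None` and the plan stays unchanged.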
   
   For example, with table t1(a, b, c) and t2(x, y, z):
   ```
   SELECT a, b, sum(c)
   FROM t1
   JOIN t2
   ON t1.b = t2.y
   GROUP BY a, b
   ```
   
   The optimized plan:
   ```
   Scan(t1)                Scan(t2)
       |                      |
       |                      |
   Exchange 1           Exchange 2
        \                     /
          \                 /
            \             /
           SMJ (t1.b = t2.y)
                   |
                   |
               Aggregate
   ```
   Before this PR, Spark's EnsureRequirements rule adds Sort(t1.b) to the left side of the SMJ, and the aggregate is planned as a HashAggregate:
   ```
   Scan(t1)                Scan(t2)
       |                      |
       |                      |
   Exchange 1           Exchange 2
        \                     /
      Sort(t1.b)        Sort(t2.y) 
            \             /
          SMJ (t1.b = t2.y)
                   |
                   |
            HashAggregate
   ```
   
   If we instead add Sort(t1.b, t1.a) to the left side of the SMJ, the following aggregate can be converted to a SortAggregate, which will be faster:
   
   ```
   Scan(t1)                Scan(t2)
       |                      |
       |                      |
   Exchange 1           Exchange 2
        \                    /
    Sort(t1.b, t1.a)   Sort(t2.y) 
            \             /
         SMJ (t1.b = t2.y)
                   |
                   |
             SortAggregate
   ```
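   
   To illustrate why the extra sort column helps, here is a minimal sketch of a sort-based aggregate in plain Scala. It is not Spark's SortAggregateExec: `SortAggregateSketch`, `Row`, and `sortAggregate` are hypothetical names, and the sketch assumes the rows reaching the aggregate are still ordered by (b, a). Under that assumption, each group is contiguous, so `sum(c)` can be computed in one pass without a hash table.
   
   ```scala
   object SortAggregateSketch {
     // Hypothetical row type standing in for the columns of t1(a, b, c).
     final case class Row(a: Int, b: Int, c: Long)
   
     // Computes sum(c) grouped by (a, b) in a single streaming pass.
     // Assumes the input is already sorted by (b, a), so all rows of one
     // group are adjacent and only one running sum is held at a time.
     def sortAggregate(sorted: Iterator[Row]): Iterator[(Int, Int, Long)] =
       new Iterator[(Int, Int, Long)] {
         private val in = sorted.buffered
   
         def hasNext: Boolean = in.hasNext
   
         def next(): (Int, Int, Long) = {
           val first = in.next()
           var sum = first.c
           // Consume rows while the next row belongs to the same group.
           while (in.hasNext && in.head.a == first.a && in.head.b == first.b) {
             sum += in.next().c
           }
           (first.a, first.b, sum)
         }
       }
   }
   ```
   
   With input sorted only by b, rows with the same (a, b) would not necessarily be adjacent, and this single-pass approach would emit the same group more than once.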
   
   
   ### Why are the changes needed?
   
   To speed up aggregation after an SMJ by planning a SortAggregate instead of a HashAggregate.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Added unit tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [WIP][SPARK-45230][SQL] Plan sorter for Aggregate after SMJ [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #43009: [WIP][SPARK-45230][SQL] Plan sorter for Aggregate after SMJ
URL: https://github.com/apache/spark/pull/43009




Re: [PR] [WIP][SPARK-45230][SQL] Plan sorter for Aggregate after SMJ [spark]

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #43009:
URL: https://github.com/apache/spark/pull/43009#discussion_r1364554543


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateAfterSMJBenchmark.scala:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Benchmark to measure performance of an aggregate after a sort merge join (SMJ).
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class>
+ *      --jars <spark core test jar>,<spark catalyst test jar> <spark sql test jar>
+ *   2. build/sbt "sql/Test/runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>"
+ *      Results will be written to "benchmarks/AggregateAfterSMJBenchmark-results.txt".
+ * }}}
+ */
+object AggregateAfterSMJBenchmark extends SqlBasedBenchmark {
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Aggregate after SMJ") {
+      val N = 10 << 21
+
+      val benchmark = new Benchmark("Aggregate after SMJ", N, output = output)
+      spark
+        .range(N)
+        .selectExpr(
+          "cast(id as decimal) as id1",
+          "cast(id as decimal) as id2",
+          "cast(id + 1 as decimal) as id3")
+        .createOrReplaceTempView("t1")
+      spark
+        .range(N)
+        .selectExpr(
+          "cast(id as decimal) as id1",
+          "cast(id as decimal) as id2",
+          "cast(id as decimal) as id3")
+        .createOrReplaceTempView("t2")
+
+      def f(): Unit = {
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+          spark.sql(
+            s"""SELECT t1.id1, t1.id2, count(t1.id3) as cnt
+               |FROM t1
+               |JOIN t2
+               |ON t1.id2 = t2.id2 AND t1.id3 > t2.id3
+               |GROUP BY t1.id1, t1.id2

Review Comment:
   Presently, whole-stage codegen (WSCG) is not supported for SortAggregate when grouping expressions are present: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala#L95
   
   Are there scenarios where this change could cause a regression because of that?





Re: [PR] [WIP][SPARK-45230][SQL] Plan sorter for Aggregate after SMJ [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #43009:
URL: https://github.com/apache/spark/pull/43009#issuecomment-1912859595

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

