Posted to reviews@spark.apache.org by "wangyum (via GitHub)" <gi...@apache.org> on 2023/11/16 12:24:51 UTC

[PR] [SPARK-45954][SQL] Remove redundant shuffles [spark]

wangyum opened a new pull request, #43841:
URL: https://github.com/apache/spark/pull/43841

   ### What changes were proposed in this pull request?
   
   This PR adds a new physical rule to remove redundant `ShuffleExchangeExec` nodes from the Spark plan. A shuffle node is redundant when its child is also a shuffle.
   
   ### Why are the changes needed?
   
   Reducing the number of shuffles improves query performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Added unit tests in `RemoveRedundantShufflesSuite`.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45954][SQL] Avoid generating redundant `ShuffleExchangeExec` node [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on PR #43841:
URL: https://github.com/apache/spark/pull/43841#issuecomment-1815619629

   `df1.repartition` is a user-specified shuffle, so we had better respect it, or at least add a new config to control this behavior. For example, people may well add a shuffle to mitigate data skew before an aggregation, or for some other purpose.




Re: [PR] [SPARK-45954][SQL] Remove redundant shuffles [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #43841: [SPARK-45954][SQL] Remove redundant shuffles
URL: https://github.com/apache/spark/pull/43841




Re: [PR] [SPARK-45954][SQL] Remove redundant shuffles [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #43841:
URL: https://github.com/apache/spark/pull/43841#issuecomment-1977715036

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




Re: [PR] [SPARK-45954][SQL] Remove redundant shuffles [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43841:
URL: https://github.com/apache/spark/pull/43841#discussion_r1404840863


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantShufflesSuite.scala:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+abstract class RemoveRedundantShufflesSuiteBase
+    extends QueryTest
+    with SharedSparkSession
+    with AdaptiveSparkPlanHelper {
+  import testImplicits._
+
+  private def getShuffleExchangeNum(df: DataFrame): Int = {
+    collect(df.queryExecution.executedPlan) {
+      case s: ShuffleExchangeExec => s
+    }.size
+  }
+
+  test("Remove redundant shuffle") {
+    withTempView("t1", "t2") {
+      spark.range(10).select($"id").createOrReplaceTempView("t1")
+      spark.range(20).select($"id").createOrReplaceTempView("t2")
+      Seq(-1, 1000000).foreach { threshold =>
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> threshold.toString) {
+          val query = spark.table("t1").repartition(10).join(spark.table("t2"), "id")
+          query.collect()
+          val shuffleNum = getShuffleExchangeNum(query)
+          if (threshold > 0) {

Review Comment:
   This test condition does not look good.
   How about `threshold == 1000000`?





Re: [PR] [SPARK-45954][SQL] Avoid generating redundant `ShuffleExchangeExec` node [spark]

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on code in PR #43841:
URL: https://github.com/apache/spark/pull/43841#discussion_r1396593112


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -68,7 +68,13 @@ case class EnsureRequirements(
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child, shuffleOrigin)
+        // Avoid generating redundant ShuffleExchangeExec node
+        val shuffleChild = child match {
+          case s: ShuffleExchangeExec => s.child
+          case other => other
+        }
+        ShuffleExchangeExec(
+          distribution.createPartitioning(numPartitions), shuffleChild, shuffleOrigin)

Review Comment:
   @ulysses-you It only takes effect if there are no operators between two shuffles.
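
   For illustration only, the effect of this match can be sketched on a toy plan tree. The `Plan`, `Scan`, `Project`, and `Shuffle` types and the `addShuffle` and `countShuffles` helpers below are hypothetical stand-ins, not Spark classes, and the sketch ignores shuffle origin and shuffle reuse.

   ```scala
   // Toy model of a physical plan. These types are illustrative assumptions,
   // not Spark's SparkPlan / ShuffleExchangeExec classes.
   object ShuffleCollapseSketch {
     sealed trait Plan
     final case class Scan(table: String) extends Plan
     final case class Project(child: Plan) extends Plan
     final case class Shuffle(partitioning: String, child: Plan) extends Plan

     // Mirrors the match in the diff: if the node about to be shuffled is itself
     // a shuffle, shuffle its child instead, so only the outer partitioning survives.
     def addShuffle(partitioning: String, child: Plan): Plan = {
       val shuffleChild = child match {
         case Shuffle(_, grandChild) => grandChild
         case other => other
       }
       Shuffle(partitioning, shuffleChild)
     }

     // Counts shuffle nodes, similar in spirit to getShuffleExchangeNum in the test suite.
     def countShuffles(plan: Plan): Int = plan match {
       case Shuffle(_, child) => 1 + countShuffles(child)
       case Project(child) => countShuffles(child)
       case Scan(_) => 0
     }

     def main(args: Array[String]): Unit = {
       val repartitioned = Shuffle("RoundRobinPartitioning(10)", Scan("t1"))
       // Adjacent shuffles: the inner one is dropped. Prints 1.
       println(countShuffles(addShuffle("hashpartitioning(id, 5)", repartitioned)))
       // An operator sits between the two shuffles: both are kept. Prints 2.
       println(countShuffles(addShuffle("hashpartitioning(id, 5)", Project(repartitioned))))
     }
   }
   ```

   In this sketch the inner shuffle is dropped only when it is the direct child of the new one; any operator in between leaves both shuffles in place.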





Re: [PR] [SPARK-45954][SQL] Avoid generating redundant `ShuffleExchangeExec` node [spark]

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on PR #43841:
URL: https://github.com/apache/spark/pull/43841#issuecomment-1815633494

   This change does not affect a repartition added before an aggregation:
   ```scala
   spark.table("t1").repartition(10).groupBy("a").count().explain(true)
   ```
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- HashAggregate(keys=[a#16L], functions=[count(1)], output=[a#16L, count#23L])
      +- Exchange hashpartitioning(a#16L, 5), ENSURE_REQUIREMENTS, [plan_id=62]
         +- HashAggregate(keys=[a#16L], functions=[partial_count(1)], output=[a#16L, count#28L])
            +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=58]
               +- FileScan parquet spark_catalog.default.t1[a#16L]
   ```
   




Re: [PR] [SPARK-45954][SQL] Avoid generating redundant `ShuffleExchangeExec` node [spark]

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on PR #43841:
URL: https://github.com/apache/spark/pull/43841#issuecomment-1815523392

   It should only take effect in the following cases, where the user usually wants to increase the parallelism for `BroadcastHashJoin`/`BroadcastNestedLoopJoin`:
   ```scala
   df1.repartition(num).join(df2, )
   df1.repartition(num).select(xxx).join(df2, "id")
   ```




Re: [PR] [SPARK-45954][SQL] Avoid generating redundant `ShuffleExchangeExec` node [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on code in PR #43841:
URL: https://github.com/apache/spark/pull/43841#discussion_r1396615662


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -68,7 +68,13 @@ case class EnsureRequirements(
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child, shuffleOrigin)
+        // Avoid generating redundant ShuffleExchangeExec node
+        val shuffleChild = child match {
+          case s: ShuffleExchangeExec => s.child
+          case other => other
+        }
+        ShuffleExchangeExec(
+          distribution.createPartitioning(numPartitions), shuffleChild, shuffleOrigin)

Review Comment:
   My point is that it's a user-specified shuffle, so we cannot just remove it. Does it affect shuffle reuse? For example:
   
   ```
   with t as (select /*+ repartition */ * from t)
   select * from t join t2
   union all
   select * from t
   ```
   





Re: [PR] [SPARK-45954][SQL] Remove redundant shuffles [spark]

Posted by "wangyum (via GitHub)" <gi...@apache.org>.
wangyum commented on code in PR #43841:
URL: https://github.com/apache/spark/pull/43841#discussion_r1404766735


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala:
##########
@@ -68,7 +68,13 @@ case class EnsureRequirements(
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
           .getOrElse(conf.numShufflePartitions)
-        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child, shuffleOrigin)
+        // Avoid generating redundant ShuffleExchangeExec node
+        val shuffleChild = child match {
+          case s: ShuffleExchangeExec => s.child
+          case other => other
+        }
+        ShuffleExchangeExec(
+          distribution.createPartitioning(numPartitions), shuffleChild, shuffleOrigin)

Review Comment:
   The shuffle-reuse case is now skipped.


