You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/06 16:59:40 UTC

[spark] branch branch-3.2 updated: [SPARK-35984][SQL][TEST] Config to force applying shuffled hash join

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f3ec799  [SPARK-35984][SQL][TEST] Config to force applying shuffled hash join
f3ec799 is described below

commit f3ec79990f66c9e3d1908f228a41c244da2d84f1
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Wed Jul 7 00:58:14 2021 +0800

    [SPARK-35984][SQL][TEST] Config to force applying shuffled hash join
    
    ### What changes were proposed in this pull request?
    Add a config `spark.sql.join.forceApplyShuffledHashJoin` to force applying shuffled hash join
    during the join selection.
    
    ### Why are the changes needed?
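    In the `SQLQueryTestSuite`, we want to cover 3 kinds of join (BHJ, SHJ, SMJ) in join.sql. But even
    if `spark.sql.join.preferSortMergeJoin` is set to `false`, a shuffled hash join is still not
    guaranteed, because join selection also requires the build side to be small enough to build a
    local hash map and much smaller than the other side.
    Thus, we need another config to force the selection.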
    
    ### Does this PR introduce _any_ user-facing change?
    No, only for testing
    
    ### How was this patch tested?
    Newly added tests.
    Verified that all queries in join.sql use `ShuffledHashJoin` when the config is set to `true`.
    
    Closes #33182 from linhongliu-db/SPARK-35984-hash-join-config.
    
    Authored-by: Linhong Liu <li...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 7566db603305ef04db89768bbb50d8b978a5c93c)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/optimizer/joins.scala  | 16 ++++++++++++++--
 .../test/resources/sql-tests/inputs/postgreSQL/join.sql  |  2 +-
 .../src/test/scala/org/apache/spark/sql/JoinSuite.scala  |  8 ++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 31a48c8..d6e2a59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 /**
  * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
@@ -274,14 +275,16 @@ trait JoinSelectionHelper {
     } else {
       hintToPreferShuffleHashJoinLeft(hint) ||
         (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) &&
-          muchSmaller(left, right))
+          muchSmaller(left, right)) ||
+        forceApplyShuffledHashJoin(conf)
     }
     val buildRight = if (hintOnly) {
       hintToShuffleHashJoinRight(hint)
     } else {
       hintToPreferShuffleHashJoinRight(hint) ||
         (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) &&
-          muchSmaller(right, left))
+          muchSmaller(right, left)) ||
+        forceApplyShuffledHashJoin(conf)
     }
     getBuildSide(
       canBuildShuffledHashJoinLeft(joinType) && buildLeft,
@@ -424,5 +427,14 @@ trait JoinSelectionHelper {
   private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
     a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
   }
+
+  /**
+   * Returns whether a shuffled hash join should be force applied.
+   * The config key is hard-coded because it's testing only and should not be exposed.
+   */
+  private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = {
+    Utils.isTesting &&
+      conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true"
+  }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
index 183e79e..dc0b561 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
@@ -13,7 +13,7 @@
 
 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760
 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true
---CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false
+--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true
 
 --CONFIG_DIM2 spark.sql.codegen.wholeStage=true
 --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index acbf300..abfc19a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1394,4 +1394,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       checkAnswer(fullJoinDF, Row(100))
     }
   }
+
+  test("SPARK-35984: Config to force applying shuffled hash join") {
+    val sql = "SELECT * FROM testData JOIN testData2 ON key = a"
+    assertJoin(sql, classOf[SortMergeJoinExec])
+    withSQLConf("spark.sql.join.forceApplyShuffledHashJoin" -> "true") {
+      assertJoin(sql, classOf[ShuffledHashJoinExec])
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org