Posted to commits@spark.apache.org by we...@apache.org on 2021/11/19 04:37:46 UTC

[spark] branch master updated: [SPARK-37370][SQL] Add SQL configs to control newly added join code-gen in 3.3

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43b05e7  [SPARK-37370][SQL] Add SQL configs to control newly added join code-gen in 3.3
43b05e7 is described below

commit 43b05e72808ead8ccc051872461ee7a0d2e893bf
Author: Cheng Su <ch...@fb.com>
AuthorDate: Fri Nov 19 12:36:42 2021 +0800

    [SPARK-37370][SQL] Add SQL configs to control newly added join code-gen in 3.3
    
    ### What changes were proposed in this pull request?
    
    During Spark 3.3 development, we added code-gen for FULL OUTER shuffled hash join, FULL OUTER sort merge join, and Existence sort merge join. Given that join test coverage is not high and we want to avoid a regression in the upcoming release, this PR introduces three internal configs that allow users and developers to disable the new code-gen in case any bug is found after the release.
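    
    As a quick illustration (a hedged sketch; the SparkSession value `spark` is assumed and not part of this patch), any of the new code-gen paths can be switched off at runtime:
    
    ```scala
    // Assumed session `spark`; the config key below is the one added to SQLConf in this patch.
    // Setting it to false makes FULL OUTER shuffled hash join fall back to the non-code-gen path.
    spark.conf.set("spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled", "false")
    ```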
    
    ### Why are the changes needed?
    
    Allows users to quickly mitigate any bug found after release, avoiding a release regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR adds three internal configs, one per join type. They default to true, so code-gen remains enabled.
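    
    For completeness, a minimal sketch (again assuming a SparkSession named `spark`) of flipping all three kill switches off at once:
    
    ```scala
    // Assumed session `spark`; the three config keys are the ones introduced in SQLConf.scala below.
    Seq(
      "spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled",
      "spark.sql.codegen.join.fullOuterSortMergeJoin.enabled",
      "spark.sql.codegen.join.existenceSortMergeJoin.enabled"
    ).foreach(key => spark.conf.set(key, "false"))
    ```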
    
    ### How was this patch tested?
    
    Only configs are added here, so this change relies on existing tests.
    
    Closes #34643 from c21/join-config.
    
    Authored-by: Cheng Su <ch...@fb.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 24 ++++++++++++++++++++++
 .../sql/execution/joins/ShuffledHashJoinExec.scala |  6 ++++++
 .../sql/execution/joins/SortMergeJoinExec.scala    |  7 +++++++
 3 files changed, 37 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5304bb3..c7535c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1765,6 +1765,30 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN =
+    buildConf("spark.sql.codegen.join.fullOuterShuffledHashJoin.enabled")
+      .internal()
+      .doc("When true, enable code-gen for FULL OUTER shuffled hash join.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN =
+    buildConf("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled")
+      .internal()
+      .doc("When true, enable code-gen for FULL OUTER sort merge join.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val ENABLE_EXISTENCE_SORT_MERGE_JOIN_CODEGEN =
+    buildConf("spark.sql.codegen.join.existenceSortMergeJoin.enabled")
+      .internal()
+      .doc("When true, enable code-gen for Existence sort merge join.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val MAX_NESTED_VIEW_DEPTH =
     buildConf("spark.sql.view.maxNestedViewDepth")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 9e81a6f..cfe35d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.collection.{BitSet, OpenHashSet}
 
 /**
@@ -311,6 +312,11 @@ case class ShuffledHashJoinExec(
     streamResultIter ++ buildResultIter
   }
 
+  override def supportCodegen: Boolean = joinType match {
+    case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
+    case _ => true
+  }
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     streamedPlan.execute() :: buildPlan.execute() :: Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 8e80168..76767cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.BooleanType
 import org.apache.spark.util.collection.BitSet
 
@@ -376,6 +377,12 @@ case class SortMergeJoinExec(
   private lazy val streamedOutput = streamedPlan.output
   private lazy val bufferedOutput = bufferedPlan.output
 
+  override def supportCodegen: Boolean = joinType match {
+    case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN)
+    case _: ExistenceJoin => conf.getConf(SQLConf.ENABLE_EXISTENCE_SORT_MERGE_JOIN_CODEGEN)
+    case _ => true
+  }
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     streamedPlan.execute() :: bufferedPlan.execute() :: Nil
   }
