Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/07/12 22:58:48 UTC

[GitHub] [spark] c21 opened a new pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

c21 opened a new pull request #29079:
URL: https://github.com/apache/spark/pull/29079


   
   ### What changes were proposed in this pull request?
   This is a follow-up to a comment in https://github.com/apache/spark/pull/28123: buckets can be coalesced for shuffled hash join as well. Note that we only coalesce the buckets on the stream side of the shuffled hash join (i.e. the side that does not build the hash map), so we don't need to worry about OOM from coalescing multiple buckets into one task when building the hash map.
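
As an illustration of the stream-side restriction described above, here is a minimal sketch in plain Scala with no Spark dependency. The `BuildSide` types and the `StreamSideCoalescing` object are hypothetical stand-ins written for this example; they are not Spark's classes, and the sketch assumes the two bucket counts differ.

```scala
// Hypothetical, simplified model for illustration only; not Spark's own classes.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

object StreamSideCoalescing {
  /**
   * Returns true when the side that would be coalesced (the one with more
   * buckets) is the stream side, i.e. not the side building the hash map.
   * Assumes numLeftBuckets != numRightBuckets.
   */
  def coalescesStreamSide(
      buildSide: BuildSide,
      numLeftBuckets: Int,
      numRightBuckets: Int): Boolean = {
    val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
    if (numCoalescedBuckets == numLeftBuckets) {
      // The right side has more buckets and would be coalesced,
      // so it must not be the build side.
      buildSide != BuildRight
    } else {
      // The left side has more buckets and would be coalesced,
      // so it must not be the build side.
      buildSide != BuildLeft
    }
  }
}
```

With 4 buckets on the left and 8 on the right, the right side is the coalescing target, so under this sketch coalescing is allowed only when the hash map is built from the left side.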
   
   
   Refactor the existing physical plan rule `CoalesceBucketsInSortMergeJoin` to `CoalesceBucketsInJoin`, to cover shuffled hash join as well.
   Refactor the existing unit test `CoalesceBucketsInSortMergeJoinSuite` to `CoalesceBucketsInJoinSuite`, to cover shuffled hash join as well.
   
   ### Why are the changes needed?
   Avoiding a shuffle when joining bucketed tables with different numbers of buckets is also useful for shuffled hash join. In production, we see users use shuffled hash join to join bucketed tables (setting `spark.sql.join.preferSortMergeJoin`=false to avoid the sort), and this change helps avoid the shuffle when the numbers of buckets are not the same.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Added unit tests in `CoalesceBucketsInJoinSuite` for verifying shuffled hash join physical plan.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r454939944



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,7 +2660,19 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(4)

Review comment:
       How about setting a smaller default value (e.g., 2?) than `coalesceBucketsInSortMergeJoin.maxBucketRatio`?
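
To make the effect of the default concrete, the following is a minimal sketch of the eligibility condition used by the rule in this PR (bucket counts that differ but divide evenly, and `max / min <= maxBucketRatio`). The object and method names are hypothetical and exist only for this example.

```scala
object BucketRatioCheck {
  /**
   * Hypothetical helper: true when the two bucket counts differ, the larger
   * is divisible by the smaller, and their ratio does not exceed the limit.
   */
  def canCoalesce(
      numLeftBuckets: Int,
      numRightBuckets: Int,
      maxBucketRatio: Int): Boolean = {
    val larger = math.max(numLeftBuckets, numRightBuckets)
    val smaller = math.min(numLeftBuckets, numRightBuckets)
    smaller > 0 &&
      larger != smaller &&
      larger % smaller == 0 &&
      larger / smaller <= maxBucketRatio
  }
}
```

Under this sketch, a 4-bucket table joined with a 16-bucket table qualifies with a limit of 4 but not with a limit of 2, while a 4-bucket and 8-bucket pair qualifies under both.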






[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661983489


   **[Test build #126266 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126266/testReport)** for PR 29079 at commit [`b1a8a92`](https://github.com/apache/spark/commit/b1a8a927f283b095716a638a26f9c5d5e9cc380c).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661939804








[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458192784



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +244,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = SORT_MERGE_JOIN,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),

Review comment:
       @cloud-fan - am I missing anything? The test coalesces the right side while the hash map is built from the left side.






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456862050



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2651,12 +2651,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")

Review comment:
       @viirya - sure. added.






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659375946








[GitHub] [spark] viirya commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456839808



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2651,12 +2651,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")

Review comment:
       Can we add more doc like "Coalescing bucketed tables can avoid unnecessary shuffling in joins, but it also reduces parallelism and could possibly cause OOM for shuffled hash join"?
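
The parallelism trade-off mentioned here can be sketched as follows. This is a simplified model that assumes original bucket `i` is read by the task for coalesced bucket `i % numCoalescedBuckets`; that assignment and the names below are assumptions made for this example, not something stated in this thread.

```scala
object CoalescedParallelism {
  /**
   * Groups original bucket ids by the coalesced bucket they would land in,
   * assuming a modulo assignment (an assumption of this sketch).
   */
  def groupBuckets(numBuckets: Int, numCoalescedBuckets: Int): Map[Int, List[Int]] = {
    require(
      numCoalescedBuckets > 0 && numBuckets % numCoalescedBuckets == 0,
      "the larger bucket count must be divisible by the smaller one")
    (0 until numBuckets)
      .groupBy(_ % numCoalescedBuckets)
      .map { case (target, ids) => target -> ids.toList }
  }
}
```

In this model, coalescing 8 buckets down to 4 leaves 4 groups of 2 buckets each, so each task reads twice as much data and the scan has half as many tasks.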






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458288998



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.

Review comment:
       @viirya - yes, no worries, thanks for careful review on this.






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659138022








[GitHub] [spark] maropu commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662165053


   Thanks, all! Merged to master.




[GitHub] [spark] imback82 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455950182



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +235,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = sortMergeJoin,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),
+        joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))

Review comment:
       nit: test `BuildRight` as well?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,69 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))
+      // Coalescing bucket should not happen when the target is on shuffled hash join

Review comment:
       How about pulling this out as a separate test?






[GitHub] [spark] maropu commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657289124


   Could you show us performance numbers in the PR description, first? Thanks.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659210167


   **[Test build #125955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125955/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).




[GitHub] [spark] cloud-fan commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661795205


   LGTM except for some comments in the test




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660429662








[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657287917


   **[Test build #125736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125736/testReport)** for PR 29079 at commit [`64a95d1`](https://github.com/apache/spark/commit/64a95d1a4e95ec978a00c56e97a142dedfa371c9).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657288056








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660440566








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661939804








[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r454827484



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2645,21 +2645,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")

Review comment:
       Is it okay to share this parameter between sort-merge/hash joins? As @viirya suggested, we have some risk of OOM. So, I think we need a different threshold policy for the hash-join case.
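
A rough sketch of what a per-join threshold policy along these lines might look like, in plain Scala without Spark. The names `JoinKind`, `RatioLimits`, and `BucketRatioPolicy` are hypothetical, and the limits `4` and `2` used in the usage note are illustrative values, not settings taken from this PR:

```scala
// Hypothetical stand-ins for the two join operators discussed in this thread.
sealed trait JoinKind
case object SortMerge extends JoinKind
case object ShuffledHash extends JoinKind

// Hypothetical holder for one max bucket ratio per join kind.
final case class RatioLimits(sortMerge: Int, shuffledHash: Int)

object BucketRatioPolicy {
  // Returns true when the bucket ratio is within the limit for the given join kind.
  def withinLimit(kind: JoinKind, numLeft: Int, numRight: Int, limits: RatioLimits): Boolean = {
    require(numLeft > 0 && numRight > 0, "bucket counts must be positive")
    // Integer division, mirroring how the rule in this PR computes the ratio.
    val ratio = math.max(numLeft, numRight) / math.min(numLeft, numRight)
    val limit = kind match {
      case SortMerge => limits.sortMerge
      case ShuffledHash => limits.shuffledHash
    }
    ratio <= limit
  }
}
```

With `RatioLimits(sortMerge = 4, shuffledHash = 2)`, a join of 16 buckets against 4 buckets would still qualify for sort-merge join but would be skipped for shuffled hash join.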






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660440568


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/126095/
   Test FAILed.




[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458019741



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +244,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = SORT_MERGE_JOIN,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),

Review comment:
       wait, I think we can't coalesce the build side?






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660590396








[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455193119



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,7 +2660,19 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(4)

Review comment:
       @maropu, sure, updated.






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657354358








[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658857701


   **[Test build #125889 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125889/testReport)** for PR 29079 at commit [`43a59b9`](https://github.com/apache/spark/commit/43a59b92a49af24794c6e42379a45ec1916a6f0f).




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662101678








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660590396








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660440472


   **[Test build #126095 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126095/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).
    * This patch **fails due to an unknown error code, -9**.
    * This patch **does not merge cleanly**.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659075273


   **[Test build #125889 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125889/testReport)** for PR 29079 at commit [`43a59b9`](https://github.com/apache/spark/commit/43a59b92a49af24794c6e42379a45ec1916a6f0f).
    * This patch **fails PySpark pip packaging tests**.
    * This patch **does not merge cleanly**.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662101678








[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456232773



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
+ *     COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild = join.left transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild = join.right transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
+        val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
+          math.min(numLeftBuckets, numRightBuckets)
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec
+            if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
+              isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+      keys: Seq[Expression],
+      partitioning: Partitioning): Boolean = {
+    partitioning match {
+      case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+        exprs.forall(e => keys.exists(_.semanticEquals(e)))
+      case _ => false
+    }
+  }
+
+  private def isApplicable(j: BaseJoinExec): Boolean = {
+    (j.isInstanceOf[SortMergeJoinExec] ||
+      j.isInstanceOf[ShuffledHashJoinExec]) &&
+      isScanOperation(j.left) &&
+      isScanOperation(j.right) &&
+      satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
+      satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
+  }
+
+  private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    numBuckets1 != numBuckets2 && large % small == 0
+  }
+
+  def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
+    plan match {
+      case s: BaseJoinExec if isApplicable(s) =>

Review comment:
       @imback82 - my bad, updated.
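
For readers following the divisibility condition in the diff above, here is a small worked sketch of why it matters. It assumes a row's bucket id is the non-negative remainder of its hash modulo the bucket count; `BucketCoalescing` and `coalescingIsConsistent` are hypothetical names used only for illustration:

```scala
object BucketCoalescing {
  // Assumption: bucket id is the non-negative remainder of the row hash.
  def bucketId(hash: Int, numBuckets: Int): Int = Math.floorMod(hash, numBuckets)

  // Same condition as `isDivisible` in the diff: counts differ and the larger
  // one is a multiple of the smaller one.
  def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
    val small = math.min(numBuckets1, numBuckets2)
    val large = math.max(numBuckets1, numBuckets2)
    numBuckets1 != numBuckets2 && large % small == 0
  }

  // Hypothetical check: does mapping bucket b of the `large` table to bucket
  // b % small agree with bucketing the same hash directly into `small` buckets?
  def coalescingIsConsistent(large: Int, small: Int, hashes: Seq[Int]): Boolean =
    hashes.forall(h => bucketId(h, large) % small == bucketId(h, small))
}
```

Under that assumption, 8 buckets fold cleanly into 4 (every row lands in the bucket it would have had with 4 buckets), while 6 buckets do not fold into 4: a hash of 6 sits in bucket 0 of 6, but belongs in bucket 2 of 4.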






[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458017324



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,78 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN))
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))

Review comment:
       nit: this test duplicates the one in `bucket coalescing should work only for sort merge join and shuffled hash join`. We can remove it from here.






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458249837



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {

Review comment:
       @viirya - agree, updated.
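
As an illustration of the extracted `updateNumCoalescedBucketsInScan` helper in the diff above, here is a toy version over a hand-rolled plan tree. The `Plan`, `Scan`, `Filter`, and `Project` types are simplified stand-ins and not Spark classes; the sketch only shows the shape of the rewrite, where every scan under one join child is marked with the coalesced bucket count:

```scala
// Toy plan nodes, standing in for FileSourceScanExec, FilterExec and ProjectExec.
sealed trait Plan
final case class Scan(table: String, numBuckets: Int, coalescedTo: Option[Int] = None) extends Plan
final case class Filter(child: Plan) extends Plan
final case class Project(child: Plan) extends Plan

object ToyCoalesce {
  // Rebuilds the tree, setting the coalesced bucket count on each scan leaf.
  def coalesceScans(plan: Plan, numCoalescedBuckets: Int): Plan = plan match {
    case s: Scan => s.copy(coalescedTo = Some(numCoalescedBuckets))
    case Filter(child) => Filter(coalesceScans(child, numCoalescedBuckets))
    case Project(child) => Project(coalesceScans(child, numCoalescedBuckets))
  }
}
```

For example, `ToyCoalesce.coalesceScans(Project(Filter(Scan("t", 8))), 4)` yields the same tree with the scan carrying `coalescedTo = Some(4)`; the table's own `numBuckets` is left unchanged.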






[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599800


   retest this please




[GitHub] [spark] maropu edited a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu edited a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657289124


   Could you show us performance numbers in the PR description, first? I think we need to check the trade-off between #parallelism and shuffle I/O.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659909720


   Merged build finished. Test FAILed.




[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657300530


   > We build hash map for each bucket on other side and it also sounds to OOM easily. This feature is disabled by a config by default, so it may be okay. But we should be careful not to enable it by default later.
   
   @viirya, thanks for the comment. I agree this feature should be enabled selectively, but I don't see how OOM is related to this feature.
   
   You are saying OOM is an issue for shuffled hash join on bucketed tables, which I agree with. This feature coalesces only the stream side (it does not touch the build side at all), so I don't think it adds any OOM risk on the build side. Since [sort merge join is by default preferred over shuffled hash join](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L335), users who explicitly enable shuffled hash join by config should already be paying attention to the OOM problem.
   
   Am I missing anything? Thanks.
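   
   As a rough illustration of the stream-side restriction, here is a self-contained Scala sketch. The `BuildSide` types below are local stand-ins for Spark's `BuildLeft`/`BuildRight`, and `coalescesStreamSide` is a hypothetical name; it only mirrors the idea that the side with more buckets is the one being coalesced, and that this side must not be the build side.
   
   ```scala
   object StreamSideCheck {
     // Local stand-ins for Spark's build side markers (assumption, not Spark's classes).
     sealed trait BuildSide
     case object BuildLeft extends BuildSide
     case object BuildRight extends BuildSide

     // The side with more buckets is the one whose buckets get coalesced.
     // Returns true only when that side is the stream side (not the build side).
     def coalescesStreamSide(
         buildSide: BuildSide,
         numLeftBuckets: Int,
         numRightBuckets: Int): Boolean = {
       val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
       if (numCoalescedBuckets == numLeftBuckets) {
         // The right side has more buckets and would be coalesced.
         buildSide != BuildRight
       } else {
         // The left side has more buckets and would be coalesced.
         buildSide != BuildLeft
       }
     }
   }
   ```
   
   For a 4-bucket left table and an 8-bucket right table, the right side is coalesced only when the hash map is built on the left.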




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455969279



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,12 +2660,24 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
+  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
+      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
+        "Note as coalescing reduces parallelism, there might be a higher risk for " +
+        "out of memory error at shuffled hash join build side.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(2)

Review comment:
       @cloud-fan - I feel it's not really necessary in most cases, but I am fine either way. The only cases where I can see it being useful are:
   
   (1) If we want to enable coalescing of bucketed tables by default in the future, we probably want separate configs so we can be more cautious about shuffled hash join, as it can potentially cause more OOMs on the build side.
   
   (2) A user has one complicated query involving both shuffled hash join and sort merge join on bucketed tables, and wants to tune coalescing for each join separately.
   
   @maropu - what do you think? Should we keep them separate or not?
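   
   For reference, a minimal Scala sketch of what separate ratios would mean in practice. It is self-contained and does not use Spark; the `JoinType` objects and `eligible` are hypothetical names, and the limits 4 and 2 are simply the defaults shown in the diff above for this revision.
   
   ```scala
   object MaxBucketRatio {
     sealed trait JoinType
     case object SortMergeJoin extends JoinType
     case object ShuffledHashJoin extends JoinType

     // Defaults from the diff above: 4 for sort merge join, 2 for shuffled hash join.
     def maxRatio(join: JoinType): Int = join match {
       case SortMergeJoin => 4
       case ShuffledHashJoin => 2
     }

     // Coalescing applies only when the bucket numbers differ, the larger one is
     // divisible by the smaller one, and their ratio is within the per-join limit.
     def eligible(join: JoinType, numLeftBuckets: Int, numRightBuckets: Int): Boolean = {
       val large = math.max(numLeftBuckets, numRightBuckets)
       val small = math.min(numLeftBuckets, numRightBuckets)
       large != small && large % small == 0 && large / small <= maxRatio(join)
     }
   }
   ```
   
   With these defaults, a 4-bucket table joined with a 16-bucket table would be coalesced for sort merge join but not for shuffled hash join.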






[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657353621


   > Could you show us performance numbers in the PR description, first? I think we need to check the trade-off between #parallelism and shuffle I/O.
   
   @maropu, I updated the PR description with results for one test query from TPCDS (with modification). Thanks.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659210905








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659075799


   Build finished. Test FAILed.




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599886








[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662165418


   Thank you all @maropu, @cloud-fan, @viirya and @imback82 for the review!




[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455557231



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
+ *     COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild = join.left transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }

Review comment:
       nit: How about pulling this part of the code out into a private method?
   ```
         val leftCoalescedChild = updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
         ...
   ```






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658883356








[GitHub] [spark] viirya commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458224938



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {

Review comment:
       nit: I think this should be `hasScanOperation`.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660440566


   Build finished. Test FAILed.




[GitHub] [spark] maropu commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659207696








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659210905








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660590273


   **[Test build #126121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126121/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661984831








[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660429520


   **[Test build #126095 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126095/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458200868



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,78 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN))
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))

Review comment:
       @cloud-fan - updated.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,78 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN))
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildRight)))

Review comment:
       @cloud-fan - updated.






[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661099329


   @cloud-fan - all comments are addressed. Is there anything else needed for this PR? Thanks.




[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458198597



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +244,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = SORT_MERGE_JOIN,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),

Review comment:
       Oh sorry, I misread the code. I saw a similar test below that changes the build side, and I thought one of them must be wrong. Actually, you switch the join keys.






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456232703



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,69 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))
+      // Coalescing bucket should not happen when the target is on shuffled hash join
+      // build side.
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildRight)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin))

Review comment:
       @cloud-fan - updated with extra test for SMJ.






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458200015



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +244,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = SORT_MERGE_JOIN,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),

Review comment:
       @cloud-fan yes, thanks for reviewing so carefully. Updated the PR to address the comments and remove the duplicated tests. Thanks.






[GitHub] [spark] viirya commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458227705



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.

Review comment:
       "The order doesn't matter" looks correct, but I'm not sure why it is related to `EnsureRequirements`. `EnsureRequirements` should not modify the order of hash partitioning expressions.

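To illustrate what "the order doesn't matter" means here, the following is a minimal sketch of an order-insensitive match between join keys and partitioning expressions. It is not the code from this PR: `KeyMatch` and `hasSameKeys` are hypothetical names, and plain column-name strings stand in for Catalyst expressions.

```scala
object KeyMatch {
  // Hypothetical stand-in: column names as strings instead of Catalyst expressions.
  // Returns true when both sides contain the same keys, regardless of order.
  def hasSameKeys(joinKeys: Seq[String], partitionExprs: Seq[String]): Boolean =
    joinKeys.length == partitionExprs.length &&
      joinKeys.toSet == partitionExprs.toSet
}
```

With this sketch, `hasSameKeys(Seq("i", "j"), Seq("j", "i"))` holds, while a key list that covers only part of the partitioning expressions does not match.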





[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659375946


   Merged build finished. Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659909552


   **[Test build #126032 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126032/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] gengliangwang commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658497587


   cc @cloud-fan 




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456862018



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,12 +2660,24 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
+  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
+      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
+        "Note as coalescing reduces parallelism, there might be a higher risk for " +
+        "out of memory error at shuffled hash join build side.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(2)

Review comment:
       Sounds good. Updated to use a single ratio config for both SMJ and SHJ. Thanks. cc @maropu and @viirya.
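
For reference, the conditions that a single ratio config gates can be sketched as below. This is a simplified, self-contained illustration rather than the rule itself: `BucketCoalescing` and `coalescedBuckets` are hypothetical names, and the check only covers the bucket-count conditions (unequal counts, divisibility, and the max ratio), not the join-key or scan-shape requirements.

```scala
object BucketCoalescing {
  // Hypothetical helper: returns the coalesced bucket count, or None when
  // the bucket-count conditions for coalescing are not met.
  def coalescedBuckets(numLeft: Int, numRight: Int, maxBucketRatio: Int): Option[Int] = {
    val large = math.max(numLeft, numRight)
    val small = math.min(numLeft, numRight)
    val eligible =
      small > 0 &&                      // guard against division by zero
        large != small &&               // nothing to coalesce when counts are equal
        large % small == 0 &&           // larger count must be divisible by the smaller
        large / small <= maxBucketRatio // ratio must not exceed the configured maximum
    if (eligible) Some(small) else None
  }
}
```

With a max ratio of 4, tables with 4 and 8 buckets would coalesce to 4 buckets, while 4 and 32 buckets (ratio 8) or 4 and 6 buckets (not divisible) would be left unchanged.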






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660429662








[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455724248



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,69 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))
+      // Coalescing bucket should not happen when the target is on shuffled hash join
+      // build side.
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildRight)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = broadcastHashJoin))

Review comment:
       nit: test SMJ as well






[GitHub] [spark] viirya commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456839329



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,12 +2660,24 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
+  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
+      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
+        "Note as coalescing reduces parallelism, there might be a higher risk for " +
+        "out of memory error at shuffled hash join build side.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(2)

Review comment:
       This looks like a premature optimization to me for now. I think we can keep a single ratio config first and add a separate one only when it is actually needed?






[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659210167


   **[Test build #125955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125955/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).




[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458018007



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,78 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN))
       run(JoinSetting(
-        RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true))
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
     }
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") {
-      run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true))
+
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildLeft)))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN,
+        shjBuildSide = Some(BuildRight)))

Review comment:
       nit: this test is duplicated in `bucket coalescing shouldn't be applied to shuffled hash join build side`. We can remove it from here.






[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659075799








[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456233046



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -103,46 +119,69 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
   }
 
   test("bucket coalescing - basic") {
-    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") {
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = sortMergeJoin))
+      run(JoinSetting(
+        RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))
+      // Coalescing bucket should not happen when the target is on shuffled hash join

Review comment:
       @imback82 - yes, extracting this to a new test - `bucket coalescing shouldn't be applied to shuffled hash join build side`.
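
The behavior that test is meant to cover can be sketched in isolation as follows. This is a simplified illustration, assuming the side with more buckets is the one that would be coalesced. `StreamSideCheck` and `coalescesStreamSide` are hypothetical names, and the `BuildSide` types are redefined locally so the snippet stands alone.

```scala
// Local stand-ins for the build-side markers, defined here only so the
// sketch is self-contained.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

object StreamSideCheck {
  // Returns true only when the side that would be coalesced (the one with
  // more buckets) is the stream side, i.e. not the side building the hash map.
  def coalescesStreamSide(numLeft: Int, numRight: Int, buildSide: BuildSide): Boolean = {
    val numCoalesced = math.min(numLeft, numRight)
    if (numCoalesced == numLeft) {
      // Left already has the target bucket count, so the right side is coalesced.
      buildSide != BuildRight
    } else {
      // Otherwise the left side is coalesced.
      buildSide != BuildLeft
    }
  }
}
```

For 4 left buckets and 8 right buckets, the sketch allows coalescing with `BuildLeft` (the right side is the stream side) and rejects it with `BuildRight`.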






[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662140658


   **[Test build #126266 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126266/testReport)** for PR 29079 at commit [`b1a8a92`](https://github.com/apache/spark/commit/b1a8a927f283b095716a638a26f9c5d5e9cc380c).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659137174


   **[Test build #125903 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125903/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).
    * This patch **fails PySpark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659909720








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599665








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662100916


   **[Test build #126263 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126263/testReport)** for PR 29079 at commit [`7b20049`](https://github.com/apache/spark/commit/7b200498be973c2bc5016362f7e65266d56e77c7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659375958


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/125955/
   Test FAILed.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599668


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/126121/
   Test FAILed.




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657288056








[GitHub] [spark] maropu closed pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu closed pull request #29079:
URL: https://github.com/apache/spark/pull/29079


   




[GitHub] [spark] viirya commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661673387


   It's too late today. I will take another look tomorrow if this is not merged yet.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659874311








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659909723


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/126032/
   Test FAILed.




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658859828








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658857701


   **[Test build #125889 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125889/testReport)** for PR 29079 at commit [`43a59b9`](https://github.com/apache/spark/commit/43a59b92a49af24794c6e42379a45ec1916a6f0f).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599665


   Merged build finished. Test FAILed.




[GitHub] [spark] viirya commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458283470



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.

Review comment:
       Oh, I see. Missed that part. Yeah, it is correct that we need to reorder it to make sure the required distribution is met.
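
The point about key ordering can be illustrated with a minimal sketch. It is not the code from this PR: column names stand in for Catalyst expressions, and `JoinKeyMatchSketch`, `keysMatchPartitioning` and `reorder` are hypothetical names chosen for illustration. The first function shows an order-insensitive match between join keys and output partitioning expressions; the second shows, roughly, the kind of reordering a later step would need to perform so that the required distribution lines up.

```scala
object JoinKeyMatchSketch {
  // Hypothetical stand-in for Catalyst expressions: a column is just its name.
  type Col = String

  // True when every partitioning expression appears among the join keys,
  // regardless of position. Lengths must match so that no key is left uncovered.
  def keysMatchPartitioning(joinKeys: Seq[Col], partitionExprs: Seq[Col]): Boolean =
    joinKeys.length == partitionExprs.length &&
      partitionExprs.forall(e => joinKeys.contains(e))

  // Illustration only: permute the (left, right) key pairs so that the left keys
  // follow the left side's partitioning order. Assumes keysMatchPartitioning holds
  // for leftKeys; otherwise the map lookup below throws.
  def reorder(
      leftKeys: Seq[Col],
      rightKeys: Seq[Col],
      leftPartitionExprs: Seq[Col]): (Seq[Col], Seq[Col]) = {
    val rightKeyFor = leftKeys.zip(rightKeys).toMap
    (leftPartitionExprs, leftPartitionExprs.map(rightKeyFor))
  }
}
```

With left keys `(j, i)` and a table bucketed by `(i, j)`, the match succeeds even though the order differs, and the reordering step yields keys in `(i, j)` order on the left with the right keys permuted to match.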






[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660590187


   Addressed all comments and rebased to latest master. Thanks. cc @maropu, @cloud-fan and @viirya.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660600532


   **[Test build #126126 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126126/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).




[GitHub] [spark] maropu commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661689921


   Good night~, @viirya 




[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661983489


   **[Test build #126266 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126266/testReport)** for PR 29079 at commit [`b1a8a92`](https://github.com/apache/spark/commit/b1a8a927f283b095716a638a26f9c5d5e9cc380c).




[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660633439








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662141347








[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659874701


   **[Test build #126032 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126032/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658883356








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660429520


   **[Test build #126095 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126095/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456232826



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -178,7 +235,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS
         rightKeys = rCols.reverse,
         leftRelation = lRel,
         rightRelation = RelationSetting(rCols, 8, Some(4)),
-        isSortMergeJoin = true))
+        joinOperator = sortMergeJoin,
+        shjBuildSide = None))
+
+      run(JoinSetting(
+        leftKeys = lCols.reverse,
+        rightKeys = rCols.reverse,
+        leftRelation = lRel,
+        rightRelation = RelationSetting(rCols, 8, Some(4)),
+        joinOperator = shuffledHashJoin,
+        shjBuildSide = Some(BuildLeft)))

Review comment:
       @imback82 - updated.






[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659875108


   Addressed all comments except one: I am still keeping the two ratio configs (SMJ and SHJ) separate. Let me know if I need to change this. cc @maropu and @viirya, thanks.
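
For readers following the discussion, here is a minimal sketch of what a per-join-type ratio check could look like. It is an assumption-laden illustration, not the PR's implementation: `CoalesceDecisionSketch`, `Ratios`, `smjMaxRatio`, `shjMaxRatio` and `numCoalescedBuckets` are hypothetical names, and the thresholds used below are arbitrary. It combines the conditions discussed in this thread: the bucket counts differ but divide evenly, the ratio stays within the limit for the join type, and for shuffled hash join only the stream side is coalesced.

```scala
object CoalesceDecisionSketch {
  sealed trait JoinKind
  case object SortMerge extends JoinKind
  // buildLeft = true means the left child builds the hash map.
  final case class ShuffledHash(buildLeft: Boolean) extends JoinKind

  // Hypothetical config holder: one maximum bucket ratio per join type.
  final case class Ratios(smjMaxRatio: Int, shjMaxRatio: Int)

  // Returns Some(n) when the side with more buckets should be coalesced to n buckets.
  def numCoalescedBuckets(
      kind: JoinKind,
      numLeft: Int,
      numRight: Int,
      ratios: Ratios): Option[Int] = {
    val large = math.max(numLeft, numRight)
    val small = math.min(numLeft, numRight)
    // Bucket counts must differ and the larger must be a multiple of the smaller.
    val divisible = small > 0 && large != small && large % small == 0
    if (!divisible) {
      None
    } else {
      val ratio = large / small
      // The side with more buckets is the one that would be coalesced.
      val coalesceLeft = numLeft > numRight
      val allowed = kind match {
        case SortMerge => ratio <= ratios.smjMaxRatio
        case ShuffledHash(buildLeft) =>
          // Coalesce only the stream side, so the build side's hash map does not grow.
          ratio <= ratios.shjMaxRatio && coalesceLeft != buildLeft
      }
      if (allowed) Some(small) else None
    }
  }
}
```

Keeping two thresholds lets the shuffled hash join limit be set independently of the sort merge join limit; a single shared config would simply replace the two fields of `Ratios` with one.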




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659138022


   Merged build finished. Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658996208


   **[Test build #125903 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125903/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659075810


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/125889/
   Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657354091


   **[Test build #125736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125736/testReport)** for PR 29079 at commit [`64a95d1`](https://github.com/apache/spark/commit/64a95d1a4e95ec978a00c56e97a142dedfa371c9).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599643


   **[Test build #126121 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126121/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660633282


   **[Test build #126126 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126126/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r454921360



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2645,21 +2645,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")

Review comment:
       @maropu Makes sense to me. I separated the max bucket ratio configs for SHJ and SMJ for now, and added OOM-related documentation to warn users.
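
For readers following along, here is a minimal, Spark-free sketch of what a separate ratio limit per join type could look like. Everything in it (`BucketRatioSketch`, `JoinKind`, `RatioLimits`, `withinRatio`) is a hypothetical name invented for illustration, and the limits are plain parameters rather than Spark defaults. It also folds the "not equal but divisible" check, which the PR keeps in `ExtractJoinWithBuckets`, into the same function for brevity.

```scala
object BucketRatioSketch {
  // Hypothetical stand-ins for the two join operators discussed in this PR.
  sealed trait JoinKind
  case object SortMerge extends JoinKind
  case object ShuffledHash extends JoinKind

  // Illustrative limits only; these are not Spark's configuration defaults.
  final case class RatioLimits(sortMergeMax: Int, shuffledHashMax: Int)

  def withinRatio(
      kind: JoinKind,
      numLeftBuckets: Int,
      numRightBuckets: Int,
      limits: RatioLimits): Boolean = {
    val larger = math.max(numLeftBuckets, numRightBuckets)
    val smaller = math.min(numLeftBuckets, numRightBuckets)
    // Bucket counts must differ, and the larger must be a multiple of the smaller.
    val divisible = smaller > 0 && larger != smaller && larger % smaller == 0
    // Pick the limit that belongs to this join type.
    val limit = kind match {
      case SortMerge => limits.sortMergeMax
      case ShuffledHash => limits.shuffledHashMax
    }
    divisible && (larger / smaller) <= limit
  }
}
```

With a looser limit for SMJ than for SHJ, the same pair of tables can qualify for coalescing under one join type and not the other, which is the point of keeping the two limits apart.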






[GitHub] [spark] viirya commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657302116


   Assume you are joining two tables with 512 and 256 buckets. Without coalescing, the two tables might be shuffled into 1024 or more partitions, so building the hash map is okay. With coalescing, you now build the hash map on each bucket, and each bucket holds much more data than a partition in the shuffle case. That sounds more likely to OOM.
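
The concern above can be made concrete with a rough back-of-the-envelope sketch. It assumes rows are spread evenly across tasks, which real data rarely guarantees. `BuildSideEstimate` and `rowsPerTask` are hypothetical helpers written for this illustration, not Spark APIs.

```scala
object BuildSideEstimate {
  // Rows each task handles if `totalRows` are spread evenly over `numTasks`
  // (rounded up, so no rows are lost to integer division).
  def rowsPerTask(totalRows: Long, numTasks: Int): Long = {
    require(numTasks > 0, "numTasks must be positive")
    (totalRows + numTasks - 1) / numTasks
  }

  // How many times more rows a task sees with `fewerTasks` than with `moreTasks`.
  def growthFactor(totalRows: Long, moreTasks: Int, fewerTasks: Int): Double =
    rowsPerTask(totalRows, fewerTasks).toDouble / rowsPerTask(totalRows, moreTasks)
}
```

For a build side of any fixed size, going from 1024 shuffle partitions to 256 coalesced buckets means each task's hash map holds roughly four times as many rows under this even-spread assumption.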




[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661943351


   **[Test build #126263 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126263/testReport)** for PR 29079 at commit [`7b20049`](https://github.com/apache/spark/commit/7b200498be973c2bc5016362f7e65266d56e77c7).




[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661943351


   **[Test build #126263 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126263/testReport)** for PR 29079 at commit [`7b20049`](https://github.com/apache/spark/commit/7b200498be973c2bc5016362f7e65266d56e77c7).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659138030


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/125903/
   Test FAILed.




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r458231770



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBucketsInScan(
+      plan: SparkPlan,
+      numCoalescedBuckets: Int): SparkPlan = {
+    plan transformUp {
+      case f: FileSourceScanExec =>
+        f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+    }
+  }
+
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild =
+        updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets)
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets)
+        if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <=
+          conf.coalesceBucketsInJoinMaxBucketRatio =>
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join have the bucketed tables,
+ * are consisted of only the scan operation,
+ * and numbers of buckets are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.

Review comment:
       @viirya - [`EnsureRequirements.reorderJoinPredicates`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L194) will reorder the join keys to match the children's output partitioning. BTW, this comment was added in #28123, and I am only doing some refactoring here.
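
As an illustration of why ordering does not matter for this check, here is a simplified, Spark-free sketch shaped like the `satisfiesOutputPartitioning` helper in this PR's diff. It uses plain strings in place of Catalyst `Expression`s and string equality in place of `semanticEquals`; `KeyMatchSketch` and `keysMatchPartitioning` are hypothetical names.

```scala
object KeyMatchSketch {
  // True when both sequences have the same length and every partitioning
  // expression appears somewhere among the join keys, in any position.
  def keysMatchPartitioning(joinKeys: Seq[String], partitionExprs: Seq[String]): Boolean =
    partitionExprs.length == joinKeys.length &&
      partitionExprs.forall(e => joinKeys.contains(e))
}
```

Under this simplification, join keys `(a, b)` match a table partitioned on `(b, a)`, while keys that cover only part of the partitioning expressions, or a different column, do not.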






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660599886








[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657287917


   **[Test build #125736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125736/testReport)** for PR 29079 at commit [`64a95d1`](https://github.com/apache/spark/commit/64a95d1a4e95ec978a00c56e97a142dedfa371c9).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657354358








[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r454827741



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2645,21 +2645,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")

Review comment:
       Also, I think we need to describe the risk in `.doc`.






[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660590273


   **[Test build #126121 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126121/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).




[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660428967


   retest this please




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658859828








[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659874311








[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r454827741



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2645,21 +2645,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
       .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
         "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " +
-        "and only when the bigger number of buckets is divisible by the smaller number of buckets.")
+        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
+        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
+        "shuffled hash join.")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
 
-  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")

Review comment:
       Also, I think we need to describe the risk clearly in `.doc`.






[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-658996208


   **[Test build #125903 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125903/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).




[GitHub] [spark] imback82 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
imback82 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455948548



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ *     COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
+ *     COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild = join.left transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }
+      join match {
+        case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild)
+      }
+    } else {
+      val rightCoalescedChild = join.right transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }
+      join match {
+        case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild)
+        case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild)
+      }
+    }
+  }
+
+  private def isCoalesceSHJStreamSide(
+      join: ShuffledHashJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): Boolean = {
+    if (numCoalescedBuckets == numLeftBuckets) {
+      join.buildSide != BuildRight
+    } else {
+      join.buildSide != BuildLeft
+    }
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.coalesceBucketsInJoinEnabled) {
+      return plan
+    }
+
+    plan transform {
+      case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) =>
+        val bucketRatio = math.max(numLeftBuckets, numRightBuckets) /
+          math.min(numLeftBuckets, numRightBuckets)
+        val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets)
+        join match {
+          case j: SortMergeJoinExec
+            if bucketRatio <= conf.coalesceBucketsInSortMergeJoinMaxBucketRatio =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case j: ShuffledHashJoinExec
+            // Only coalesce the buckets for shuffled hash join stream side,
+            // to avoid OOM for build side.
+            if bucketRatio <= conf.coalesceBucketsInShuffledHashJoinMaxBucketRatio &&
+              isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) =>
+            updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets)
+          case other => other
+        }
+      case other => other
+    }
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoin`,
+ * where both sides of the join are bucketed tables,
+ * consist of only the scan operation,
+ * and have numbers of buckets that are not equal but divisible.
+ */
+object ExtractJoinWithBuckets {
+  @tailrec
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+    case f: FilterExec => isScanOperation(f.child)
+    case p: ProjectExec => isScanOperation(p.child)
+    case _: FileSourceScanExec => true
+    case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+    plan.collectFirst {
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+          f.optionalNumCoalescedBuckets.isEmpty =>
+        f.relation.bucketSpec.get
+    }
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note that
+   * the ordering does not matter because it will be handled in `EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+      keys: Seq[Expression],
+      partitioning: Partitioning): Boolean = {
+    partitioning match {
+      case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+        exprs.forall(e => keys.exists(_.semanticEquals(e)))
+      case _ => false
+    }
+  }
+
+  private def isApplicable(j: BaseJoinExec): Boolean = {
+    (j.isInstanceOf[SortMergeJoinExec] ||
+      j.isInstanceOf[ShuffledHashJoinExec]) &&
+      isScanOperation(j.left) &&
+      isScanOperation(j.right) &&
+      satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
+      satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
+  }
+
+  private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    numBuckets1 != numBuckets2 && large % small == 0
+  }
+
+  def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = {
+    plan match {
+      case s: BaseJoinExec if isApplicable(s) =>

Review comment:
       nit: Use `j` for join?
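
For context on the `isDivisible` check in the hunk above, here is a minimal, self-contained sketch of why divisibility matters. It does not use Spark classes: `bucketId`, `coalescedBucketId` and `coalescingIsConsistent` are hypothetical helpers, and the bucket id is assumed to be a non-negative modulo of an integer hash, which may differ from the exact hash Spark applies.

```scala
object BucketCoalescingSketch {
  // Hypothetical stand-in for a bucket id: a non-negative modulo of the row's hash.
  def bucketId(hash: Int, numBuckets: Int): Int = Math.floorMod(hash, numBuckets)

  // Same condition as `isDivisible` in the hunk above.
  def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = {
    val small = math.min(numBuckets1, numBuckets2)
    val large = math.max(numBuckets1, numBuckets2)
    numBuckets1 != numBuckets2 && large % small == 0
  }

  // Where a bucket of the finer-grained table lands once it is read as `small` buckets.
  def coalescedBucketId(largeBucketId: Int, small: Int): Int = largeBucketId % small

  // True if folding `large` buckets into `small` agrees with bucketing by `small`
  // directly, for every hash in the sample.
  def coalescingIsConsistent(large: Int, small: Int, hashes: Seq[Int]): Boolean =
    hashes.forall(h => coalescedBucketId(bucketId(h, large), small) == bucketId(h, small))
}
```

With 8 and 4 buckets the folded bucket id always matches the id computed directly with 4 buckets, so matching keys stay co-located. With 6 and 4 buckets it does not (a hash of 6 lands in bucket 0 after folding but in bucket 2 directly), which is why such pairs are skipped.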




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-661984831








[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657288265


   @maropu, @cloud-fan @gatorsmile @sameeragarwal Could you help check this PR? Thanks.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660633439








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-662141347








[GitHub] [spark] c21 commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-657304351


   @viirya, I see your point that coalescing reduces parallelism and can therefore cause more OOMs on the build side. I agree this can happen. All in all, this feature is disabled by default, and users can selectively enable it depending on their table sizes. But I think it's worth having, as it has indeed helped our users in production who use shuffled hash join on bucketed tables.
   
   Re the OOM issue in shuffled hash join: I think we can add a fallback mechanism when building the hash map, and fall back to sort merge join if the hash map grows large enough to risk OOM (i.e. rethink https://issues.apache.org/jira/browse/SPARK-21505). We have been running this feature in production for years, and it works well.
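
To make the stream-side restriction concrete, here is a minimal sketch that models the check without depending on Spark. The `BuildSide` objects below are local stand-ins for the ones in `org.apache.spark.sql.catalyst.optimizer`, and `coalescesLeft` and `coalescesStreamSide` are hypothetical names used only for illustration.

```scala
object StreamSideSketch {
  // Local stand-ins for Spark's BuildLeft / BuildRight (assumption: only two build sides).
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // The side with more buckets is the one whose scan would be coalesced.
  def coalescesLeft(numLeftBuckets: Int, numRightBuckets: Int): Boolean =
    numLeftBuckets > numRightBuckets

  // Coalescing is allowed only when the coalesced side is the stream side,
  // i.e. the hash map is built from the other side.
  def coalescesStreamSide(
      buildSide: BuildSide,
      numLeftBuckets: Int,
      numRightBuckets: Int): Boolean = {
    if (coalescesLeft(numLeftBuckets, numRightBuckets)) buildSide == BuildRight
    else buildSide == BuildLeft
  }
}
```

For example, with 8 buckets on the left and 4 on the right, the left scan is the one that would be coalesced, so the rule applies only when the hash map is built from the right side.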




[GitHub] [spark] maropu commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456163594



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,12 +2660,24 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
+  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
+      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
+        "Note as coalescing reduces parallelism, there might be a higher risk for " +
+        "out of memory error at shuffled hash join build side.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(2)

Review comment:
       If we use a single shared config for bucket coalescing and a user sets a higher value for it, SMJ will likely perform better, but SHJ will likely get less parallelism (and then possibly OOM?). If a plan scans many bucketed tables and has both SMJ and SHJ, I personally think it is hard to control this coalescing mechanism with a single shared config. WDYT, @viirya? I suggested this new config after reading [your comment about OOM](https://github.com/apache/spark/pull/29079#pullrequestreview-446923677). If you think you don't need this config, removing it is okay with me.
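
As an illustration of the trade-off, here is a rough sketch of how two separate ratio limits would gate coalescing per join type. It is a simplified model, not the rule itself: `JoinKind`, `Ratios` and `withinRatio` are hypothetical names, the defaults (4 and 2) are the ones proposed in the diff above, bucket counts are assumed to be positive, and the stream-side check for shuffled hash join is left out.

```scala
object RatioGateSketch {
  sealed trait JoinKind
  case object SortMergeJoin extends JoinKind
  case object ShuffledHashJoin extends JoinKind

  // Defaults taken from the diff above: 4 for sort merge join, 2 for shuffled hash join.
  final case class Ratios(sortMergeJoinMax: Int = 4, shuffledHashJoinMax: Int = 2)

  // True if the two bucket counts differ, are divisible, and their ratio is within
  // the limit configured for the given join type. Assumes both counts are positive.
  def withinRatio(
      kind: JoinKind,
      numLeftBuckets: Int,
      numRightBuckets: Int,
      ratios: Ratios = Ratios()): Boolean = {
    val small = math.min(numLeftBuckets, numRightBuckets)
    val large = math.max(numLeftBuckets, numRightBuckets)
    val divisible = numLeftBuckets != numRightBuckets && large % small == 0
    val limit = kind match {
      case SortMergeJoin => ratios.sortMergeJoinMax
      case ShuffledHashJoin => ratios.shuffledHashJoinMax
    }
    divisible && large / small <= limit
  }
}
```

With these defaults, tables with 8 and 2 buckets (ratio 4) would be coalesced for a sort merge join but not for a shuffled hash join. A single shared limit would have to pick one behavior for both.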






[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456232535



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than or equal to the value set in
+ *     COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO (`SortMergeJoin`) or,
+ *     COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO (`ShuffledHashJoin`).
+ */
+case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] {
+  private def updateNumCoalescedBuckets(
+      join: BaseJoinExec,
+      numLeftBuckets: Int,
+      numRightBucket: Int,
+      numCoalescedBuckets: Int): BaseJoinExec = {
+    if (numCoalescedBuckets != numLeftBuckets) {
+      val leftCoalescedChild = join.left transformUp {
+        case f: FileSourceScanExec =>
+          f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+      }

Review comment:
       @maropu - sure. updated.






[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659375246


   **[Test build #125955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/125955/testReport)** for PR 29079 at commit [`11d138b`](https://github.com/apache/spark/commit/11d138bf188212f7248592c236dd1831e0b8affa).
    * This patch **fails PySpark pip packaging tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455723207



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -19,17 +19,21 @@ package org.apache.spark.sql.execution.bucketing
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
-class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkSession {
+class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
+  private val sortMergeJoin = "sortMergeJoin"

Review comment:
       nit: for constant we should use `SORT_MERGE_JOIN`






[GitHub] [spark] SparkQA commented on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-659874701


   **[Test build #126032 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126032/testReport)** for PR 29079 at commit [`d620940`](https://github.com/apache/spark/commit/d6209407731bbed2602c1d6a05c7c50982561faf).




[GitHub] [spark] SparkQA removed a comment on pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29079:
URL: https://github.com/apache/spark/pull/29079#issuecomment-660600532


   **[Test build #126126 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/126126/testReport)** for PR 29079 at commit [`4c65c7f`](https://github.com/apache/spark/commit/4c65c7f6293f1a4459123d0d9cb57c00f4ebf364).




[GitHub] [spark] c21 commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r456232607



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala
##########
@@ -19,17 +19,21 @@ package org.apache.spark.sql.execution.bucketing
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
-class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkSession {
+class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession {
+  private val sortMergeJoin = "sortMergeJoin"

Review comment:
       @cloud-fan - sure. updated.






[GitHub] [spark] cloud-fan commented on a change in pull request #29079: [SPARK-32286][SQL] Coalesce bucketed table for shuffled hash join if applicable

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29079:
URL: https://github.com/apache/spark/pull/29079#discussion_r455720125



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2659,12 +2660,24 @@ object SQLConf {
     buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
       .doc("The ratio of the number of two buckets being coalesced should be less than or " +
         "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
       .version("3.1.0")
       .intConf
       .checkValue(_ > 0, "The difference must be positive.")
       .createWithDefault(4)
 
+  val COALESCE_BUCKETS_IN_SHUFFLED_HASH_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.coalesceBucketsInShuffledHashJoin.maxBucketRatio")
+      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
+        "equal to this value for bucket coalescing to be applied. This configuration only " +
+        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true. " +
+        "Note as coalescing reduces parallelism, there might be a higher risk for " +
+        "out of memory error at shuffled hash join build side.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(_ > 0, "The difference must be positive.")
+      .createWithDefault(2)

Review comment:
       is it really necessary to have 2 configs for the ratio?



