Posted to commits@spark.apache.org by li...@apache.org on 2018/01/29 18:29:44 UTC

spark git commit: [SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection

Repository: spark
Updated Branches:
  refs/heads/master 0d60b3213 -> e30b34f7b


[SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection

## What changes were proposed in this pull request?
This PR updates the description of the join algorithm changes.

## How was this patch tested?
N/A

Author: gatorsmile <ga...@gmail.com>

Closes #20420 from gatorsmile/followUp22916.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e30b34f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e30b34f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e30b34f7

Branch: refs/heads/master
Commit: e30b34f7bd9a687eb43d636fffeb98fe235fcbf4
Parents: 0d60b32
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Jan 29 10:29:42 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Jan 29 10:29:42 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   | 60 +++++++++++++++-----
 1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e30b34f7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce512bc..82b4eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -91,23 +91,58 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Select the proper physical plan for join based on joining keys and size of logical plan.
    *
    * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
-   * predicates can be evaluated by matching join keys. If found,  Join implementations are chosen
+   * predicates can be evaluated by matching join keys. If found, join implementations are chosen
    * with the following precedence:
    *
-   * - Broadcast: We prefer to broadcast the join side with an explicit broadcast hint(e.g. the
-   *     user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame).
-   *     If both sides have the broadcast hint, we prefer to broadcast the side with a smaller
-   *     estimated physical size. If neither one of the sides has the broadcast hint,
-   *     we only broadcast the join side if its estimated physical size that is smaller than
-   *     the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold.
+   * - Broadcast hash join (BHJ):
+   *     BHJ is not supported for full outer join. For right outer join, we only can broadcast the
+   *     left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
+   *     we only can broadcast the right side. For inner like join, we can broadcast both sides.
+   *     Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
+   *     small. However, broadcasting tables is a network-intensive operation. It could cause OOM
+   *     or perform worse than the other join algorithms, especially when the build/broadcast side
+   *     is big.
+   *
+   *     For the supported cases, users can specify the broadcast hint (e.g. the user applied the
+   *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
+   *     which join side is broadcast.
+   *
+   *     1) Broadcast the join side with the broadcast hint, even if the size is larger than
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
+   *     is inner like join), the side with a smaller estimated physical size will be broadcast.
+   *     2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+   *     whose estimated physical size is smaller than the threshold. If both sides are below the
+   *     threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
+   *
    * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
    *     table.
+   *
    * - Sort merge: if the matching join keys are sortable.
    *
    * If there is no joining keys, Join implementations are chosen with the following precedence:
-   * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
-   * - CartesianProduct: for Inner join
-   * - BroadcastNestedLoopJoin
+   * - BroadcastNestedLoopJoin (BNLJ):
+   *     BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
+   *     For right outer join, the left side is broadcast. For left outer, left semi, left anti
+   *     and the internal join type ExistenceJoin, the right side is broadcast. For inner like
+   *     joins, either side is broadcast.
+   *
+   *     Like BHJ, users still can specify the broadcast hint and session-based
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast.
+   *
+   *     1) Broadcast the join side with the broadcast hint, even if the size is larger than
+   *     [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
+   *     inner-like join), the side with a smaller estimated physical size will be broadcast.
+   *     2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+   *     whose estimated physical size is smaller than the threshold. If both sides are below the
+   *     threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.
+   *
+   * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
+   *
+   * - BroadcastNestedLoopJoin (BNLJ):
+   *     For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
+   *     side with the broadcast hint. If neither side has a hint, we broadcast the side with
+   *     the smaller estimated physical size.
    */
   object JoinSelection extends Strategy with PredicateHelper {
 
@@ -140,8 +175,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
 
     private def canBuildRight(joinType: JoinType): Boolean = joinType match {
-      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
-      case j: ExistenceJoin => true
+      case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
       case _ => false
     }
 
@@ -244,7 +278,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       // --- Without joining keys ------------------------------------------------------------
 
-      // Pick BroadcastNestedLoopJoin if one side could be broadcasted
+      // Pick BroadcastNestedLoopJoin if one side could be broadcast
       case j @ logical.Join(left, right, joinType, condition)
           if canBroadcastByHints(joinType, left, right) =>
         val buildSide = broadcastSideByHints(joinType, left, right)
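The join-type rules in the updated Scaladoc, together with the `canBuildRight` refactor in the diff, can be modeled outside Spark to check which side each join type allows to be broadcast. This is a simplified sketch, assuming a hand-written `JoinType` hierarchy rather than Spark's catalyst classes. `BuildSides`, `canBuildRightBefore` and `allJoinTypes` are hypothetical names. `canBuildLeft` is not part of this diff and is reconstructed from the Scaladoc text, which says the left side can be broadcast only for right outer and inner-like joins.

```scala
// Simplified, hypothetical stand-ins for Spark SQL join types; the real
// classes live in Spark's catalyst module and carry more detail.
sealed trait JoinType
sealed trait InnerLike extends JoinType
case object Inner extends InnerLike
case object Cross extends InnerLike
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
// Spark's ExistenceJoin wraps an attribute; a plain column name stands in here.
final case class ExistenceJoin(existsColumn: String) extends JoinType

object BuildSides {
  // Left side may be broadcast only for inner-like and right outer joins.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | RightOuter => true
    case _ => false
  }

  // Pre-commit shape of canBuildRight: ExistenceJoin matched in its own case.
  def canBuildRightBefore(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
    case j: ExistenceJoin => true
    case _ => false
  }

  // Post-commit shape: the same cases folded into one alternative pattern.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
    case _ => false
  }

  // Every join type in this model, used to compare the two shapes.
  val allJoinTypes: Seq[JoinType] =
    Seq(Inner, Cross, LeftOuter, RightOuter, FullOuter, LeftSemi, LeftAnti, ExistenceJoin("exists"))
}
```

Comparing `canBuildRightBefore` and `canBuildRight` over `allJoinTypes` shows the refactor only changes how the cases are written, not which join types can build on the right. Full outer join is the one type for which neither side is buildable, matching the Scaladoc's statement that BHJ does not support it.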

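The two precedence rules the Scaladoc gives for picking the broadcast side of a BHJ (hint first, then the `AUTO_BROADCASTJOIN_THRESHOLD` size check) can be sketched as a pure function. This is a hedged model, not Spark's implementation: `Side`, `BroadcastHashJoinChoice` and `chooseBuildSide` are hypothetical names, `sizeInBytes` stands in for the estimated physical size, and the join-type restrictions are passed in as two booleans. Two details are assumptions of the sketch: a side qualifies when its estimate is at or below the threshold, and a tie in size goes to the right side.

```scala
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical stand-in for one join input: its estimated size and whether
// it carries a broadcast hint (e.g. from functions.broadcast()).
final case class Side(sizeInBytes: Long, hasBroadcastHint: Boolean)

object BroadcastHashJoinChoice {
  // With two eligible sides, broadcast the smaller one (ties go right here).
  private def smaller(left: Side, right: Side): BuildSide =
    if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

  // Choose among eligible sides; None means no side qualifies.
  private def pick(leftOk: Boolean, rightOk: Boolean, left: Side, right: Side): Option[BuildSide] =
    (leftOk, rightOk) match {
      case (true, true)  => Some(smaller(left, right))
      case (true, false) => Some(BuildLeft)
      case (false, true) => Some(BuildRight)
      case _             => None
    }

  /** Returns the side to broadcast, or None when BHJ is not used.
    * canBuildLeft/canBuildRight encode the join-type restrictions. */
  def chooseBuildSide(
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      left: Side,
      right: Side,
      threshold: Long): Option[BuildSide] = {
    // Rule 1: a broadcast hint wins even if that side exceeds the threshold.
    val byHint = pick(
      canBuildLeft && left.hasBroadcastHint,
      canBuildRight && right.hasBroadcastHint,
      left, right)
    // Rule 2: otherwise only sides whose estimate is at or below the threshold
    // qualify, so a negative threshold disables this rule entirely.
    def fits(s: Side): Boolean = s.sizeInBytes >= 0 && s.sizeInBytes <= threshold
    byHint.orElse(pick(canBuildLeft && fits(left), canBuildRight && fits(right), left, right))
  }
}
```

For a left outer join, for example, a caller would pass `canBuildLeft = false` and `canBuildRight = true`, so a small left side is never broadcast even when it is under the threshold, and the function returns `None` unless the right side has a hint or fits.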

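Without join keys, the Scaladoc describes a fallback order: BNLJ on an eligible side chosen by hint and then by threshold, CartesianProduct for inner-like joins, and finally BNLJ for every other join type. The Scala sketch below models that order as a simplified, hypothetical decision function. `KeylessJoinChoice`, `KeylessJoinPlan`, `BroadcastNestedLoop` and `Side` are illustrative names rather than Spark operators, the join type is reduced to the flags `innerLike`, `canBuildLeft` and `canBuildRight`, and treating a size equal to the threshold as eligible, with ties going to the right side, are assumptions of the sketch.

```scala
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Hypothetical labels for the two physical strategies considered without keys.
sealed trait KeylessJoinPlan
final case class BroadcastNestedLoop(buildSide: BuildSide) extends KeylessJoinPlan
case object CartesianProduct extends KeylessJoinPlan

// Hypothetical stand-in for one join input: estimated size plus broadcast hint.
final case class Side(sizeInBytes: Long, hasBroadcastHint: Boolean)

object KeylessJoinChoice {
  // With two candidate sides, broadcast the smaller one (ties go right here).
  private def smaller(left: Side, right: Side): BuildSide =
    if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft

  // Choose among candidate sides; None means no side qualifies.
  private def pick(leftOk: Boolean, rightOk: Boolean, left: Side, right: Side): Option[BuildSide] =
    (leftOk, rightOk) match {
      case (true, true)  => Some(smaller(left, right))
      case (true, false) => Some(BuildLeft)
      case (false, true) => Some(BuildRight)
      case _             => None
    }

  def choose(
      innerLike: Boolean,
      canBuildLeft: Boolean,
      canBuildRight: Boolean,
      left: Side,
      right: Side,
      threshold: Long): KeylessJoinPlan = {
    def fits(s: Side): Boolean = s.sizeInBytes >= 0 && s.sizeInBytes <= threshold
    // Steps 1-2: BNLJ on an eligible side, chosen by hint first, then by size.
    val byHint = pick(canBuildLeft && left.hasBroadcastHint, canBuildRight && right.hasBroadcastHint, left, right)
    val bySize = pick(canBuildLeft && fits(left), canBuildRight && fits(right), left, right)
    byHint.orElse(bySize) match {
      case Some(side) => BroadcastNestedLoop(side)
      // Step 3: inner-like joins fall back to a Cartesian product.
      case None if innerLike => CartesianProduct
      // Step 4: other join types fall back to BNLJ, ignoring the join-type
      // restrictions: the hinted side if exactly one is hinted, else the smaller side.
      case None =>
        BroadcastNestedLoop(
          pick(left.hasBroadcastHint, right.hasBroadcastHint, left, right).getOrElse(smaller(left, right)))
    }
  }
}
```

The last branch deliberately ignores `canBuildLeft` and `canBuildRight`, which is why, in this model, a left outer join with two large inputs can still end up broadcasting its left side. That is consistent with the Scaladoc saying BNLJ supports all join types and is only optimized for the listed build sides.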