Posted to commits@spark.apache.org by li...@apache.org on 2018/01/29 18:29:44 UTC
spark git commit: [SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection
Repository: spark
Updated Branches:
refs/heads/master 0d60b3213 -> e30b34f7b
[SPARK-22916][SQL][FOLLOW-UP] Update the Description of Join Selection
## What changes were proposed in this pull request?
This PR updates the description of the join algorithm changes.
## How was this patch tested?
N/A
Author: gatorsmile <ga...@gmail.com>
Closes #20420 from gatorsmile/followUp22916.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e30b34f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e30b34f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e30b34f7
Branch: refs/heads/master
Commit: e30b34f7bd9a687eb43d636fffeb98fe235fcbf4
Parents: 0d60b32
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Jan 29 10:29:42 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Jan 29 10:29:42 2018 -0800
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 60 +++++++++++++++-----
1 file changed, 47 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e30b34f7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce512bc..82b4eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -91,23 +91,58 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Select the proper physical plan for join based on joining keys and size of logical plan.
*
* At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
- * predicates can be evaluated by matching join keys. If found, Join implementations are chosen
+ * predicates can be evaluated by matching join keys. If found, join implementations are chosen
* with the following precedence:
*
- * - Broadcast: We prefer to broadcast the join side with an explicit broadcast hint(e.g. the
- * user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame).
- * If both sides have the broadcast hint, we prefer to broadcast the side with a smaller
- * estimated physical size. If neither one of the sides has the broadcast hint,
- * we only broadcast the join side if its estimated physical size that is smaller than
- * the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold.
+ * - Broadcast hash join (BHJ):
+ * BHJ is not supported for full outer join. For right outer join, we only can broadcast the
+ * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
+ * we only can broadcast the right side. For inner like join, we can broadcast both sides.
+ * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
+ * small. However, broadcasting tables is a network-intensive operation. It could cause OOM
+ * or perform worse than the other join algorithms, especially when the build/broadcast side
+ * is big.
+ *
+ * For the supported cases, users can specify the broadcast hint (e.g. the user applied the
+ * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
+ * which join side is broadcast.
+ *
+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
+ * is inner like join), the side with a smaller estimated physical size will be broadcast.
+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+ * whose estimated physical size is smaller than the threshold. If both sides are below the
+ * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
+ *
* - Shuffle hash join: if the average size of a single partition is small enough to build a hash
* table.
+ *
* - Sort merge: if the matching join keys are sortable.
*
* If there is no joining keys, Join implementations are chosen with the following precedence:
- * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
- * - CartesianProduct: for Inner join
- * - BroadcastNestedLoopJoin
+ * - BroadcastNestedLoopJoin (BNLJ):
+ * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
+ * For right outer join, the left side is broadcast. For left outer, left semi, left anti
+ * and the internal join type ExistenceJoin, the right side is broadcast. For inner like
+ * joins, either side is broadcast.
+ *
+ * Like BHJ, users still can specify the broadcast hint and session-based
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast.
+ *
+ * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
+ * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
+ * inner-like join), the side with a smaller estimated physical size will be broadcast.
+ * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
+ * whose estimated physical size is smaller than the threshold. If both sides are below the
+ * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.
+ *
+ * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
+ *
+ * - BroadcastNestedLoopJoin (BNLJ):
+ * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
+ * side with the broadcast hint. If neither side has a hint, we broadcast the side with
+ * the smaller estimated physical size.
*/
object JoinSelection extends Strategy with PredicateHelper {
@@ -140,8 +175,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
- case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
- case j: ExistenceJoin => true
+ case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
@@ -244,7 +278,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// --- Without joining keys ------------------------------------------------------------
- // Pick BroadcastNestedLoopJoin if one side could be broadcasted
+ // Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
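
The broadcast-side rules described in the updated Scaladoc above can be sketched as a small standalone model. This is only an illustration under stated assumptions, not Spark's implementation: `BroadcastSideSketch`, `Side`, `pick` and `chooseBroadcastSide` are hypothetical names, the join types are simplified stand-ins (the internal ExistenceJoin type is omitted), and treating a size equal to the threshold as eligible, as well as breaking size ties in favor of the right side, are assumptions of this sketch.

```scala
// Hypothetical, simplified model of the rules in the Scaladoc; not Spark's code.
object BroadcastSideSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Stand-in for one join input: estimated physical size plus a hint flag.
  final case class Side(sizeInBytes: Long, hasBroadcastHint: Boolean = false)

  // Inner, left outer, left semi and left anti joins may broadcast the right side.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case _ => false
  }

  // Inner and right outer joins may broadcast the left side.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _ => false
  }

  // If both sides are eligible, take the smaller one (assumption: ties go right).
  private def pick(leftOk: Boolean, rightOk: Boolean,
                   left: Side, right: Side): Option[BuildSide] =
    if (leftOk && rightOk) {
      Some(if (right.sizeInBytes <= left.sizeInBytes) BuildRight else BuildLeft)
    } else if (rightOk) Some(BuildRight)
    else if (leftOk) Some(BuildLeft)
    else None

  // Step 1: honor broadcast hints, regardless of size.
  // Step 2: otherwise fall back to the size threshold.
  // None means no side qualifies, so a broadcast join is not chosen.
  def chooseBroadcastSide(joinType: JoinType, left: Side, right: Side,
                          threshold: Long): Option[BuildSide] = {
    val byHint = pick(
      canBuildLeft(joinType) && left.hasBroadcastHint,
      canBuildRight(joinType) && right.hasBroadcastHint,
      left, right)
    byHint.orElse(pick(
      canBuildLeft(joinType) && left.sizeInBytes <= threshold,
      canBuildRight(joinType) && right.sizeInBytes <= threshold,
      left, right))
  }
}
```

For example, under this model a left outer join whose right side carries a hint broadcasts the right side even when it exceeds the threshold, while a full outer join never yields a broadcast side.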