You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/07/11 13:26:48 UTC
[spark] branch branch-3.0 updated: [SPARK-32220][SQL][3.0]
SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join
result
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 10568ad [SPARK-32220][SQL][3.0] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result
10568ad is described below
commit 10568ad13a437edd4a2aca36358ff717b15e1104
Author: angerszhu <an...@gmail.com>
AuthorDate: Sat Jul 11 06:24:27 2020 -0700
[SPARK-32220][SQL][3.0] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result
### What changes were proposed in this pull request?
In the current join hint strategies, if we use the SHUFFLE_REPLICATE_NL hint, it will directly convert the join to a Cartesian Product Join and lose the join condition, making the result incorrect.
For Example:
```
spark-sql> select * from test4 order by a asc;
1 2
Time taken: 1.063 seconds, Fetched 4 row(s)20/07/08 14:11:25 INFO SparkSQLCLIDriver: Time taken: 1.063 seconds, Fetched 4 row(s)
spark-sql>select * from test5 order by a asc
1 2
2 2
Time taken: 1.18 seconds, Fetched 24 row(s)20/07/08 14:13:59 INFO SparkSQLCLIDriver: Time taken: 1.18 seconds, Fetched 24 row(s)spar
spark-sql>select /*+ shuffle_replicate_nl(test4) */ * from test4 join test5 where test4.a = test5.a order by test4.a asc ;
1 2 1 2
1 2 2 2
Time taken: 0.351 seconds, Fetched 2 row(s)
20/07/08 14:18:16 INFO SparkSQLCLIDriver: Time taken: 0.351 seconds, Fetched 2 row(s)
```
### Why are the changes needed?
Fixes incorrect join results caused by the join condition being dropped when the SHUFFLE_REPLICATE_NL hint is applied.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Added UT
Closes #29070 from AngersZhuuuu/SPARK-32220-branch-3.0.
Authored-by: angerszhu <an...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/SparkStrategies.scala | 4 ++--
.../scala/org/apache/spark/sql/JoinHintSuite.scala | 27 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a983bc8..e041c54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -244,7 +244,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// 4. Pick cartesian product if join type is inner like.
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
+ case p @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
val wantToBuildRight = canBuildRight(joinType) && buildRight
@@ -286,7 +286,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def createCartesianProduct() = {
if (joinType.isInstanceOf[InnerLike]) {
- Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
+ Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), p.condition)))
} else {
None
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
index f68c416..71f7a70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -570,4 +570,31 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
assert(joinHints == expectedHints)
}
}
+
+ test("SPARK-32220: Non Cartesian Product Join Result Correct with SHUFFLE_REPLICATE_NL hint") {
+ withTempView("t1", "t2") {
+ Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1")
+ Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2")
+ val df1 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key = t2.key")
+ val df2 = sql("SELECT * from t1 join t2 ON t1.key = t2.key")
+ assert(df1.collect().size == df2.collect().size)
+
+ val df3 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2")
+ val df4 = sql("SELECT * from t1 join t2")
+ assert(df3.collect().size == df4.collect().size)
+
+ val df5 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < t2.key")
+ val df6 = sql("SELECT * from t1 join t2 ON t1.key < t2.key")
+ assert(df5.collect().size == df6.collect().size)
+
+ val df7 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < 2")
+ val df8 = sql("SELECT * from t1 join t2 ON t1.key < 2")
+ assert(df7.collect().size == df8.collect().size)
+
+
+ val df9 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t2.key < 2")
+ val df10 = sql("SELECT * from t1 join t2 ON t2.key < 2")
+ assert(df9.collect().size == df10.collect().size)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org