You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/07/11 13:26:48 UTC

[spark] branch branch-3.0 updated: [SPARK-32220][SQL][3.0] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 10568ad  [SPARK-32220][SQL][3.0] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result
10568ad is described below

commit 10568ad13a437edd4a2aca36358ff717b15e1104
Author: angerszhu <an...@gmail.com>
AuthorDate: Sat Jul 11 06:24:27 2020 -0700

    [SPARK-32220][SQL][3.0] SHUFFLE_REPLICATE_NL Hint should not change Non-Cartesian Product join result
    
    ### What changes were proposed in this pull request?
    In the current join hint strategies, if we use the SHUFFLE_REPLICATE_NL hint, it will directly convert the join to a Cartesian Product Join and lose the join condition, making the result incorrect.
    
    For Example:
    ```
    spark-sql> select * from test4 order by a asc;
    1 2
    Time taken: 1.063 seconds, Fetched 4 row(s)20/07/08 14:11:25 INFO SparkSQLCLIDriver: Time taken: 1.063 seconds, Fetched 4 row(s)
    spark-sql>select * from test5 order by a asc
    1 2
    2 2
    Time taken: 1.18 seconds, Fetched 24 row(s)20/07/08 14:13:59 INFO SparkSQLCLIDriver: Time taken: 1.18 seconds, Fetched 24 row(s)spar
    spark-sql>select /*+ shuffle_replicate_nl(test4) */ * from test4 join test5 where test4.a = test5.a order by test4.a asc ;
    1 2 1 2
    1 2 2 2
    Time taken: 0.351 seconds, Fetched 2 row(s)
    20/07/08 14:18:16 INFO SparkSQLCLIDriver: Time taken: 0.351 seconds, Fetched 2 row(s)
    ```
    
    ### Why are the changes needed?
    Fixes an incorrect query result caused by the join condition being dropped.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Added UT
    
    Closes #29070 from AngersZhuuuu/SPARK-32220-branch-3.0.
    
    Authored-by: angerszhu <an...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/SparkStrategies.scala      |  4 ++--
 .../scala/org/apache/spark/sql/JoinHintSuite.scala | 27 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a983bc8..e041c54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -244,7 +244,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       //   4. Pick cartesian product if join type is inner like.
       //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
       //      other choice.
-      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
+      case p @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
         def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
           val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
           val wantToBuildRight = canBuildRight(joinType) && buildRight
@@ -286,7 +286,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
         def createCartesianProduct() = {
           if (joinType.isInstanceOf[InnerLike]) {
-            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
+            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), p.condition)))
           } else {
             None
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
index f68c416..71f7a70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -570,4 +570,31 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
       assert(joinHints == expectedHints)
     }
   }
+
+  test("SPARK-32220: Non Cartesian Product Join Result Correct with SHUFFLE_REPLICATE_NL hint") {
+    withTempView("t1", "t2") {
+      Seq((1, "4"), (2, "2")).toDF("key", "value").createTempView("t1")
+      Seq((1, "1"), (2, "12.3"), (2, "123")).toDF("key", "value").createTempView("t2")
+      val df1 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key = t2.key")
+      val df2 = sql("SELECT * from t1 join t2 ON t1.key = t2.key")
+      assert(df1.collect().size == df2.collect().size)
+
+      val df3 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2")
+      val df4 = sql("SELECT * from t1 join t2")
+      assert(df3.collect().size == df4.collect().size)
+
+      val df5 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < t2.key")
+      val df6 = sql("SELECT * from t1 join t2 ON t1.key < t2.key")
+      assert(df5.collect().size == df6.collect().size)
+
+      val df7 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t1.key < 2")
+      val df8 = sql("SELECT * from t1 join t2 ON t1.key < 2")
+      assert(df7.collect().size == df8.collect().size)
+
+
+      val df9 = sql("SELECT /*+ shuffle_replicate_nl(t1) */ * from t1 join t2 ON t2.key < 2")
+      val df10 = sql("SELECT * from t1 join t2 ON t2.key < 2")
+      assert(df9.collect().size == df10.collect().size)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org