You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2020/08/29 21:52:34 UTC

[spark] branch branch-3.0 updated: [SPARK-32693][SQL][3.0] Compare two dataframes with same schema except nullable property

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0dff99b  [SPARK-32693][SQL][3.0] Compare two dataframes with same schema except nullable property
0dff99b is described below

commit 0dff99b172e0d85c1855fe7a027931bdaf9b3aa7
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun Aug 30 06:50:44 2020 +0900

    [SPARK-32693][SQL][3.0] Compare two dataframes with same schema except nullable property
    
    ### What changes were proposed in this pull request?
    
    This PR changes key data types check in `HashJoin` to use `sameType`. This backports #29555 to branch-3.0.
    
    ### Why are the changes needed?
    
    Looks at the resolving condition of `SetOperation`, it requires only each left data types should be `sameType` as the right ones. Logically the `EqualTo` expression in equi-join, also requires only left data type `sameType` as right data type. Then `HashJoin` requires left keys data type exactly the same as right keys data type, looks not reasonable.
    
    It makes inconsistent results when doing `except` between two dataframes.
    
    If two dataframes don't have nested fields, even their field nullable property different, `HashJoin` passes the key type check because it checks field individually so field nullable property is ignored.
    
    If two dataframes have nested fields like struct, `HashJoin` fails the key type check because now it compare two struct types and nullable property now affects.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Making consistent `except` operation between dataframes.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #29575 from viirya/SPARK-32693-3.0.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../spark/sql/execution/joins/HashJoin.scala       |  7 ++--
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 39 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 305741e..931a75a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -80,8 +80,11 @@ trait HashJoin {
   }
 
   protected lazy val (buildKeys, streamedKeys) = {
-    require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
-      "Join keys from two sides should have same types")
+    require(leftKeys.length == rightKeys.length &&
+      leftKeys.map(_.dataType)
+        .zip(rightKeys.map(_.dataType))
+        .forall(types => types._1.sameType(types._2)),
+      "Join keys from two sides should have same length and types")
     val lkeys = bindReferences(HashJoin.rewriteKeyExpr(leftKeys), left.output)
     val rkeys = bindReferences(HashJoin.rewriteKeyExpr(rightKeys), right.output)
     buildSide match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 6b772e5..a49f95f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Filter, HintInfo, Join, JoinHint, LogicalPlan, Project}
@@ -29,6 +31,7 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
 
 class DataFrameJoinSuite extends QueryTest
   with SharedSparkSession
@@ -418,4 +421,40 @@ class DataFrameJoinSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-32693: Compare two dataframes with same schema except nullable property") {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", IntegerType, false) ::
+        StructField("c", IntegerType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("c", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(10, 1, 1))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    checkAnswer(df1.except(df2), Row(10, 50, 2))
+
+    val schema3 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", IntegerType, false) ::
+        StructField("c", IntegerType, false) ::
+        StructField("d", schema1, false) :: Nil)
+    val rowSeq3: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)), Row(10, 50, 2, Row(10, 50, 2)))
+    val df3 = spark.createDataFrame(rowSeq3.asJava, schema3)
+
+    val schema4 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("b", IntegerType) ::
+        StructField("d", schema2) :: Nil)
+    val rowSeq4: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)))
+    val df4 = spark.createDataFrame(rowSeq4.asJava, schema4)
+
+    checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org