You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "david bernuau (Jira)" <ji...@apache.org> on 2020/08/24 15:30:00 UTC

[jira] [Updated] (SPARK-32693) Compare two dataframes with same schema except nullable property

     [ https://issues.apache.org/jira/browse/SPARK-32693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

david bernuau updated SPARK-32693:
----------------------------------
    Description: 
My aim is to compare two dataframes with very close schemas : same number of fields, with the same names, types and metadata. The only difference comes from the fact that a given field might be nullable in one dataframe and not in the other.

Here is the code that I used:
{code:java}
val session = SparkSession.builder().getOrCreate()
 import org.apache.spark.sql.Row
 import java.sql.Timestamp
 import scala.collection.JavaConverters._
case class A(g: Timestamp, h: Option[Timestamp], i: Int)
 case class B(e: Int, f: Seq[A])
 case class C(g: Timestamp, h: Option[Timestamp], i: Option[Int])
 case class D(e: Option[Int], f: Seq[C])
val schema1 = StructType(
 Array(
 StructField("a", IntegerType, false), StructField("b", IntegerType, false), StructField("c", IntegerType, false)
 )
 )
 val rowSeq1: List[Row] = List(
 Row(10, 1, 1), Row(10, 50, 2)
 )
 val df1 = session.createDataFrame(rowSeq1.asJava, schema1)
 df1.printSchema()
val schema2 = StructType(
 Array(
 StructField("a", IntegerType), StructField("b", IntegerType), StructField("c", IntegerType)
 )
 )
 val rowSeq2: List[Row] = List(
 Row(10, 1, 1)
 )
 val df2 = session.createDataFrame(rowSeq2.asJava, schema2)
 df2.printSchema()
println(s"Number of records for first case : ${df1.except(df2).count()}")
val schema3 = StructType(
 Array(
 StructField("a", IntegerType, false), StructField("b", IntegerType, false), StructField("c", IntegerType, false), StructField("d", ArrayType(StructType(Array(
 StructField("e", IntegerType, false), StructField("f", ArrayType(StructType(Array(
 StructField("g", TimestampType), StructField("h", TimestampType), StructField("i", IntegerType, false)
 ))))
 ))))
 )
 )
 val date1 = new Timestamp(1597589638L)
 val date2 = new Timestamp(1597599638L)
 val rowSeq3: List[Row] = List(
 Row(10, 1, 1, Seq(B(100, Seq(A(date1, None, 1))))), Row(10, 50, 2, Seq(B(101, Seq(A(date2, None, 2)))))
 )
 val df3 = session.createDataFrame(rowSeq3.asJava, schema3)
 df3.printSchema()
val schema4 = StructType(
 Array(
 StructField("a", IntegerType), StructField("b", IntegerType), StructField("b", IntegerType), StructField("d", ArrayType(StructType(Array(
 StructField("e", IntegerType), StructField("f", ArrayType(StructType(Array(
 StructField("g", TimestampType), StructField("h", TimestampType), StructField("i", IntegerType)
 ))))
 ))))
 )
 )
 val rowSeq4: List[Row] = List(
 Row(10, 1, 1, Seq(D(Some(100), Seq(C(date1, None, Some(1))))))
 )
 val df4 = session.createDataFrame(rowSeq4.asJava, schema3)
 df4.printSchema()
println(s"Number of records for second case : ${df3.except(df4).count()}")
{code}
The preceding code shows what seems to me a bug in Spark :
 * If you consider two dataframes (df1 and df2) having exactly the same schema, except fields are not nullable for the first dataframe and are nullable for the second. Then, doing df1.except(df2).count() works well.
 * Now, if you consider two other dataframes (df3 and df4) having the same schema (with fields nullable on one side and not on the other). If these two dataframes contain nested fields, then, this time, the action df3.except(df4).count gives the following exception : java.lang.IllegalArgumentException: requirement failed: Join keys from two sides should have same types

  was:
My aim is to compare two dataframes with very close schemas : same number of fields, with the same names, types and metadata. The only difference comes from the fact that a given field might be nullable in one dataframe and not in the other.

Here is the code that I used:

```

val session = SparkSession.builder().getOrCreate()
import org.apache.spark.sql.Row
import java.sql.Timestamp
import scala.collection.JavaConverters._

case class A(g: Timestamp,
 h: Option[Timestamp],
 i: Int)
case class B(e: Int, f: Seq[A])
case class C(g: Timestamp,
 h: Option[Timestamp],
 i: Option[Int])
case class D(e: Option[Int], f: Seq[C])

val schema1 = StructType(
 Array(
 StructField("a", IntegerType, false),
 StructField("b", IntegerType, false),
 StructField("c", IntegerType, false)
 )
)
val rowSeq1: List[Row] = List(
 Row(10, 1, 1),
 Row(10, 50, 2)
)
val df1 = session.createDataFrame(rowSeq1.asJava, schema1)
df1.printSchema()

val schema2 = StructType(
 Array(
 StructField("a", IntegerType),
 StructField("b", IntegerType),
 StructField("c", IntegerType)
 )
)
val rowSeq2: List[Row] = List(
 Row(10, 1, 1)
)
val df2 = session.createDataFrame(rowSeq2.asJava, schema2)
df2.printSchema()

println(s"Number of records for first case : ${df1.except(df2).count()}")

val schema3 = StructType(
 Array(
 StructField("a", IntegerType, false),
 StructField("b", IntegerType, false),
 StructField("c", IntegerType, false),
 StructField("d", ArrayType(StructType(Array(
 StructField("e", IntegerType, false),
 StructField("f", ArrayType(StructType(Array(
 StructField("g", TimestampType),
 StructField("h", TimestampType),
 StructField("i", IntegerType, false)
 ))))
 ))))
 )
)
val date1 = new Timestamp(1597589638L)
val date2 = new Timestamp(1597599638L)
val rowSeq3: List[Row] = List(
 Row(10, 1, 1,
 Seq(B(100, Seq(A(date1, None, 1))))),
 Row(10, 50, 2,
 Seq(B(101, Seq(A(date2, None, 2)))))
)
val df3 = session.createDataFrame(rowSeq3.asJava, schema3)
df3.printSchema()

val schema4 = StructType(
 Array(
 StructField("a", IntegerType),
 StructField("b", IntegerType),
 StructField("b", IntegerType),
 StructField("d", ArrayType(StructType(Array(
 StructField("e", IntegerType),
 StructField("f", ArrayType(StructType(Array(
 StructField("g", TimestampType),
 StructField("h", TimestampType),
 StructField("i", IntegerType)
 ))))
 ))))
 )
)
val rowSeq4: List[Row] = List(
 Row(10, 1, 1,
 Seq(D(Some(100), Seq(C(date1, None, Some(1))))))
)
val df4 = session.createDataFrame(rowSeq4.asJava, schema3)
df4.printSchema()

println(s"Number of records for second case : ${df3.except(df4).count()}")

```

The preceding code shows what seems to me a bug in Spark :
 * If you consider two dataframes (df1 and df2) having exactly the same schema, except fields are not nullable for the first dataframe and are nullable for the second. Then, doing df1.except(df2).count() works well.
 * Now, if you consider two other dataframes (df3 and df4) having the same schema (with fields nullable on one side and not on the other). If these two dataframes contain nested fields, then, this time, the action df3.except(df4).count gives the following exception : java.lang.IllegalArgumentException: requirement failed: Join keys from two sides should have same types


> Compare two dataframes with same schema except nullable property
> ----------------------------------------------------------------
>
>                 Key: SPARK-32693
>                 URL: https://issues.apache.org/jira/browse/SPARK-32693
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: david bernuau
>            Priority: Minor
>
> My aim is to compare two dataframes with very close schemas : same number of fields, with the same names, types and metadata. The only difference comes from the fact that a given field might be nullable in one dataframe and not in the other.
> Here is the code that I used:
> {code:java}
> val session = SparkSession.builder().getOrCreate()
>  import org.apache.spark.sql.Row
>  import java.sql.Timestamp
>  import scala.collection.JavaConverters._
> case class A(g: Timestamp, h: Option[Timestamp], i: Int)
>  case class B(e: Int, f: Seq[A])
>  case class C(g: Timestamp, h: Option[Timestamp], i: Option[Int])
>  case class D(e: Option[Int], f: Seq[C])
> val schema1 = StructType(
>  Array(
>  StructField("a", IntegerType, false), StructField("b", IntegerType, false), StructField("c", IntegerType, false)
>  )
>  )
>  val rowSeq1: List[Row] = List(
>  Row(10, 1, 1), Row(10, 50, 2)
>  )
>  val df1 = session.createDataFrame(rowSeq1.asJava, schema1)
>  df1.printSchema()
> val schema2 = StructType(
>  Array(
>  StructField("a", IntegerType), StructField("b", IntegerType), StructField("c", IntegerType)
>  )
>  )
>  val rowSeq2: List[Row] = List(
>  Row(10, 1, 1)
>  )
>  val df2 = session.createDataFrame(rowSeq2.asJava, schema2)
>  df2.printSchema()
> println(s"Number of records for first case : ${df1.except(df2).count()}")
> val schema3 = StructType(
>  Array(
>  StructField("a", IntegerType, false), StructField("b", IntegerType, false), StructField("c", IntegerType, false), StructField("d", ArrayType(StructType(Array(
>  StructField("e", IntegerType, false), StructField("f", ArrayType(StructType(Array(
>  StructField("g", TimestampType), StructField("h", TimestampType), StructField("i", IntegerType, false)
>  ))))
>  ))))
>  )
>  )
>  val date1 = new Timestamp(1597589638L)
>  val date2 = new Timestamp(1597599638L)
>  val rowSeq3: List[Row] = List(
>  Row(10, 1, 1, Seq(B(100, Seq(A(date1, None, 1))))), Row(10, 50, 2, Seq(B(101, Seq(A(date2, None, 2)))))
>  )
>  val df3 = session.createDataFrame(rowSeq3.asJava, schema3)
>  df3.printSchema()
> val schema4 = StructType(
>  Array(
>  StructField("a", IntegerType), StructField("b", IntegerType), StructField("b", IntegerType), StructField("d", ArrayType(StructType(Array(
>  StructField("e", IntegerType), StructField("f", ArrayType(StructType(Array(
>  StructField("g", TimestampType), StructField("h", TimestampType), StructField("i", IntegerType)
>  ))))
>  ))))
>  )
>  )
>  val rowSeq4: List[Row] = List(
>  Row(10, 1, 1, Seq(D(Some(100), Seq(C(date1, None, Some(1))))))
>  )
>  val df4 = session.createDataFrame(rowSeq4.asJava, schema3)
>  df4.printSchema()
> println(s"Number of records for second case : ${df3.except(df4).count()}")
> {code}
> The preceding code shows what seems to me a bug in Spark :
>  * If you consider two dataframes (df1 and df2) having exactly the same schema, except fields are not nullable for the first dataframe and are nullable for the second. Then, doing df1.except(df2).count() works well.
>  * Now, if you consider two other dataframes (df3 and df4) having the same schema (with fields nullable on one side and not on the other). If these two dataframes contain nested fields, then, this time, the action df3.except(df4).count gives the following exception : java.lang.IllegalArgumentException: requirement failed: Join keys from two sides should have same types



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org