Posted to issues@spark.apache.org by "Everett Anderson (JIRA)" <ji...@apache.org> on 2017/03/23 17:13:41 UTC

[jira] [Created] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table

Everett Anderson created SPARK-20073:
----------------------------------------

             Summary: Unexpected Cartesian product when using eqNullSafe in join with a derived table
                 Key: SPARK-20073
                 URL: https://issues.apache.org/jira/browse/SPARK-20073
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 2.1.0, 2.0.2
            Reporter: Everett Anderson


It appears that if you join tables A and B, where B is derived from A, and you use the eqNullSafe / <=> operator in the join condition, Spark plans a Cartesian product.

However, if you perform the same join on an independently constructed table containing identical data (one with no lineage relationship to A), the expected non-Cartesian join occurs.
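
For context, eqNullSafe / <=> is Spark's null-safe equality: NULL <=> NULL evaluates to true, whereas NULL = NULL evaluates to NULL (and so drops the row in a join). A minimal illustration (added for context, independent of the bug itself):

{noformat}
// Null-safe vs. regular equality on NULL (in spark-shell):
spark.sql("SELECT NULL <=> NULL").collect()  // Array([true])
spark.sql("SELECT NULL = NULL").collect()    // Array([null])
{noformat}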

{noformat}
// Create some fake data.

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions

val peopleRowsRDD = sc.parallelize(Seq(
    Row("Fred", 8, 1),
    Row("Fred", 8, 2),
    Row(null, 10, 3),
    Row(null, 10, 4),
    Row("Amy", 12, 5),
    Row("Amy", 12, 6)))
    
val peopleSchema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("group", IntegerType, nullable = true),
    StructField("data", IntegerType, nullable = true)))
    
val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)

people.createOrReplaceTempView("people")

people.show
+----+-----+----+
|name|group|data|
+----+-----+----+
|Fred|    8|   1|
|Fred|    8|   2|
|null|   10|   3|
|null|   10|   4|
| Amy|   12|   5|
| Amy|   12|   6|
+----+-----+----+

// Now create a derived table from that table. Its exact contents don't matter much.
val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")

variantCounts.show
+----+-------------+                                                            
|name|variant_count|
+----+-------------+
|Fred|            2|
|null|            2|
| Amy|            2|
+----+-------------+

// Now try an inner join using the regular equalTo operator, which drops nulls. This works fine.

val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
innerJoinEqualTo.show

+----+-------------+----+-----+----+                                            
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
+----+-------------+----+-----+----+

// Okay, now let's switch to the <=> operator.
//
// If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
// "Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
//
// If you have enabled them, you'll get the table below.
//
// However, we really don't want or expect a Cartesian product!
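//
// (Added for completeness: the flag can be enabled at runtime on the
// session, which is what produces the result shown below.)
spark.conf.set("spark.sql.crossJoin.enabled", "true")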

val innerJoinSqlNullSafeEqOp = variantCounts.join(people, variantCounts("name") <=> people("name"))
innerJoinSqlNullSafeEqOp.show

+----+-------------+----+-----+----+                                            
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
|Fred|            2|null|   10|   3|
|Fred|            2|null|   10|   4|
|Fred|            2| Amy|   12|   5|
|Fred|            2| Amy|   12|   6|
|null|            2|Fred|    8|   1|
|null|            2|Fred|    8|   2|
|null|            2|null|   10|   3|
|null|            2|null|   10|   4|
|null|            2| Amy|   12|   5|
|null|            2| Amy|   12|   6|
| Amy|            2|Fred|    8|   1|
| Amy|            2|Fred|    8|   2|
| Amy|            2|null|   10|   3|
| Amy|            2|null|   10|   4|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
+----+-------------+----+-----+----+
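
// Added note: inspecting the physical plan shows the Cartesian product
// directly (output abridged from memory; exact formatting may differ):
innerJoinSqlNullSafeEqOp.explain()
// == Physical Plan ==
// CartesianProduct
// ...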

// Okay, let's try to construct the exact same variantCounts table manually
// so it has no lineage relationship to the original.

val variantCountRowsRDD = sc.parallelize(Seq(
    Row("Fred", 2),
    Row(null, 2),
    Row("Amy", 2)))
    
val variantCountSchema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("variant_count", IntegerType, nullable = true)))
    
val manualVariantCounts = spark.createDataFrame(variantCountRowsRDD, variantCountSchema)

// Now perform the same join with the null-safe equals operator.
val manualVarCountsInnerJoinSqlNullSafeEqOp = manualVariantCounts.join(people, manualVariantCounts("name") <=> people("name"))
manualVarCountsInnerJoinSqlNullSafeEqOp.show

+----+-------------+----+-----+----+
|name|variant_count|name|group|data|
+----+-------------+----+-----+----+
|Fred|            2|Fred|    8|   1|
|Fred|            2|Fred|    8|   2|
| Amy|            2| Amy|   12|   5|
| Amy|            2| Amy|   12|   6|
|null|            2|null|   10|   3|
|null|            2|null|   10|   4|
+----+-------------+----+-----+----+

{noformat}
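
For what it's worth, a possible workaround (a sketch, not verified against this exact case) is to alias both sides and resolve the join columns by qualified name, so the condition is resolved against the join's own children rather than against attributes captured from the shared lineage:

{noformat}
// Hypothetical workaround sketch (not from the original report):
import org.apache.spark.sql.functions.col

val joinedViaAliases = variantCounts.alias("vc")
  .join(people.alias("p"), col("vc.name") <=> col("p.name"))
joinedViaAliases.show
{noformat}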
