Posted to issues@spark.apache.org by "Everett Anderson (JIRA)" <ji...@apache.org> on 2017/03/23 17:16:41 UTC

[jira] [Commented] (SPARK-20073) Unexpected Cartesian product when using eqNullSafe in join with a derived table

    [ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938810#comment-15938810 ] 

Everett Anderson commented on SPARK-20073:
------------------------------------------

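With the local master in spark-shell and cross joins enabled, here are the physical plans for the two null-safe joins from the issue description, first against the derived table and then against the manually built one: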

{noformat}
innerJoinSqlNullSafeEqOp.explain
== Physical Plan ==
CartesianProduct
:- *Filter (variant_count#11L > 1)
:  +- *HashAggregate(keys=[name#3], functions=[count(distinct named_struct(name, name#3, group, group#4, data, data#5)#172)])
:     +- Exchange hashpartitioning(name#3, 200)
:        +- *HashAggregate(keys=[name#3], functions=[partial_count(distinct named_struct(name, name#3, group, group#4, data, data#5)#172)])
:           +- *HashAggregate(keys=[name#3, named_struct(name, name#3, group, group#4, data, data#5)#172], functions=[])
:              +- Exchange hashpartitioning(name#3, named_struct(name, name#3, group, group#4, data, data#5)#172, 200)
:                 +- *HashAggregate(keys=[name#3, named_struct(name, name#3, group, group#4, data, data#5) AS named_struct(name, name#3, group, group#4, data, data#5)#172], functions=[])
:                    +- Scan ExistingRDD[name#3,group#4,data#5]
+- Scan ExistingRDD[name#103,group#104,data#105]
{noformat}

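versus the manually built table, which keeps the (name#139 <=> name#3) condition and gets a sort-merge join: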

{noformat}
manualVarCountsInnerJoinSqlNullSafeEqOp.explain
== Physical Plan ==
*SortMergeJoin [coalesce(name#139, )], [coalesce(name#3, )], Inner, (name#139 <=> name#3)
:- *Sort [coalesce(name#139, ) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(coalesce(name#139, ), 200)
:     +- Scan ExistingRDD[name#139,variant_count#140]
+- *Sort [coalesce(name#3, ) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(coalesce(name#3, ), 200)
      +- Scan ExistingRDD[name#3,group#4,data#5]
{noformat}
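
In the first plan the join condition has disappeared entirely, while the second keeps it. Since the hand-built table joins fine, one thing that may be worth trying is to rebuild the derived table from its own RDD and schema so that it no longer shares a lineage with people. This is only a sketch of a possible workaround and has not been verified in this comment. It assumes the spark-shell session from the description (spark, people, variantCounts), and the names detachedVariantCounts and detachedInnerJoinNullSafeEq are made up here.

```scala
// Assumes the spark-shell session from the issue description:
// `spark`, `people` and `variantCounts` are already defined.

// Rebuild the derived table from its RDD and schema. The hope (an assumption,
// not verified here) is that the copy gets fresh column references, as the
// hand-built manualVariantCounts table has.
val detachedVariantCounts =
  spark.createDataFrame(variantCounts.rdd, variantCounts.schema)

// Same null-safe join as before, but against the detached copy.
val detachedInnerJoinNullSafeEq =
  detachedVariantCounts.join(people, detachedVariantCounts("name") <=> people("name"))

// Inspect the physical plan to see whether it is still a CartesianProduct.
detachedInnerJoinNullSafeEq.explain()
```

If the plan still shows CartesianProduct, the lineage is not the whole story and this idea can be discarded.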



> Unexpected Cartesian product when using eqNullSafe in join with a derived table
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-20073
>                 URL: https://issues.apache.org/jira/browse/SPARK-20073
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Everett Anderson
>
> It appears that if you try to join tables A and B when B is derived from A and you use the eqNullSafe / <=> operator for the join condition, Spark performs a Cartesian product.
> However, if you join tables that hold the same data but are not derived from one another, the expected non-Cartesian join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
> val peopleRowsRDD = sc.parallelize(Seq(
>     Row("Fred", 8, 1),
>     Row("Fred", 8, 2),
>     Row(null, 10, 3),
>     Row(null, 10, 4),
>     Row("Amy", 12, 5),
>     Row("Amy", 12, 6)))
>     
> val peopleSchema = StructType(Seq(
>     StructField("name", StringType, nullable = true),
>     StructField("group", IntegerType, nullable = true),
>     StructField("data", IntegerType, nullable = true)))
>     
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
> variantCounts.show
> +----+-------------+                                                            
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+                                            
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay, now let's switch to the <=> operator.
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;"
> //
> // If you have enabled them, you'll get the table below.
> //
> // However, we really don't want or expect a Cartesian product!
> val innerJoinSqlNullSafeEqOp = variantCounts.join(people, variantCounts("name")<=>(people("name")))
> innerJoinSqlNullSafeEqOp.show
> +----+-------------+----+-----+----+                                            
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> |Fred|            2|null|   10|   3|
> |Fred|            2|null|   10|   4|
> |Fred|            2| Amy|   12|   5|
> |Fred|            2| Amy|   12|   6|
> |null|            2|Fred|    8|   1|
> |null|            2|Fred|    8|   2|
> |null|            2|null|   10|   3|
> |null|            2|null|   10|   4|
> |null|            2| Amy|   12|   5|
> |null|            2| Amy|   12|   6|
> | Amy|            2|Fred|    8|   1|
> | Amy|            2|Fred|    8|   2|
> | Amy|            2|null|   10|   3|
> | Amy|            2|null|   10|   4|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay, let's try to construct the exact same variantCount table manually
> // so it has no relationship to the original.
> val variantCountRowsRDD = sc.parallelize(Seq(
>     Row("Fred", 2),
>     Row(null, 2),
>     Row("Amy", 2)))
>     
> val variantCountSchema = StructType(Seq(
>     StructField("name", StringType, nullable = true),
>     StructField("variant_count", IntegerType, nullable = true)))
>     
> val manualVariantCounts = spark.createDataFrame(variantCountRowsRDD, variantCountSchema)
> // Now perform the same join with the null-safe equals operator.
> val manualVarCountsInnerJoinSqlNullSafeEqOp = manualVariantCounts.join(people, manualVariantCounts("name")<=>(people("name")))
> manualVarCountsInnerJoinSqlNullSafeEqOp.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> |null|            2|null|   10|   3|
> |null|            2|null|   10|   4|
> +----+-------------+----+-----+----+
> {noformat}
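
For reference, the row counts of the three join outputs in the description can be reproduced without Spark. The following is a plain-Scala model of the join semantics only, assuming Option[String] stands in for a nullable name column. NullSafeJoinModel and everything inside it are names invented for this sketch, and it says nothing about how Spark plans the join.

```scala
object NullSafeJoinModel {
  // Option models a nullable column: None plays the role of SQL NULL.
  final case class Person(name: Option[String], group: Int, data: Int)
  final case class VariantCount(name: Option[String], variantCount: Long)

  // Same rows as the `people` table in the description.
  val people: Seq[Person] = Seq(
    Person(Some("Fred"), 8, 1), Person(Some("Fred"), 8, 2),
    Person(None, 10, 3), Person(None, 10, 4),
    Person(Some("Amy"), 12, 5), Person(Some("Amy"), 12, 6))

  // Same rows as the `variantCounts` table in the description.
  val variantCounts: Seq[VariantCount] = Seq(
    VariantCount(Some("Fred"), 2L),
    VariantCount(None, 2L),
    VariantCount(Some("Amy"), 2L))

  // Models equalTo: a comparison involving NULL never matches.
  def sqlEquals(a: Option[String], b: Option[String]): Boolean =
    a.isDefined && b.isDefined && a == b

  // Models <=>: like equalTo, except that NULL <=> NULL matches.
  def nullSafeEquals(a: Option[String], b: Option[String]): Boolean = a == b

  // Naive inner join: keep every (left, right) pair the predicate accepts.
  def innerJoin(
      on: (Option[String], Option[String]) => Boolean): Seq[(VariantCount, Person)] =
    for (v <- variantCounts; p <- people if on(v.name, p.name)) yield (v, p)

  val equalToRows: Seq[(VariantCount, Person)] = innerJoin(sqlEquals)
  val nullSafeRows: Seq[(VariantCount, Person)] = innerJoin(nullSafeEquals)
  // A join whose condition is always true degenerates into a Cartesian product.
  val cartesianRows: Seq[(VariantCount, Person)] = innerJoin((_, _) => true)
}
```

In this model equalToRows has 4 rows, nullSafeRows has 6, and cartesianRows has 18, matching the sizes of the innerJoinEqualTo, manualVarCountsInnerJoinSqlNullSafeEqOp, and innerJoinSqlNullSafeEqOp outputs respectively.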


