Posted to issues@spark.apache.org by "Yuming Wang (JIRA)" <ji...@apache.org> on 2018/12/04 09:53:00 UTC

[jira] [Commented] (SPARK-26231) Dataframes inner join on double datatype columns resulting in Cartesian product

    [ https://issues.apache.org/jira/browse/SPARK-26231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708486#comment-16708486 ] 

Yuming Wang commented on SPARK-26231:
-------------------------------------

The cause is an issue in how duplicate expressions are resolved. I will open a pull request shortly.
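
One possible reading of the two physical plans quoted below, sketched as a toy model in plain Scala. This is not Spark's Catalyst code: `Attr` and `usableAsJoinKey` are hypothetical names invented for illustration, and only the attribute IDs are copied from the plans. The assumption is that an equality can act as a join key only when it compares one attribute from each join input. In the failing plan both sides of the condition (name1#146 and name#143) belong to the left input, because the right input was re-numbered to name#147, so the condition ends up as a filter on the left side and the join itself has no key.

```scala
// Toy model only: Attr and usableAsJoinKey are hypothetical, not Spark classes.
final case class Attr(name: String, id: Int)

object SelfJoinSketch {
  /** True when the equality compares one attribute from each join input. */
  def usableAsJoinKey(a: Attr, b: Attr, left: Set[Attr], right: Set[Attr]): Boolean =
    (left.contains(a) && right.contains(b)) || (left.contains(b) && right.contains(a))

  // Failing case: attribute IDs copied from the CartesianProduct plan.
  val doubleLeft: Set[Attr] =
    Set(Attr("name", 143), Attr("group", 144), Attr("data", 145), Attr("name1", 146))
  val doubleRight: Set[Attr] =
    Set(Attr("name", 147), Attr("group", 148), Attr("data", 149))
  // The condition still points at name#143, which only the left input provides.
  val doubleCase: Boolean =
    usableAsJoinKey(Attr("name1", 146), Attr("name", 143), doubleLeft, doubleRight)

  // Working case: attribute IDs copied from the SortMergeJoin plan.
  val stringLeft: Set[Attr] =
    Set(Attr("name", 143), Attr("group", 144), Attr("data", 145), Attr("name1String", 151))
  val stringRight: Set[Attr] =
    Set(Attr("name", 153), Attr("group", 154), Attr("data", 155), Attr("name2String", 152))
  // Each cast produced a fresh ID, so the condition spans both inputs.
  val stringCase: Boolean =
    usableAsJoinKey(Attr("name1String", 151), Attr("name2String", 152), stringLeft, stringRight)

  def main(args: Array[String]): Unit = {
    println(s"double-column condition usable as join key: $doubleCase")
    println(s"string-column condition usable as join key: $stringCase")
  }
}
```

Under this reading, the cast to String helps because it introduces new attribute IDs on both sides, not because of the data type itself.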

> Dataframes inner join on double datatype columns resulting in Cartesian product
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-26231
>                 URL: https://issues.apache.org/jira/browse/SPARK-26231
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 1.6.1
>            Reporter: Shrikant
>            Priority: Major
>
> The following code snippet demonstrates the bug. An inner join on the Double columns results in a Cartesian product; when both columns are cast to String, the join works as expected.
> Please see the explain plans below.
> Error: scala> cartesianJoinErr.explain()
> == Physical Plan ==
> CartesianProduct
> :- ConvertToSafe
> :  +- Project [name#143,group#144,data#145,name#143 AS name1#146]
> :     +- Filter (name#143 = name#143)
> :        +- Scan ExistingRDD[name#143,group#144,data#145]
> +- Scan ExistingRDD[name#147,group#148,data#149]
> -----------------------------------------------------------
> Explain plan after casting both columns to String:
> stringColJoinWorks.explain()
> == Physical Plan ==
> SortMergeJoin [name1String#151], [name2String#152]
> :- Sort [name1String#151 ASC], false, 0
> :  +- TungstenExchange hashpartitioning(name1String#151,200), None
> :     +- Project [name#143,group#144,data#145,cast(name#143 as string) AS name1String#151]
> :        +- Scan ExistingRDD[name#143,group#144,data#145]
> +- Sort [name2String#152 ASC], false, 0
>    +- TungstenExchange hashpartitioning(name2String#152,200), None
>       +- Project [name#153,group#154,data#155,cast(name#153 as string) AS name2String#152]
>          +- Scan ExistingRDD[name#153,group#154,data#155]
>  
>  
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
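> import org.apache.spark.sql.functions
> import sqlContext.implicits._  // provides the $"..." column syntax (imported automatically in spark-shell)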
> val doubleRDD = sc.parallelize(Seq(
>     Row(11111.0, 2, 1),
>     Row(22222.0, 8, 2),
>     Row(33333.0, 10, 3),
>     Row(44444.0, 10, 4)))
>     
> val testSchema = StructType(Seq(
>     StructField("name", DoubleType, nullable = true),
>     StructField("group", IntegerType, nullable = true),
>     StructField("data", IntegerType, nullable = true)))
>     
> val doubleRDDCartesian = sqlContext.createDataFrame(doubleRDD, testSchema)
> val cartNewCol = doubleRDDCartesian.select($"name" , $"group", $"data")
> val newColName1DF = cartNewCol.withColumn("name1", $"name")
> val cartesianJoinErr = newColName1DF.join(doubleRDDCartesian, newColName1DF("name1")===(doubleRDDCartesian("name")))
> cartesianJoinErr.show
> cartesianJoinErr.explain()
> // Cast the join column on both sides to StringType
> val stringColDF1 = doubleRDDCartesian.withColumn("name1String",$"name".cast("String"))
> val stringColDF2 = cartNewCol.withColumn("name2String", $"name".cast("String"))
> val stringColJoinWorks = stringColDF1.join(stringColDF2, stringColDF1("name1String")===(stringColDF2("name2String")))
> stringColJoinWorks.show
> stringColJoinWorks.explain()
>  
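
A commonly used way to sidestep ambiguous column references in self-joins is to alias each side and refer to columns by qualified name. The sketch below assumes a spark-shell session on a 1.6.x build, where `sc` and `sqlContext` are predefined; the alias names `l` and `r` and the val names are arbitrary choices for illustration. It has not been verified against the affected versions, so check the output of `explain()` before relying on it.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import sqlContext.implicits._  // for the $"..." column syntax

// Same shape as the data in the report, shortened to two rows.
val schema = StructType(Seq(
  StructField("name", DoubleType),
  StructField("group", IntegerType),
  StructField("data", IntegerType)))
val base = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(11111.0, 2, 1), Row(22222.0, 8, 2))), schema)
val withName1 = base.withColumn("name1", $"name")

// Qualified names are resolved against each aliased input during analysis,
// instead of reusing column references captured from the shared parent DataFrame.
val aliasedJoin = withName1.as("l").join(base.as("r"), $"l.name1" === $"r.name")
aliasedJoin.explain()
```

If the plan still shows CartesianProduct, the cast-to-String approach from the report remains the confirmed workaround.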



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
