Posted to issues@spark.apache.org by "Jerry Lam (JIRA)" <ji...@apache.org> on 2016/03/31 23:07:25 UTC
[jira] [Created] (SPARK-14309) Dataframe returns wrong results due to parsing incorrectly
Jerry Lam created SPARK-14309:
---------------------------------
Summary: Dataframe returns wrong results due to parsing incorrectly
Key: SPARK-14309
URL: https://issues.apache.org/jira/browse/SPARK-14309
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.5.1
Reporter: Jerry Lam
I observed the behavior below using DataFrames. The expected answer is 60, but there is no way to get that value except by converting the DataFrame to an RDD and reading the value out of the Row. I have included the equivalent SQL statement, which returns the correct result; I believe this is because it goes through the Hive parser.
{code}
val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50 to 99)).toDF("id", "label")
val d1 = base.where($"label" < 60).as("d1")
val d2 = base.where($"label" === 60).as("d2")
d1.join(d2, "id").show
+---+-----+-----+
| id|label|label|
+---+-----+-----+
| 40| 40| 60|
+---+-----+-----+
d1.join(d2, "id").select(d1("label")).show
+-----+
|label|
+-----+
| 40|
+-----+
(expected answer: 40, right!)
d1.join(d2, "id").map{row => row.getAs[Int](2)} // workaround: read the value positionally from the Row
d1.join(d2, "id").select(d2("label")).show
+-----+
|label|
+-----+
| 40|
+-----+
(expected answer: 60, wrong!)
d1.join(d2, "id").select(d2("label")).explain(true)
== Parsed Logical Plan ==
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Analyzed Logical Plan ==
label: int
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#3]
 Join Inner, Some((id#2 = id#6))
  Project [_1#0 AS id#2,_2#1 AS label#3]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#6]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#3]
 SortMergeJoin [id#2], [id#6]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2,_2#1 AS label#3]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#6 ASC], false, 0
   TungstenExchange hashpartitioning(id#6)
    TungstenProject [_1#0 AS id#6]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]
base.registerTempTable("base")
sqlContext.sql("select d2.label from (select * from base where label < 60) as d1 inner join (select * from base where label = 60) as d2 on d1.id = d2.id").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('d2.label)]
 'Join Inner, Some(('d1.id = 'd2.id))
  'Subquery d1
   'Project [unresolvedalias(*)]
    'Filter ('label < 60)
     'UnresolvedRelation [base], None
  'Subquery d2
   'Project [unresolvedalias(*)]
    'Filter ('label = 60)
     'UnresolvedRelation [base], None

== Analyzed Logical Plan ==
label: int
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Subquery d1
   Project [id#2,label#3]
    Filter (label#3 < 60)
     Subquery base
      Project [_1#0 AS id#2,_2#1 AS label#3]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Subquery d2
   Project [id#14,label#15]
    Filter (label#15 = 60)
     Subquery base
      Project [_1#0 AS id#14,_2#1 AS label#15]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Project [_1#0 AS id#2]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#14,_2#1 AS label#15]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#2], [id#14]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
    TungstenProject [_1#0 AS id#14,_2#1 AS label#15]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]
{code}
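A common way to sidestep the ambiguity is to rename the columns before the self-join so the two `label` attributes get distinct names and resolution cannot silently pick the wrong one. A minimal sketch (untested against 1.5.1; uses only the standard `withColumnRenamed` API, and the `d1_label`/`d2_label` names are my own):

{code}
// Workaround sketch: give each side's "label" a distinct name before joining,
// so selecting d2's label can no longer resolve to d1's attribute.
val d1 = base.where($"label" < 60).withColumnRenamed("label", "d1_label")
val d2 = base.where($"label" === 60).withColumnRenamed("label", "d2_label")

// Selecting by the renamed column is unambiguous; this should show 60 for id 40.
d1.join(d2, "id").select($"d2_label").show()
{code}

The root issue is that both DataFrames descend from the same `base`, so their `label` columns share lineage; renaming forces fresh, distinct attribute names on each side.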
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)