You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jerry Lam (JIRA)" <ji...@apache.org> on 2016/03/31 23:07:25 UTC

[jira] [Created] (SPARK-14309) Dataframe returns wrong results due to parsing incorrectly

Jerry Lam created SPARK-14309:
---------------------------------

             Summary: Dataframe returns wrong results due to parsing incorrectly
                 Key: SPARK-14309
                 URL: https://issues.apache.org/jira/browse/SPARK-14309
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.1
            Reporter: Jerry Lam


I observed the below behavior using dataframe. The expected answer should be 60 but there is no way to get the value unless to turn dataframe into rdd and access it in the Row.

I have include the SQL statement and it returns the correct result because I believe, it is using Hive parser.

{code}
val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50 to 99)).toDF("id", "label")
val d1 = base.where($"label" < 60).as("d1")
val d2 = base.where($"label" === 60).as("d2")
d1.join(d2, "id").show
+---+-----+-----+
| id|label|label|
+---+-----+-----+
| 40|   40|   60|
+---+-----+-----+
d1.join(d2, "id").select(d1("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 40, right!)

d1.join(d2, "id").map{row => row.getAs[Int](2)}
d1.join(d2, "id").select(d2("label")).show
+-----+
|label|
+-----+
|   40|
+-----+
(expected answer: 60, wrong!)
d1.join(d2, "id").select(d2("label")).explain(true)

scala> d1.join(d2, "id").select(d2("label")).explain(true)
== Parsed Logical Plan ==
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Analyzed Logical Plan ==
label: int
Project [label#3]
 Project [id#2,label#3,label#7]
  Join Inner, Some((id#2 = id#6))
   Subquery d1
    Filter (label#3 < 60)
     Project [_1#0 AS id#2,_2#1 AS label#3]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
   Subquery d2
    Filter (label#7 = 60)
     Project [_1#0 AS id#6,_2#1 AS label#7]
      LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#3]
 Join Inner, Some((id#2 = id#6))
  Project [_1#0 AS id#2,_2#1 AS label#3]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#6]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#3]
 SortMergeJoin [id#2], [id#6]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2,_2#1 AS label#3]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#6 ASC], false, 0
   TungstenExchange hashpartitioning(id#6)
    TungstenProject [_1#0 AS id#6]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]

def (d1 :DataFrame, d2: DataFrame)

base.registerTempTable("base")
sqlContext.sql("select d2.label from (select * from base where label < 60) as d1 inner join (select * from base where label = 60) as d2 on d1.id = d2.id").explain(true)

== Parsed Logical Plan ==
'Project [unresolvedalias('d2.label)]
 'Join Inner, Some(('d1.id = 'd2.id))
  'Subquery d1
   'Project [unresolvedalias(*)]
    'Filter ('label < 60)
     'UnresolvedRelation [base], None
  'Subquery d2
   'Project [unresolvedalias(*)]
    'Filter ('label = 60)
     'UnresolvedRelation [base], None

== Analyzed Logical Plan ==
label: int
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Subquery d1
   Project [id#2,label#3]
    Filter (label#3 < 60)
     Subquery base
      Project [_1#0 AS id#2,_2#1 AS label#3]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Subquery d2
   Project [id#14,label#15]
    Filter (label#15 = 60)
     Subquery base
      Project [_1#0 AS id#14,_2#1 AS label#15]
       LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Optimized Logical Plan ==
Project [label#15]
 Join Inner, Some((id#2 = id#14))
  Project [_1#0 AS id#2]
   Filter (_2#1 < 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21
  Project [_1#0 AS id#14,_2#1 AS label#15]
   Filter (_2#1 = 60)
    LogicalRDD [_1#0,_2#1], MapPartitionsRDD[1] at rddToDataFrameHolder at <console>:21

== Physical Plan ==
TungstenProject [label#15]
 SortMergeJoin [id#2], [id#14]
  TungstenSort [id#2 ASC], false, 0
   TungstenExchange hashpartitioning(id#2)
    TungstenProject [_1#0 AS id#2]
     Filter (_2#1 < 60)
      Scan PhysicalRDD[_1#0,_2#1]
  TungstenSort [id#14 ASC], false, 0
   TungstenExchange hashpartitioning(id#14)
    TungstenProject [_1#0 AS id#14,_2#1 AS label#15]
     Filter (_2#1 = 60)
      Scan PhysicalRDD[_1#0,_2#1]

{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org