Posted to issues@spark.apache.org by "Tony Zhang (JIRA)" <ji...@apache.org> on 2019/06/17 02:33:01 UTC

[jira] [Commented] (SPARK-10892) Join with Data Frame returns wrong results

    [ https://issues.apache.org/jira/browse/SPARK-10892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16865245#comment-16865245 ] 

Tony Zhang commented on SPARK-10892:
------------------------------------

This issue still exists in today's Spark code (Spark 3.0.0). I spent some time debugging it, and here are my observations:

prcp("value") and tmin("value") refer to the same column: their underlying expressions are equal, i.e. they resolve to the same attribute with the same expression ID. As a result, `select` on the joined DataFrame returns the same value (for example `3` on 20121011) for both. That attribute comes from the single LogicalPlan created by the `load()` call and held by the `weather` DataFrame. Each `filter()` call reuses this LogicalPlan instance as its child, so looking up a column name such as "value" on either filtered DataFrame yields the same attribute.

This can be confirmed with the statements below:

{code:java}
scala> prcp("value").expr == tmin("value").expr
res5: Boolean = true
{code}

{code:java}
scala> prcp.join(tmin, "date_str").select(prcp("value"), tmin("value")).explain(true)
== Parsed Logical Plan ==
Project [value#9L, value#9L]
{code}

{code:java}
scala> prcp.explain(true)
== Parsed Logical Plan ==
'Filter ('metric = PRCP)
+- RelationV2[date_str#7, metric#8, value#9L]

scala> tmin.explain(true)
== Parsed Logical Plan ==
'Filter ('metric = TMIN)
+- RelationV2[date_str#7, metric#8, value#9L]
{code}

Both projected columns are `value#9L`, and both filters sit on top of the same relation, whose `value` attribute is `value#9L`.

This also explains why the problem does not occur if you join two Datasets created separately (with two toDF() calls, two different load() calls, etc.): in that case two different LogicalPlan instances are created, so prcp("value") and tmin("value") refer to different columns.
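The mechanism described above can be illustrated with a small, dependency-free Scala model. This is a toy sketch, not Spark's Catalyst classes: `Attr`, `Relation`, `Filter` and `resolve` are hypothetical stand-ins for an attribute reference, a loaded relation, the Filter node and `df("name")`. It encodes the assumption from the debugging above: a filter passes its child's attributes through unchanged, while every new relation gets fresh expression IDs.

```scala
// Toy model (hypothetical names, NOT Spark's Catalyst API) of how two filters over
// one relation end up sharing the attribute behind "value".
object SharedPlanToy {
  // A column is identified by its name plus an expression ID.
  final case class Attr(name: String, exprId: Long)

  private var nextId = 0L
  private def freshId(): Long = { nextId += 1; nextId }

  sealed trait Plan { def output: Seq[Attr] }

  // Stand-in for a load(): every new relation allocates fresh IDs for its columns.
  final case class Relation(columns: Seq[String]) extends Plan {
    val output: Seq[Attr] = columns.map(c => Attr(c, freshId()))
  }

  // A filter only drops rows, so it exposes its child's attributes unchanged.
  final case class Filter(condition: String, child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  // Stand-in for df("name"): look the name up in the plan's output.
  def resolve(plan: Plan, name: String): Attr =
    plan.output.find(_.name == name).getOrElse(throw new NoSuchElementException(name))

  def main(args: Array[String]): Unit = {
    val weather = Relation(Seq("date_str", "metric", "value"))
    val prcp = Filter("metric = 'PRCP'", weather)
    val tmin = Filter("metric = 'TMIN'", weather)
    // Same relation underneath, so both lookups return the same attribute.
    println(resolve(prcp, "value") == resolve(tmin, "value"))

    // A second, separate "load()" gets its own IDs, so the attributes differ.
    val tminSeparate = Filter("metric = 'TMIN'", Relation(Seq("date_str", "metric", "value")))
    println(resolve(prcp, "value") == resolve(tminSeparate, "value"))
  }
}
```

Running `SharedPlanToy.main` prints `true` and then `false`, mirroring the single-`load()` case and the separate-`load()` case respectively.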

As a local hack, I made `filter()` create a new LogicalPlan when it is called with a different condition expression, and the problem went away. However, I think a thorough fix should properly manage LogicalPlan instances in Dataset instead of reusing the same instance across all functions. I will leave that to the Spark SQL experts; I hope the debugging above helps a little.
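One possible reading of that hack, sketched in the same toy style: a filter that re-instantiates every output column under a fresh expression ID, so two filters over the same child no longer share attributes. Everything here is hypothetical (`Attr`, `Relation`, `FreshFilter` and `resolve` are not Spark classes), and the actual local hack may have worked differently.

```scala
// Hypothetical toy, NOT Spark code: a filter that gives its output fresh IDs.
object FreshFilterToy {
  final case class Attr(name: String, exprId: Long)

  private var nextId = 0L
  private def freshId(): Long = { nextId += 1; nextId }

  sealed trait Plan { def output: Seq[Attr] }

  final case class Relation(columns: Seq[String]) extends Plan {
    val output: Seq[Attr] = columns.map(c => Attr(c, freshId()))
  }

  // Each FreshFilter copies its child's attributes under new IDs when it is built,
  // so columns looked up on two different filters are distinguishable.
  final case class FreshFilter(condition: String, child: Plan) extends Plan {
    val output: Seq[Attr] = child.output.map(a => a.copy(exprId = freshId()))
  }

  def resolve(plan: Plan, name: String): Attr =
    plan.output.find(_.name == name).getOrElse(throw new NoSuchElementException(name))

  def main(args: Array[String]): Unit = {
    val weather = Relation(Seq("date_str", "metric", "value"))
    val prcp = FreshFilter("metric = 'PRCP'", weather)
    val tmin = FreshFilter("metric = 'TMIN'", weather)
    // Different IDs now, so the two "value" columns no longer collide.
    println(resolve(prcp, "value") == resolve(tmin, "value"))
  }
}
```

In this toy the trade-off is visible: a column taken from `weather` itself no longer matches anything in `prcp`'s output, because the IDs were replaced, so a real fix would also have to handle references like that.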

I'm new to Spark, so please let me know if I missed something here :)

> Join with Data Frame returns wrong results
> ------------------------------------------
>
>                 Key: SPARK-10892
>                 URL: https://issues.apache.org/jira/browse/SPARK-10892
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.0
>            Reporter: Ofer Mendelevitch
>            Priority: Critical
>         Attachments: data.json
>
>
> I'm attaching a simplified reproducible example of the problem:
> 1. Loading a JSON file from HDFS as a Data Frame
> 2. Creating 3 data frames: PRCP, TMIN, TMAX
> 3. Joining the data frames together. Each of them has a column named "value", so the columns are renamed after the join.
> 4. The output seems incorrect; the first column has the correct values, but the two other columns seem to have a copy of the values from the first column.
> Here's the sample code:
> {code}
> import org.apache.spark.sql._
> val sqlc = new SQLContext(sc)
> val weather = sqlc.read.format("json").load("data.json")
> val prcp = weather.filter("metric = 'PRCP'").as("prcp").cache()
> val tmin = weather.filter("metric = 'TMIN'").as("tmin").cache()
> val tmax = weather.filter("metric = 'TMAX'").as("tmax").cache()
> prcp.filter("year=2012 and month=10").show()
> tmin.filter("year=2012 and month=10").show()
> tmax.filter("year=2012 and month=10").show()
> val out = (prcp.join(tmin, "date_str").join(tmax, "date_str")
>           .select(prcp("year"), prcp("month"), prcp("day"), prcp("date_str"),
>             prcp("value").alias("PRCP"), tmin("value").alias("TMIN"),
>             tmax("value").alias("TMAX")) )
> out.filter("year=2012 and month=10").show()
> {code}
> The output of the three show() calls is:
> {code}
> +--------+---+------+-----+-----------+-----+----+
> |date_str|day|metric|month|    station|value|year|
> +--------+---+------+-----+-----------+-----+----+
> |20121001|  1|  PRCP|   10|USW00023272|    0|2012|
> |20121002|  2|  PRCP|   10|USW00023272|    0|2012|
> |20121003|  3|  PRCP|   10|USW00023272|    0|2012|
> |20121004|  4|  PRCP|   10|USW00023272|    0|2012|
> |20121005|  5|  PRCP|   10|USW00023272|    0|2012|
> |20121006|  6|  PRCP|   10|USW00023272|    0|2012|
> |20121007|  7|  PRCP|   10|USW00023272|    0|2012|
> |20121008|  8|  PRCP|   10|USW00023272|    0|2012|
> |20121009|  9|  PRCP|   10|USW00023272|    0|2012|
> |20121010| 10|  PRCP|   10|USW00023272|    0|2012|
> |20121011| 11|  PRCP|   10|USW00023272|    3|2012|
> |20121012| 12|  PRCP|   10|USW00023272|    0|2012|
> |20121013| 13|  PRCP|   10|USW00023272|    0|2012|
> |20121014| 14|  PRCP|   10|USW00023272|    0|2012|
> |20121015| 15|  PRCP|   10|USW00023272|    0|2012|
> |20121016| 16|  PRCP|   10|USW00023272|    0|2012|
> |20121017| 17|  PRCP|   10|USW00023272|    0|2012|
> |20121018| 18|  PRCP|   10|USW00023272|    0|2012|
> |20121019| 19|  PRCP|   10|USW00023272|    0|2012|
> |20121020| 20|  PRCP|   10|USW00023272|    0|2012|
> +--------+---+------+-----+-----------+-----+----+
> +--------+---+------+-----+-----------+-----+----+
> |date_str|day|metric|month|    station|value|year|
> +--------+---+------+-----+-----------+-----+----+
> |20121001|  1|  TMIN|   10|USW00023272|  139|2012|
> |20121002|  2|  TMIN|   10|USW00023272|  178|2012|
> |20121003|  3|  TMIN|   10|USW00023272|  144|2012|
> |20121004|  4|  TMIN|   10|USW00023272|  144|2012|
> |20121005|  5|  TMIN|   10|USW00023272|  139|2012|
> |20121006|  6|  TMIN|   10|USW00023272|  128|2012|
> |20121007|  7|  TMIN|   10|USW00023272|  122|2012|
> |20121008|  8|  TMIN|   10|USW00023272|  122|2012|
> |20121009|  9|  TMIN|   10|USW00023272|  139|2012|
> |20121010| 10|  TMIN|   10|USW00023272|  128|2012|
> |20121011| 11|  TMIN|   10|USW00023272|  122|2012|
> |20121012| 12|  TMIN|   10|USW00023272|  117|2012|
> |20121013| 13|  TMIN|   10|USW00023272|  122|2012|
> |20121014| 14|  TMIN|   10|USW00023272|  128|2012|
> |20121015| 15|  TMIN|   10|USW00023272|  128|2012|
> |20121016| 16|  TMIN|   10|USW00023272|  156|2012|
> |20121017| 17|  TMIN|   10|USW00023272|  139|2012|
> |20121018| 18|  TMIN|   10|USW00023272|  161|2012|
> |20121019| 19|  TMIN|   10|USW00023272|  133|2012|
> |20121020| 20|  TMIN|   10|USW00023272|  122|2012|
> +--------+---+------+-----+-----------+-----+----+
> +--------+---+------+-----+-----------+-----+----+
> |date_str|day|metric|month|    station|value|year|
> +--------+---+------+-----+-----------+-----+----+
> |20121001|  1|  TMAX|   10|USW00023272|  322|2012|
> |20121002|  2|  TMAX|   10|USW00023272|  344|2012|
> |20121003|  3|  TMAX|   10|USW00023272|  222|2012|
> |20121004|  4|  TMAX|   10|USW00023272|  189|2012|
> |20121005|  5|  TMAX|   10|USW00023272|  194|2012|
> |20121006|  6|  TMAX|   10|USW00023272|  200|2012|
> |20121007|  7|  TMAX|   10|USW00023272|  167|2012|
> |20121008|  8|  TMAX|   10|USW00023272|  183|2012|
> |20121009|  9|  TMAX|   10|USW00023272|  194|2012|
> |20121010| 10|  TMAX|   10|USW00023272|  183|2012|
> |20121011| 11|  TMAX|   10|USW00023272|  139|2012|
> |20121012| 12|  TMAX|   10|USW00023272|  161|2012|
> |20121013| 13|  TMAX|   10|USW00023272|  211|2012|
> |20121014| 14|  TMAX|   10|USW00023272|  189|2012|
> |20121015| 15|  TMAX|   10|USW00023272|  233|2012|
> |20121016| 16|  TMAX|   10|USW00023272|  211|2012|
> |20121017| 17|  TMAX|   10|USW00023272|  278|2012|
> |20121018| 18|  TMAX|   10|USW00023272|  294|2012|
> |20121019| 19|  TMAX|   10|USW00023272|  194|2012|
> |20121020| 20|  TMAX|   10|USW00023272|  183|2012|
> +--------+---+------+-----+-----------+-----+----+
> {code}
> And the join output is:
> {code}
> +----+-----+---+--------+----+----+----+
> |year|month|day|date_str|PRCP|TMIN|TMAX|
> +----+-----+---+--------+----+----+----+
> |2012|   10|  1|20121001|   0|   0|   0|
> |2012|   10|  2|20121002|   0|   0|   0|
> |2012|   10|  3|20121003|   0|   0|   0|
> |2012|   10|  4|20121004|   0|   0|   0|
> |2012|   10|  5|20121005|   0|   0|   0|
> |2012|   10|  6|20121006|   0|   0|   0|
> |2012|   10|  7|20121007|   0|   0|   0|
> |2012|   10|  8|20121008|   0|   0|   0|
> |2012|   10|  9|20121009|   0|   0|   0|
> |2012|   10| 10|20121010|   0|   0|   0|
> |2012|   10| 11|20121011|   3|   3|   3|
> |2012|   10| 12|20121012|   0|   0|   0|
> |2012|   10| 13|20121013|   0|   0|   0|
> |2012|   10| 14|20121014|   0|   0|   0|
> |2012|   10| 15|20121015|   0|   0|   0|
> |2012|   10| 16|20121016|   0|   0|   0|
> |2012|   10| 17|20121017|   0|   0|   0|
> |2012|   10| 18|20121018|   0|   0|   0|
> |2012|   10| 19|20121019|   0|   0|   0|
> |2012|   10| 20|20121020|   0|   0|   0|
> +----+-----+---+--------+----+----+----+
> {code}
> Attachment:
> - data.json file that is read from HDFS
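For the reproduction quoted above, a commonly used workaround is to select columns through the aliases the reporter already assigns with `.as("prcp")`, `.as("tmin")` and `.as("tmax")`, i.e. `col("tmin.value")` instead of `tmin("value")`, so each column is resolved by its alias-qualified name rather than by the shared attribute. This is a sketch under stated assumptions, not tested against the attached data.json: it assumes a Spark 2.x/3.x `SparkSession` on the classpath, and the one-day sample data (values copied from the 20121011 rows above) and the `QualifiedSelfJoin`/`joinedValues` names are hypothetical. The sample is built with a single `toDF()` call so that all three filters share one plan, as they do after a single `load()`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sketch of a workaround for this self-join pattern: select through the Dataset
// aliases ("prcp.value") instead of prcp("value") / tmin("value").
object QualifiedSelfJoin {
  def joinedValues(spark: SparkSession): Seq[(String, Long, Long, Long)] = {
    import spark.implicits._

    // Hypothetical sample: one day from the outputs above, created with ONE toDF()
    // so that all three filters share the same underlying plan.
    val weather = Seq(
      ("20121011", "PRCP", 3L),
      ("20121011", "TMIN", 122L),
      ("20121011", "TMAX", 139L)
    ).toDF("date_str", "metric", "value")

    val prcp = weather.filter($"metric" === "PRCP").as("prcp")
    val tmin = weather.filter($"metric" === "TMIN").as("tmin")
    val tmax = weather.filter($"metric" === "TMAX").as("tmax")

    val out = prcp.join(tmin, "date_str").join(tmax, "date_str")
      .select(
        col("date_str"),
        // Qualified names are resolved through each side's alias.
        col("prcp.value").alias("PRCP"),
        col("tmin.value").alias("TMIN"),
        col("tmax.value").alias("TMAX"))

    out.collect().toSeq.map(r => (r.getString(0), r.getLong(1), r.getLong(2), r.getLong(3)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("qualified-self-join").getOrCreate()
    try joinedValues(spark).foreach(println)
    finally spark.stop()
  }
}
```

If the aliases resolve as intended, the single output row carries the three different measurements for 20121011 (PRCP 3, TMIN 122, TMAX 139) rather than three copies of the PRCP value.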


