Posted to issues@spark.apache.org by "gurmukh singh (JIRA)" <ji...@apache.org> on 2016/09/01 22:53:20 UTC

[jira] [Comment Edited] (SPARK-17211) Broadcast join produces incorrect results on EMR with large driver memory

    [ https://issues.apache.org/jira/browse/SPARK-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456867#comment-15456867 ] 

gurmukh singh edited comment on SPARK-17211 at 9/1/16 10:52 PM:
----------------------------------------------------------------

Thanks Davies.

I can see the issue with the offsets:

It picks some random values beyond the offset and then tries to join on those rows, which obviously returns NULL since those rows are not there, as the session below shows.
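
(The definitions of df1 and the broadcast payload do not appear in the transcript; the following is only a minimal sketch, with names and sample values assumed, of how similar objects could be set up in spark-shell to follow along. The session's own val bc = sc.broadcast(df1) seems to refer to an earlier definition of df1, so the sketch keeps the broadcast payload under its own name.)

{noformat}
// Hypothetical setup -- not from the original session; names and data are
// assumptions chosen to resemble the output shown below.
import spark.implicits._

// Small local collection wrapped in a broadcast variable; its tuples match
// what bc.value.view prints in the session.
val broadcastTuples = Seq(("abc", 1), ("bcd", 2), ("dcb", 5))
val bc = sc.broadcast(broadcastTuples)

// Driver-side DataFrame matching what df1.show() prints in the session.
val df1 = Seq((123, 1), (234, 2), (432, 5)).toDF("gid", "id")
{noformat}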

scala> val bc=sc.broadcast(df1)

scala> bc.value.view.take(10).foreach(println)
(abc,1)
(bcd,2)
(dcb,5)

scala> bc.value.view.toDF
res13: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> bc.value.view.toDF("gid", "id")
res14: org.apache.spark.sql.DataFrame = [gid: string, id: int]

scala> bc.value.view.toDF("gid", "id").show()
+---+---+
|gid| id|
+---+---+
|abc|  1|
|bcd|  2|
|dcb|  5|
+---+---+

scala> df1.join(bc.value.view.toDF("gid", "id"), "id").show()
+---+---+----+
| id|gid| gid|
+---+---+----+
|  1|123|null|
|  2|234|null|
|  5|432|null|
+---+---+----+


scala> df1.join(bc.value.view.toDF("gidyy", "id"), "id").show()
+---+---+-----+
| id|gid|gidyy|
+---+---+-----+
|  1|123| null|
|  2|234| null|
|  5|432| null|
+---+---+-----+


scala> bc.value.view.toDF("gid", "id").show()
+---+---+
|gid| id|
+---+---+
|abc|  1|
|bcd|  2|
|dcb|  5|
+---+---+


scala> df1.show()
+---+---+
|gid| id|
+---+---+
|123|  1|
|234|  2|
|432|  5|
+---+---+


scala> df1.join(bc.value.view.toDF("gidyy", "id"), "id").show()
+---+---+-----+
| id|gid|gidyy|
+---+---+-----+
|  1|123| null|
|  2|234| null|
|  5|432| null|
+---+---+-----+


scala> bc.value.view.toDF("gid", "id").join(df1, "id").show()
+-------+----+---+
|     id| gid|gid|
+-------+----+---+
|6513249|null|123|	<----- Look at the "id" values: where is it picking them from? It is fetching from out-of-range memory locations, which it should not be (see the plan check after this transcript).
|6579042|null|234|
|6447972|null|432|
+-------+----+---+


scala> bc.value.view.toDF("gidy", "id").join(df1, "id").show()
+-------+----+---+
|     id|gidy|gid|
+-------+----+---+
|6513249|null|123|
|6579042|null|234|
|6447972|null|432|
+-------+----+---+
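
(Not part of the original session: a hedged follow-up one could run to confirm that the broadcast code path is what produces these bogus ids. The explain() call and the autoBroadcastJoinThreshold configuration key are standard Spark SQL API; everything else is an assumption reusing the names above.)

{noformat}
// Hypothetical checks -- not from the original session.
val broadcastSide = bc.value.view.toDF("gid", "id")

// Inspect the physical plan; a BroadcastHashJoin node means the broadcast
// path (and hence the suspect offsets) is being exercised.
broadcastSide.join(df1, "id").explain()

// Disable automatic broadcast joins and rerun; if a sort-merge join of the
// same data returns the expected ids, the corruption is specific to the
// broadcast path.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
broadcastSide.join(df1, "id").show()
{noformat}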


> Broadcast join produces incorrect results on EMR with large driver memory
> -------------------------------------------------------------------------
>
>                 Key: SPARK-17211
>                 URL: https://issues.apache.org/jira/browse/SPARK-17211
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>            Reporter: Jarno Seppanen
>            Assignee: Davies Liu
>
> Broadcast join produces incorrect columns in join result, see below for an example. The same join but without using broadcast gives the correct columns.
> Running PySpark on YARN on Amazon EMR 5.0.0.
> {noformat}
> import pyspark.sql.functions as func
> keys = [
>     (54000000, 0),
>     (54000001, 1),
>     (54000002, 2),
> ]
> keys_df = spark.createDataFrame(keys, ['key_id', 'value']).coalesce(1)
> keys_df.show()
> # +--------+-----+
> # |  key_id|value|
> # +--------+-----+
> # |54000000|    0|
> # |54000001|    1|
> # |54000002|    2|
> # +--------+-----+
> data = [
>     (54000002,    1),
>     (54000000,    2),
>     (54000001,    3),
> ]
> data_df = spark.createDataFrame(data, ['key_id', 'foo'])
> data_df.show()
> # +--------+---+                                                                  
> # |  key_id|foo|
> # +--------+---+
> # |54000002|  1|
> # |54000000|  2|
> # |54000001|  3|
> # +--------+---+
> ### INCORRECT ###
> data_df.join(func.broadcast(keys_df), 'key_id').show()
> # +--------+---+--------+                                                         
> # |  key_id|foo|   value|
> # +--------+---+--------+
> # |54000002|  1|54000002|
> # |54000000|  2|54000000|
> # |54000001|  3|54000001|
> # +--------+---+--------+
> ### CORRECT ###
> data_df.join(keys_df, 'key_id').show()
> # +--------+---+-----+
> # |  key_id|foo|value|
> # +--------+---+-----+
> # |54000000|  2|    0|
> # |54000001|  3|    1|
> # |54000002|  1|    2|
> # +--------+---+-----+
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org