Posted to issues@spark.apache.org by "gurmukh singh (JIRA)" <ji...@apache.org> on 2016/08/31 22:20:21 UTC

[jira] [Comment Edited] (SPARK-17211) Broadcast join produces incorrect results on EMR with large driver memory

    [ https://issues.apache.org/jira/browse/SPARK-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453541#comment-15453541 ] 

gurmukh singh edited comment on SPARK-17211 at 8/31/16 10:19 PM:
-----------------------------------------------------------------

Hi

I can see this in Apache Spark 2.0 as well, running with the same node configuration as mentioned above.

Apache Hadoop 2.7.2, Spark 2.0:
-------------------------------------------

[hadoop@sp1 ~]$ hadoop version
Hadoop 2.7.2
Subversion Unknown -r Unknown
Compiled by root on 2016-05-16T03:56Z
Compiled with protoc 2.5.0
From source with checksum d0fda26633fa762bff87ec759ebe689c
This command was run using /opt/cluster/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar

[hadoop@sp1 ~]$ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Branch
Compiled by user jenkins on 2016-07-19T21:16:09Z
Revision

[hadoop@sp1 hadoop]$ spark-shell --master yarn --deploy-mode client --num-executors 20 --executor-cores 2 --executor-memory 12g --driver-memory 48g --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.sql.shuffle.partitions=1024 --conf spark.yarn.maxAppAttempts=1
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/31 04:29:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/31 04:29:49 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/08/31 04:30:19 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.0.0.227:4040
Spark context available as 'sc' (master = yarn, app id = application_1472617754154_0001).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val a1 = Array((123,1),(234,2),(432,5))
a1: Array[(Int, Int)] = Array((123,1), (234,2), (432,5))

scala> val a2 = Array(("abc",1),("bcd",2),("dcb",5))
a2: Array[(String, Int)] = Array((abc,1), (bcd,2), (dcb,5))

scala> val df1 = sc.parallelize(a1).toDF("gid","id")
df1: org.apache.spark.sql.DataFrame = [gid: int, id: int]

scala> val df2 = sc.parallelize(a2).toDF("gname","id")
df2: org.apache.spark.sql.DataFrame = [gname: string, id: int]


scala> df1.join(df2,"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  1|123|  abc|
|  2|234|  bcd|
|  5|432|  dcb|
+---+---+-----+

scala> val df2 = sc.parallelize(a2).toDF("gname","id")
df2: org.apache.spark.sql.DataFrame = [gname: string, id: int]

scala> df1.join(broadcast(df2),"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  1|123| null|
|  2|234| null|
|  5|432| null|
+---+---+-----+

scala> broadcast(df1).join(df2,"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  0|  1|  abc|
|  0|  2|  bcd|
|  0|  5|  dcb|
+---+---+-----+
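
As a quick sanity check (a sketch only, not taken from the session above), the physical plans can be inspected to confirm that the broadcast path is actually chosen by the query that returns nulls:

// Sketch: the broadcast variant should show BroadcastHashJoin in the plan,
// the plain variant a shuffle-based join.
df1.join(broadcast(df2), "id").explain()
df1.join(df2, "id").explain()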

If I reduce the driver memory, the broadcast join works correctly as well.
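
For example, a launch line with a smaller driver heap (the 8g below is only an illustrative value, not the exact figure I used; the remaining settings mirror the session above):

spark-shell --master yarn --deploy-mode client --num-executors 20 --executor-cores 2 \
  --executor-memory 12g --driver-memory 8g \
  --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.sql.shuffle.partitions=1024 \
  --conf spark.yarn.maxAppAttempts=1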

It works correctly on Apache Spark 1.6.

As a lot of things have changed in Spark 2.0, this needs to be looked into. It should fail with an error or an OOM instead of silently returning NULL or zero values.
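
A quick way to surface the discrepancy programmatically (again only a sketch, reusing df1/df2 from the session above) is to diff the broadcast and non-broadcast results, which should come back empty:

// Sketch: a correct run should print 0; on the affected setup it does not.
val plain = df1.join(df2, "id")
val bcast = df1.join(broadcast(df2), "id")
println(bcast.except(plain).count())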

[~himanish] That said, it would be interesting to understand the use case for running with driver memory = 48 GB on a node with 61 GB, since that leaves only about 12 GB for everything else on the node, on top of the other system overheads.





> Broadcast join produces incorrect results on EMR with large driver memory
> -------------------------------------------------------------------------
>
>                 Key: SPARK-17211
>                 URL: https://issues.apache.org/jira/browse/SPARK-17211
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>            Reporter: Jarno Seppanen
>
> Broadcast join produces incorrect columns in the join result; see below for an example. The same join without broadcast gives the correct columns.
> Running PySpark on YARN on Amazon EMR 5.0.0.
> {noformat}
> import pyspark.sql.functions as func
> keys = [
>     (54000000, 0),
>     (54000001, 1),
>     (54000002, 2),
> ]
> keys_df = spark.createDataFrame(keys, ['key_id', 'value']).coalesce(1)
> keys_df.show()
> # +--------+-----+
> # |  key_id|value|
> # +--------+-----+
> # |54000000|    0|
> # |54000001|    1|
> # |54000002|    2|
> # +--------+-----+
> data = [
>     (54000002,    1),
>     (54000000,    2),
>     (54000001,    3),
> ]
> data_df = spark.createDataFrame(data, ['key_id', 'foo'])
> data_df.show()
> # +--------+---+                                                                  
> # |  key_id|foo|
> # +--------+---+
> # |54000002|  1|
> # |54000000|  2|
> # |54000001|  3|
> # +--------+---+
> ### INCORRECT ###
> data_df.join(func.broadcast(keys_df), 'key_id').show()
> # +--------+---+--------+                                                         
> # |  key_id|foo|   value|
> # +--------+---+--------+
> # |54000002|  1|54000002|
> # |54000000|  2|54000000|
> # |54000001|  3|54000001|
> # +--------+---+--------+
> ### CORRECT ###
> data_df.join(keys_df, 'key_id').show()
> # +--------+---+-----+
> # |  key_id|foo|value|
> # +--------+---+-----+
> # |54000000|  2|    0|
> # |54000001|  3|    1|
> # |54000002|  1|    2|
> # +--------+---+-----+
> {noformat}


