Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/12/28 23:16:13 UTC

[jira] [Updated] (SPARK-4882) pyspark broadcast breaks if spark serializer configuration set to KryoSerializer running under Mesos

     [ https://issues.apache.org/jira/browse/SPARK-4882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-4882:
------------------------------
    Description: 
This issue plagued me weeks ago, and I finally hit a point where I just had to find a solution!

My spark-defaults.conf file had this property set:

spark.serializer                org.apache.spark.serializer.KryoSerializer
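To double-check which serializer a defaults file actually selects, a small check along these lines could be used. This is only a sketch: `read_spark_defaults` is a hypothetical helper (not part of Spark), and it assumes the simple whitespace-separated `key value` layout shown above, with `#` starting a comment line.

```python
def read_spark_defaults(text):
    """Parse spark-defaults.conf style text into a dict.

    Simplified assumption: one 'key<whitespace>value' pair per line,
    blank lines and lines starting with '#' are ignored.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)  # split on the first run of whitespace
        if len(parts) == 2:
            props[parts[0]] = parts[1].strip()
    return props


# Sample content mirroring the property from this report.
sample = (
    "# defaults shared by all applications\n"
    "spark.serializer                org.apache.spark.serializer.KryoSerializer\n"
)
props = read_spark_defaults(sample)
print(props.get("spark.serializer"))
```

In practice the text would come from reading the real spark-defaults.conf file rather than from an inline string.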

The following example works fine in LOCAL mode
(from https://github.com/apache/spark/blob/master/python/pyspark/broadcast.py)

{code}
    >>> from pyspark.context import SparkContext
    >>> sc = SparkContext('local', 'test')
    >>> b = sc.broadcast([1, 2, 3, 4, 5])
    >>> b.value
    [1, 2, 3, 4, 5]
    >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
    [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    >>> b.unpersist()
{code}

However, when I initialize the SparkContext pointing to my Mesos cluster,
I get the following stack trace:

{code}
14/12/18 08:08:37 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, 10.20.100.202, PROCESS_LOCAL, 1120 bytes)
14/12/18 08:08:46 INFO storage.BlockManagerMasterActor: Registering block manager 10.20.100.202:55734 with 1060.3 MB RAM, BlockManagerId(20141217-015001-1278350346-5050-28-3, 10.20.100.202, 55734)
14/12/18 08:08:47 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.20.100.202:55734 (size: 6.3 KB, free: 1060.3 MB)
14/12/18 08:08:47 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.20.100.202:55734 (size: 68.0 B, free: 1060.3 MB)
14/12/18 08:08:47 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, 10.20.100.202): java.lang.NullPointerException
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:589)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:232)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:228)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:228)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:203)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1459)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:202)
{code}

I found out rather painfully that local mode works fine, since I had originally been running Spark under Mesos and was trying every which way to find out why I was hitting an NPE.
Only when I found the local example did I make progress and eventually track it down to the KryoSerializer configs.

When I commented out the `spark.serializer` configuration (and thus used the default JavaSerializer), the broadcast finally worked!

I don't even know if KryoSerializer is an appropriate setting for a pyspark program (seems like no?).
Even so, who is to say that I wouldn't be running Java/Scala programs in tandem (using the same spark-defaults file), which presumably would want to benefit from the KryoSerializer?
That said, a workaround seems to be to override the `spark.serializer` setting in my pyspark code, or to change the defaults.
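A minimal sketch of the in-code override might look like the following. The helper names `serializer_override` and `make_context` and the app name are hypothetical; `SparkConf` and `SparkContext` are the regular PySpark classes. It relies on properties set directly on a `SparkConf` taking precedence over spark-defaults.conf, and I have not verified that it avoids the NPE on every setup.

```python
# Hypothetical helper names; only SparkConf/SparkContext are real PySpark API.
JAVA_SERIALIZER = "org.apache.spark.serializer.JavaSerializer"


def serializer_override():
    """Return the (key, value) pair that overrides spark-defaults.conf for this app only."""
    return ("spark.serializer", JAVA_SERIALIZER)


def make_context(master, app_name="broadcast-test"):
    """Build a SparkContext whose serializer is forced back to JavaSerializer."""
    # Imported lazily so the override pair can be inspected without PySpark installed.
    from pyspark import SparkConf, SparkContext

    key, value = serializer_override()
    conf = SparkConf().setMaster(master).setAppName(app_name).set(key, value)
    return SparkContext(conf=conf)
```

Calling `make_context('local')` gives the local setup from the example above; for the cluster case, pass your own Mesos master URL instead. Java/Scala jobs that share the same spark-defaults file keep using the KryoSerializer, since the override only applies to the application that sets it.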

thanks,
Fi



> pyspark broadcast breaks if spark serializer configuration set to KryoSerializer running under Mesos
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4882
>                 URL: https://issues.apache.org/jira/browse/SPARK-4882
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.1
>            Reporter: Fi
>              Labels: broadcast, kryo, npe, pyspark, serializers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
