Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/12/17 21:15:13 UTC

[jira] [Resolved] (SPARK-3926) result of JavaRDD collectAsMap() is not serializable

     [ https://issues.apache.org/jira/browse/SPARK-3926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen resolved SPARK-3926.
-------------------------------
       Resolution: Fixed
    Fix Version/s:     (was: 1.1.2)
                   1.2.1

I've merged this into {{branch-1.2}} for inclusion in 1.2.1, so I'm marking this as Fixed.

> result of JavaRDD collectAsMap() is not serializable
> ----------------------------------------------------
>
>                 Key: SPARK-3926
>                 URL: https://issues.apache.org/jira/browse/SPARK-3926
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 1.0.2, 1.1.0, 1.1.1, 1.2.0
>         Environment: CentOS / Spark 1.1 / Hadoop Hortonworks 2.4.0.2.1.2.0-402
>            Reporter: Antoine Amend
>            Assignee: Sean Owen
>             Fix For: 1.3.0, 1.2.1
>
>
> Using the Java API, I want to collect the contents of a JavaPairRDD<String, String> as a java.util.Map using the collectAsMap() function:
> Map<String, String> map = myJavaRDD.collectAsMap();
> This works fine, but when passing this map to another function, such as...
> myOtherJavaRDD.mapToPair(new CustomFunction(map))
> ...this leads to the following error:
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
> 	at org.apache.spark.rdd.RDD.map(RDD.scala:270)
> 	at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99)
> 	at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:44)
> 	../.. MY CLASS ../..
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: scala.collection.convert.Wrappers$MapWrapper
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> This seems to happen because the MapWrapper returned by mapAsJavaMap in WrapAsJava.scala is not serializable:
> ../..
>   implicit def mapAsJavaMap[A, B](m: Map[A, B]): ju.Map[A, B] = m match {
>     //case JConcurrentMapWrapper(wrapped) => wrapped
>     case JMapWrapper(wrapped) => wrapped.asInstanceOf[ju.Map[A, B]]
>     case _ => new MapWrapper(m)
>   }
> ../..
> The workaround is to manually copy this map into another, serializable one (here a HashMap):
> Map<String, String> map = myJavaRDD.collectAsMap();
> Map<String, String> tmp = new HashMap<String, String>(map);
> myOtherJavaRDD.mapToPair(new CustomFunction(tmp))
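
The workaround in the report can be reproduced without Spark. The following is a minimal sketch, not the fix that was merged: NonSerializableView is a hypothetical stand-in for scala.collection.convert.Wrappers$MapWrapper (a java.util.Map view that does not implement java.io.Serializable), and isJavaSerializable is an illustrative helper that attempts the same kind of Java serialization seen in the stack trace above. It assumes only the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SerializableCopyDemo {

    // Hypothetical stand-in for the Scala MapWrapper: a Map view that
    // delegates to another map and does NOT implement java.io.Serializable.
    static final class NonSerializableView<K, V> extends AbstractMap<K, V> {
        private final Map<K, V> underlying;

        NonSerializableView(Map<K, V> underlying) {
            this.underlying = underlying;
        }

        @Override
        public Set<Map.Entry<K, V>> entrySet() {
            return underlying.entrySet();
        }
    }

    // Returns true if Java serialization accepts the object, and false if
    // it fails with NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> backing = new HashMap<>();
        backing.put("k", "v");

        // Plays the role of the map returned by collectAsMap() in the report.
        Map<String, String> view = new NonSerializableView<>(backing);

        // The workaround: copy the entries into a plain HashMap.
        Map<String, String> copy = new HashMap<>(view);

        System.out.println(isJavaSerializable(view)); // prints "false"
        System.out.println(isJavaSerializable(copy)); // prints "true"
    }
}
```

The copy works because java.util.HashMap implements Serializable and String keys and values are serializable as well; with other key or value types, those types would also need to be serializable.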


