Posted to user@spark.apache.org by lihu <li...@gmail.com> on 2014/07/19 09:31:10 UTC

Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

Hi,
    Everyone. I have the piece of code below. When I run it, it fails with the error shown; it seems that the SparkContext is not serializable, but I do not use the SparkContext anywhere except for the broadcast.
    [In fact, this code is from MLlib; I just tried to broadcast the centerArrays.]

    The reduceByKey operation succeeds, but the collect operation fails, which confuses me.


INFO DAGScheduler: Failed to run collect at KMeans.scala:235
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

    @transient val sc = data.sparkContext    // I tried to add the transient annotation here, but it doesn't work

    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap()    // can pass at this point
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect()                            // failed at this point
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }
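
For context on where the SparkContext usually sneaks in: when a task closure refers to a field or method of an enclosing object, Spark has to serialize that whole object, and if it (directly or indirectly) holds the SparkContext the job fails with exactly this exception. Below is a minimal sketch of the usual workaround, copying what the closure needs into local vals and a broadcast before the transformation; the class and names are hypothetical, not the MLlib code.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical wrapper class that holds the SparkContext as a field.
class CenterSampler(val sc: SparkContext, val k: Int) {

  def costs(data: RDD[Array[Double]], centers: Array[Array[Double]]): Array[Double] = {
    // Copy everything the task needs into local vals / a broadcast first, so the
    // closure captures only these values and not `this` (which would drag `sc`
    // into the serialized task and throw NotSerializableException).
    val bcCenters = sc.broadcast(centers)
    val localK = k

    data.map { p =>
      // toy cost: squared distance to the nearest of the first `localK` centers
      bcCenters.value.take(localK)
        .map(c => c.zip(p).map { case (a, b) => (a - b) * (a - b) }.sum)
        .min
    }.collect()
  }
}

In this sketch the SparkContext is only touched on the driver (to create the broadcast); nothing inside the map refers back to the wrapper object, so the task serializes only the broadcast handle and an Int.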

Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

Posted by Tathagata Das <ta...@gmail.com>.
You can set the Java option "-Dsun.io.serialization.extendedDebugInfo=true" to have more information about the offending object printed. It will help you trace down how the SparkContext is getting included in some kind of closure.

TD
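
Judging from the [error] (run-main-0) prefix, the job seems to be launched through sbt's run task; if so, one way to pass that option is to fork the run and add it to javaOptions. This is only a sketch for an sbt 0.13-style build; adjust for spark-submit or whatever launcher is actually used.

// build.sbt (sketch): apply the flag to the forked driver JVM
fork in run := true
javaOptions in run += "-Dsun.io.serialization.extendedDebugInfo=true"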



Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

Posted by lihu <li...@gmail.com>.
Which code did you use? Is it caused by your own code or by something in Spark itself?




-- 
Best Wishes!

Li Hu(李浒) | Graduate Student
Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
Tsinghua University, China

Email: lihu723@gmail.com
Tel: +86 15120081920
Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/

Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
I have the same problem

