Posted to user@spark.apache.org by Dhimant Jayswal <dh...@gmail.com> on 2014/04/09 07:07:39 UTC

java.io.NotSerializableException exception - custom Accumulator

Hi,

I am getting a java.io.NotSerializableException while executing the
following program.

import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.AccumulatorParam

object App {
  class Vector(val data: Array[Double])

  implicit object VectorAP extends AccumulatorParam[Vector] {
    def zero(v: Vector): Vector = new Vector(new Array[Double](v.data.size))
    def addInPlace(v1: Vector, v2: Vector): Vector = {
      for (i <- 0 to v1.data.size - 1) v1.data(i) += v2.data(i)
      v1
    }
  }

  def main(sc: SparkContext) {
    val vectorAcc = sc.accumulator(new Vector(Array(0, 0)))
    val accum = sc.accumulator(0)
    val file = sc.textFile("/user/root/data/SourceFiles/a.txt", 10)
    file.foreach(line => { println(line); accum += 1; vectorAcc.add(new Vector(Array(1, 1))) })
    println(accum.value)
    println(vectorAcc.value.data)
    println("=================")
  }
}
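
(A side note on the last print: println(vectorAcc.value.data) prints the
array's object reference, since JVM arrays inherit Object.toString; to see
the element values, something like

println(vectorAcc.value.data.mkString(", "))

should work instead.)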

--------------------------------------------------------------------------------------------------

scala> App.main(sc)
14/04/09 01:02:05 INFO storage.MemoryStore: ensureFreeSpace(130760) called with curMem=0, maxMem=308713881
14/04/09 01:02:05 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 127.7 KB, free 294.3 MB)
14/04/09 01:02:07 INFO mapred.FileInputFormat: Total input paths to process : 1
14/04/09 01:02:07 INFO spark.SparkContext: Starting job: foreach at <console>:30
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Got job 0 (foreach at <console>:30) with 11 output partitions (allowLocal=false)
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Final stage: Stage 0 (foreach at <console>:30)
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at textFile at <console>:29), which has no missing parents
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Failed to run foreach at <console>:30
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$App$
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
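
In case it helps anyone reading the trace: the $iwC$$iwC$... prefix is the
wrapper-class nesting the Spark shell generates around each pasted line, so
the object that fails to serialize is App$ itself. My understanding is that
the foreach closure, via the captured accumulator and its VectorAP param,
ends up holding a reference back to App, which does not extend Serializable.
A minimal sketch of a variant that I believe avoids the exception, assuming
the default Java closure serializer, marks everything the task can reach as
Serializable:

import org.apache.spark.SparkContext
import org.apache.spark.AccumulatorParam

// Sketch only: the enclosing object and the data class are both
// Serializable, so the accumulator's param and any Vector instances
// created inside the closure can be shipped to the executors.
object App extends Serializable {
  class Vector(val data: Array[Double]) extends Serializable

  implicit object VectorAP extends AccumulatorParam[Vector] {
    def zero(v: Vector): Vector = new Vector(new Array[Double](v.data.size))
    def addInPlace(v1: Vector, v2: Vector): Vector = {
      for (i <- v1.data.indices) v1.data(i) += v2.data(i)
      v1
    }
  }
  // main is unchanged from the program above
}

Alternatively, compiling Vector and VectorAP into a jar on the classpath,
instead of defining them at the REPL prompt, should sidestep the wrapper
capture entirely.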