You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Dhimant Jayswal <dh...@gmail.com> on 2014/04/09 07:07:39 UTC
java.io.NotSerializableException exception - custom Accumulator
Hi ,
I am getting a java.io.NotSerializableException while executing the
following program.
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.AccumulatorParam
object App {

  /** Minimal mutable vector wrapper used as a custom accumulator value.
    *
    * Must extend Serializable: instances are created inside the `foreach`
    * closure and shipped to executors, and the original non-serializable
    * class is what triggered java.io.NotSerializableException.
    */
  class Vector(val data: Array[Double]) extends Serializable {}

  /** Tells Spark how to create a zero element and merge two Vectors. */
  implicit object VectorAP extends AccumulatorParam[Vector] {

    // Zero element: a vector of the same dimension with all components 0.0.
    def zero(v: Vector): Vector = new Vector(new Array(v.data.size))

    // Element-wise in-place addition; v1 is reused as the merged result.
    // Assumes v1 and v2 have the same dimension (true for accumulator use).
    def addInPlace(v1: Vector, v2: Vector): Vector = {
      for (i <- v1.data.indices) v1.data(i) += v2.data(i)
      v1
    }
  }

  def main(sc: SparkContext): Unit = {
    val vectorAcc = sc.accumulator(new Vector(Array(0, 0)))
    val accum = sc.accumulator(0)
    val file = sc.textFile("/user/root/data/SourceFiles/a.txt", 10)

    // NOTE(review): when pasted into the spark-shell, this closure also drags
    // in the REPL's $iwC wrapper objects; compiling App into a jar (or keeping
    // everything referenced by the closure Serializable) avoids the
    // "Task not serializable" failure seen in the attached stack trace.
    file.foreach { line =>
      println(line)
      accum += 1
      vectorAcc.add(new Vector(Array(1, 1)))
    }

    println(accum.value)
    // data is an Array[Double]; printing the array directly only shows its
    // identity toString (e.g. [D@1a2b3c), so render the contents instead.
    println(vectorAcc.value.data.mkString(", "))
    println("=================")
  }
}
--------------------------------------------------------------------------------------------------
scala> App.main(sc)
14/04/09 01:02:05 INFO storage.MemoryStore: ensureFreeSpace(130760) called
with curMem=0, maxMem=308713881
14/04/09 01:02:05 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 127.7 KB, free 294.3 MB)
14/04/09 01:02:07 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/04/09 01:02:07 INFO spark.SparkContext: Starting job: foreach at
<console>:30
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Got job 0 (foreach at
<console>:30) with 11 output partitions (allowLocal=false)
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Final stage: Stage 0
(foreach at <console>:30)
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Submitting Stage 0
(MappedRDD[1] at textFile at <console>:29), which has no missing parents
14/04/09 01:02:07 INFO scheduler.DAGScheduler: Failed to run foreach at
<console>:30
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$App$
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)