Posted to user@spark.apache.org by mailforledkk <ma...@126.com> on 2014/04/10 13:20:43 UTC
Re: RDD top method exception
Hi all,
I found the reason my code throws that exception: it is caused by my incorrect use of KryoSerializer. When I use the Java serializer, everything works. Maybe I should register BoundedPriorityQueue[(String,(Int,Int))] with Kryo, like this?
kryo.register(classOf[BoundedPriorityQueue[(String,(Int,Int))]]);
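The registration suggested above could be wired into a Spark job roughly as follows. This is only a sketch under assumptions: `TopQueueRegistrator`, `SerializerConfig`, `kryoConf` and `javaConf` are hypothetical names chosen for illustration. As far as I know, `BoundedPriorityQueue` is private to the `org.apache.spark` package, so the class is looked up by name instead of with `classOf`. Whether registering it is enough to remove the exception has not been verified here; the Java serializer setting is the workaround already reported to work.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator; the class name is an assumption for this sketch.
class TopQueueRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Assumption: BoundedPriorityQueue is not accessible from user code,
    // so it is resolved reflectively by its fully qualified name.
    kryo.register(Class.forName("org.apache.spark.util.BoundedPriorityQueue"))
  }
}

object SerializerConfig {
  // Option 1: keep Kryo and point Spark at the registrator above.
  def kryoConf(): SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[TopQueueRegistrator].getName)

  // Option 2: fall back to the Java serializer, which was reported to work.
  def javaConf(): SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
}
```

Either configuration would be passed to the SparkContext when it is created; the rest of the job stays unchanged.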
mailforledkk
From: mailforledkk
Sent: 2014-04-09 19:28
To: user
Subject: RDD top method exception
Hi all,
I am using RDD top to get the top 5 values from an array. The code looks like this:
val arr = Array[(String,(Int,Int))](("1232_1231",(1->1)),("12324_1232",(1->1)))
val arr2 = Array[(String,(Int,Int))](("1232_1231",(1->2)),("12324_1232",(1->5)),("12324_12312",(1->5)))
val signalType = ctx.getSparkContext.makeRDD(arr2)
val remainSignalC:RDD[(String,(Int,Int))] = ctx.getSparkContext.makeRDD(arr)//actionContext.getRdd("remainSignalC")
val rec = signalType.union(remainSignalC)
val rec2 = rec.reduceByKey((v1,v2)=>(v1._1+v2._1,v1._2+v2._2))
val top = rec2.top(5)(Ordering.by(f=>f._2._1))
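For reference, the result this pipeline is expected to produce can be worked out without Spark. The sketch below mirrors union, reduceByKey and top(5) using plain Scala collections on the same input; `TopSketch` and its member names are hypothetical and only serve this illustration.

```scala
object TopSketch {
  type Rec = (String, (Int, Int))

  // Same input data as in the Spark code above.
  val arr: Seq[Rec] = Seq(("1232_1231", (1, 1)), ("12324_1232", (1, 1)))
  val arr2: Seq[Rec] =
    Seq(("1232_1231", (1, 2)), ("12324_1232", (1, 5)), ("12324_12312", (1, 5)))

  // Equivalent of union followed by reduceByKey: sum both counters per key.
  val reduced: Seq[Rec] = (arr2 ++ arr)
    .groupBy(_._1)
    .map { case (key, recs) =>
      key -> recs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    }
    .toSeq

  // Equivalent of top(5)(Ordering.by(_._2._1)): largest first counter first.
  val top: Seq[Rec] = reduced.sortBy(_._2._1)(Ordering[Int].reverse).take(5)

  def main(args: Array[String]): Unit = top.foreach(println)
}
```

With only three distinct keys, all three records are returned. The two keys present in both arrays come first, since their first counters sum to 2.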
The problem is this exception:
java.lang.ClassCastException: scala.collection.immutable.Nil$ cannot be cast to org.apache.spark.util.BoundedPriorityQueue
at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:873)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:671)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:668)
at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
While debugging the source code, I found that queue1's value is Nil. Is this a bug in Spark? Can anyone help me?
mailforledkk