Posted to user@spark.apache.org by mailforledkk <ma...@126.com> on 2014/04/10 13:20:43 UTC

Re: RDD top method exception

Hi all,

I found the reason my code throws that exception: it is caused by the way I was using KryoSerializer. With the Java serializer everything works. Maybe I should register BoundedPriorityQueue[(String,(Int,Int))] with Kryo, like this?

    kryo.register(classOf[BoundedPriorityQueue[(String,(Int,Int))]]);
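
For readers wondering how such a registration is usually wired in, here is a minimal sketch, assuming Spark's KryoRegistrator trait and the spark.serializer / spark.kryo.registrator settings. The names MyRegistrator and KryoSetup are hypothetical and not from this thread. As far as I know, BoundedPriorityQueue is declared private[spark], so classOf[...] may not compile from user code; the sketch looks the class up by name instead. Whether registering it actually removes the ClassCastException is not confirmed in this thread.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator; the class name is an assumption for illustration.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Type parameters are erased at runtime, so only the raw class is registered.
    kryo.register(Class.forName("org.apache.spark.util.BoundedPriorityQueue"))
  }
}

object KryoSetup {
  // Hypothetical helper: builds a SparkConf that uses Kryo plus the registrator above.
  def conf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
}
```

The resulting SparkConf would be passed to the SparkContext constructor. Switching spark.serializer back to the default Java serializer remains the workaround reported above.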

mailforledkk
From: mailforledkk
Sent: 2014-04-09 19:28
To: user
Subject: RDD top method exception

Hi all,

I am using RDD.top to get the top 5 values from an array. The code looks like this:

    import org.apache.spark.SparkContext._   // pair-RDD operations such as reduceByKey
    import org.apache.spark.rdd.RDD

    val arr = Array[(String,(Int,Int))](("1232_1231",(1->1)),("12324_1232",(1->1)))
    val arr2 = Array[(String,(Int,Int))](("1232_1231",(1->2)),("12324_1232",(1->5)),("12324_12312",(1->5)))

    val signalType = ctx.getSparkContext.makeRDD(arr2)
    val remainSignalC:RDD[(String,(Int,Int))] = ctx.getSparkContext.makeRDD(arr)//actionContext.getRdd("remainSignalC")

    // Merge both RDDs and sum the value pairs per key
    val rec = signalType.union(remainSignalC)
    val rec2 = rec.reduceByKey((v1,v2)=>(v1._1+v2._1,v1._2+v2._2))

    // Take the 5 records with the largest first value component
    val top = rec2.top(5)(Ordering.by(f=>f._2._1))
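
For comparison, here is a Spark-free sketch of what the snippet above is expected to compute, using plain Scala collections as a stand-in for union, reduceByKey, and top. The object name TopSketch and the local emulation are assumptions for illustration only; this is not how Spark implements top internally.

```scala
object TopSketch {
  type Rec = (String, (Int, Int))

  val arr: Seq[Rec]  = Seq(("1232_1231", (1, 1)), ("12324_1232", (1, 1)))
  val arr2: Seq[Rec] = Seq(("1232_1231", (1, 2)), ("12324_1232", (1, 5)), ("12324_12312", (1, 5)))

  // Local stand-in for union + reduceByKey: group by key, then sum the pairs.
  val reduced: Map[String, (Int, Int)] =
    (arr2 ++ arr).groupBy(_._1).map { case (k, vs) =>
      k -> vs.map(_._2).reduce((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
    }

  // Local stand-in for top(5)(Ordering.by(_._2._1)): largest first component first.
  val top: Seq[Rec] =
    reduced.toSeq.sortBy(_._2._1)(Ordering[Int].reverse).take(5)
}
```

With the sample data, the two keys that occur in both arrays end up with a first component of 2 and the remaining key with 1, so the call should return all three records rather than fail.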




The problem is that it throws this exception:

java.lang.ClassCastException: scala.collection.immutable.Nil$ cannot be cast to org.apache.spark.util.BoundedPriorityQueue
	at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:873)
	at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:671)
	at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:668)
	at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
	at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


While debugging the source code, I found that the value of queue1 is Nil. Is this a bug in Spark? Can anyone help me?

mailforledkk