Posted to user@spark.apache.org by Abhishek Modi <ab...@gmail.com> on 2016/02/16 10:12:43 UTC
Unusually large deserialisation time
I'm doing a mapPartitions on an RDD cached in memory, followed by a reduce.
Here is my code snippet:
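import scala.collection.mutable.ArrayBuffer

// The rangify function: sums the Int fields of one partition and collapses
// runs of consecutive Long values into (begin, end) ranges
def rangify(l: Iterator[(Int, Long)]): Iterator[(Long, List[ArrayBuffer[(Long, Long)]])] = {
  // An empty partition contributes a zero sum and no ranges
  if (l.isEmpty)
    return Iterator((0L, List[ArrayBuffer[(Long, Long)]]()))
  var sum = 0L
  val mylist = ArrayBuffer[(Long, Long)]()
  // Negative sentinels mean "no range started yet" (assumes the Long values are non-negative)
  var prev = -1000L
  var begin = -1000L
  for (x <- l) {
    sum += x._1
    if (prev < 0) {
      // First element starts the first range
      prev = x._2
      begin = x._2
    }
    else if (x._2 == prev + 1)
      // Consecutive value: extend the current range
      prev = x._2
    else {
      // Gap: close the current range and start a new one
      mylist += ((begin, prev))
      prev = x._2
      begin = x._2
    }
  }
  // Close the last open range
  mylist += ((begin, prev))
  Iterator((sum, List(mylist)))
}

// myRdd is an RDD of Tuple2[Int, Long]; sum the counts and concatenate the range lists
myRdd.mapPartitions(rangify).reduce((x, y) => (x._1 + y._1, x._2 ++ y._2))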
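To check what rangify and the reduce compute without a cluster, here is a minimal sketch that applies the same logic to plain Scala collections. It assumes each inner Seq stands in for one partition of myRdd; the object name RangifyLocal, the run helper, and the sample data are hypothetical and only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object RangifyLocal {
  type Ranges = List[ArrayBuffer[(Long, Long)]]

  // Same logic as rangify: sum the Int fields and collapse runs of
  // consecutive Long values into (begin, end) ranges.
  def rangify(l: Iterator[(Int, Long)]): Iterator[(Long, Ranges)] = {
    if (l.isEmpty) return Iterator((0L, List.empty[ArrayBuffer[(Long, Long)]]))
    var sum = 0L
    val mylist = ArrayBuffer[(Long, Long)]()
    var prev = -1000L  // negative sentinel: no range started yet
    var begin = -1000L
    for (x <- l) {
      sum += x._1
      if (prev < 0) { prev = x._2; begin = x._2 }                           // first element
      else if (x._2 == prev + 1) prev = x._2                                // extend range
      else { mylist += ((begin, prev)); prev = x._2; begin = x._2 }         // gap: new range
    }
    mylist += ((begin, prev))
    Iterator((sum, List(mylist)))
  }

  // Hypothetical stand-in for mapPartitions + reduce: each inner Seq is one "partition".
  def run(partitions: Seq[Seq[(Int, Long)]]): (Long, Ranges) =
    partitions
      .flatMap(p => rangify(p.iterator))
      .reduce((x, y) => (x._1 + y._1, x._2 ++ y._2))
}
```

For example, partitions Seq((1,1L),(1,2L),(1,3L),(2,7L)), Seq((5,10L),(5,11L)) and an empty one reduce to a sum of 15 and the range lists [(1,3),(7,7)] and [(10,11)]. Like the original, the sketch treats prev < 0 as "no range started yet", so it only behaves correctly when the Long values are non-negative.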
The RDD is cached in memory. I'm using 20 executors with 1 core each, and
the cached RDD has 60 blocks. The problem is that in every 2-3 runs of the
job, one task has an abnormally large deserialisation time (screenshot
attached).
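As a rough sanity check on the setup above: assuming one task per cached block (Spark caches one block per RDD partition) and the default of one core per task, 20 single-core executors give 20 task slots, so the 60 tasks of the mapPartitions stage run in about 3 waves, with each executor handling roughly 3 tasks one after another. The helper names below are hypothetical.

```scala
object TaskWaves {
  // Concurrent task slots, assuming each task uses one core.
  def slots(executors: Int, coresPerExecutor: Int): Int = executors * coresPerExecutor

  // Waves needed to run `tasks` tasks on `slots` slots (ceiling division).
  def waves(tasks: Int, slots: Int): Int = (tasks + slots - 1) / slots
}
```

With these numbers, TaskWaves.waves(60, TaskWaves.slots(20, 1)) evaluates to 3.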
Thank you,
Abhishek