Posted to dev@spark.apache.org by yash datta <sa...@gmail.com> on 2015/08/31 12:44:51 UTC

KryoSerializer for closureSerializer in DAGScheduler

Hi devs,

Currently, the only supported serializer for serializing tasks in
DAGScheduler.scala is JavaSerializer.


val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
  case stage: ResultStage =>
    closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
}

taskBinary = sc.broadcast(taskBinaryBytes)


Could somebody give me pointers on what would be involved in changing it to
KryoSerializer?



One suggestion here

http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html

was to use chill-scala's KryoSerializer for closureSerializer:

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()



But on digging through the code, it looks like the KryoSerializer that Spark
uses is already based on Twitter's chill library.

In KryoSerializer.scala:

val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()

----------------------------------------

package com.twitter.chill
class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
  override def newKryo() : com.twitter.chill.KryoBase = { /* compiled code */ }
}



I am working on a low-latency job, and much of the time is spent
serializing the result stage RDD (~140 ms); the serialized size is 2.8 MB.
Is this reasonable? I wanted to check whether switching to KryoSerializer
would help here.
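
For reference, numbers like these can be reproduced outside Spark with a small probe. This is only a minimal sketch using plain java.io serialization, which is what JavaSerializer builds on; the object name JavaSerializationProbe, the measure helper, and the stand-in payload are hypothetical and not part of Spark.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object JavaSerializationProbe {
  /** Serializes `obj` with plain Java serialization; returns (bytes, elapsed milliseconds). */
  def measure(obj: AnyRef): (Array[Byte], Double) = {
    val start = System.nanoTime()
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    val elapsedMs = (System.nanoTime() - start) / 1e6
    (buffer.toByteArray, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in payload; in a Spark job this would be the (rdd, func) pair.
    val payload: AnyRef = (1 to 100000).map(i => s"row-$i").toVector
    val (bytes, ms) = measure(payload)
    println(f"serialized ${bytes.length} bytes in $ms%.2f ms")
  }
}
```

A single cold measurement includes JVM warm-up, so it is worth repeating the call several times before trusting the timing.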

I am serializing a UnionRDD, which is created by code like this
(rdds here is a list of SchemaRDDs):

// Filter each SchemaRDD on the index column and project the needed fields.
val condition = 'column === indexValue
val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))

// Keep only the partitions accepted by partitioner.func.
val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))

// Union the pruned RDDs into a single RDD.
val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)

My partitioner above selects one partition (out of 100) per RDD from the
list of RDDs passed to UnionRDD, and the resulting UnionRDD has
127 partitions.

Calling unioned.collect leads to serialization of the UnionRDD.

I am using Spark 1.2.1.


Any help regarding this will be highly appreciated.


Best
Yash Datta


-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.

Re: KryoSerializer for closureSerializer in DAGScheduler

Posted by yash datta <sa...@gmail.com>.
Thanks, Josh. I'll take a look.
On 31 Aug 2015 19:21, "Josh Rosen" <ro...@gmail.com> wrote:

> There are currently a few known issues with using KryoSerializer as the
> closure serializer, so it's going to require some changes to Spark if we
> want to properly support this. See
> https://github.com/apache/spark/pull/6361 and
> https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
> the difficulties here.

Re: KryoSerializer for closureSerializer in DAGScheduler

Posted by Josh Rosen <ro...@gmail.com>.
There are currently a few known issues with using KryoSerializer as the
closure serializer, so it's going to require some changes to Spark if we
want to properly support this. See https://github.com/apache/spark/pull/6361
and https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
the difficulties here.
