Posted to user@spark.apache.org by Aureliano Buendia <bu...@gmail.com> on 2014/01/03 17:48:40 UTC

Turning kryo on does not decrease binary output

Hi,

I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double, Double)],
expecting the output binary to be smaller, but it is exactly the same size
as when Kryo is not on.

I've checked the log, and there is no trace of kryo related errors.

The code looks something like:

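import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.setRegistrationRequired(true)
    // Register the tuple type stored in the RDD
    kryo.register(classOf[(Int, Int, Double, Double)])
  }
}
// Must be set before the SparkContext is created
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")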

At the end, I tried to call:

kryo.setRegistrationRequired(true)

to make sure my class gets registered. But I found errors like:

Exception in thread "DAGScheduler" com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Class is not registered:
scala.math.Numeric$IntIsIntegral$
Note: To register this class use:
kryo.register(scala.math.Numeric$IntIsIntegral$.class);

It appears many scala internal types have to be registered in order to have
full kryo support.

Any idea why my simple tuple type should not get kryo benefits?

Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
Andrew, according to http://stackoverflow.com/a/17241273/1136722 , what you
described is the old way of doing this.


On Fri, Jan 3, 2014 at 5:43 PM, Andrew Ash <an...@andrewash.com> wrote:

> For hadoop properties I find the most reliable way to be to set them on a
> Configuration object and use a method on SparkContext that accepts that
> conf object.
>
> From working code:
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
> def nlLZOfile(path: String) = {
>     val conf = new Configuration
>     conf.set("textinputformat.record.delimiter", "\n")
>     sc.newAPIHadoopFile(path,
> classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[LongWritable],
> classOf[Text], conf)
>       .map(_._2.toString)
> }
>
>
> On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia <bu...@gmail.com>wrote:
>
>> Thanks for clarifying this.
>>
>> I tried setting hadoop properties before constructing SparkContext, but
>> it had no effect.
>>
>> Where is the right place to set these properties?
>>
>>
>> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <
>> guillaume.pitel@exensa.com> wrote:
>>
>>>  Hi,
>>>
>>> I believe Kryo is only use during RDD serialization (i.e. communication
>>> between nodes), not for saving. If you want to compress output, you can use
>>> GZip or snappy codec like that :
>>>
>>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>>> val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip
>>>
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress",
>>> "true")
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
>>> codec)
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type",
>>> "BLOCK")
>>>
>>> (That's for HDP2, for HDP1, the keys are different)
>>> Regards
>>> Guillaume
>>
>>
>

Re: Turning kryo on does not decrease binary output

Posted by Andrew Ash <an...@andrewash.com>.
For Hadoop properties, I find the most reliable way is to set them on a
Configuration object and use a method on SparkContext that accepts that
conf object.

From working code:

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def nlLZOfile(path: String) = {
    val conf = new Configuration
    conf.set("textinputformat.record.delimiter", "\n")
    sc.newAPIHadoopFile(path,
        classOf[com.hadoop.mapreduce.LzoTextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        conf)
      .map(_._2.toString)
}


On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia <bu...@gmail.com>wrote:

> Thanks for clarifying this.
>
> I tried setting hadoop properties before constructing SparkContext, but it
> had no effect.
>
> Where is the right place to set these properties?
>
>
> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <
> guillaume.pitel@exensa.com> wrote:
>
>>  Hi,
>>
>> I believe Kryo is only use during RDD serialization (i.e. communication
>> between nodes), not for saving. If you want to compress output, you can use
>> GZip or snappy codec like that :
>>
>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>> val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip
>>
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress",
>> "true")
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
>> codec)
>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type",
>> "BLOCK")
>>
>> (That's for HDP2, for HDP1, the keys are different)
>> Regards
>> Guillaume
>
>

Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
It seems saveAsObjectFile(), saveAsSequenceFile() and saveAsHadoopFile()
are written in a rather dirty and inconsistent way.

saveAsObjectFile calls saveAsSequenceFile, but does not pass the codec:

def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

so the codec is set to None:

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
    ...
      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
    ...
  }

saveAsHadoopFile only applies compression when the codec is available, and
it does not seem to respect the global hadoop compression properties:

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None) {
    conf.setOutputKeyClass(keyClass)
    conf.setOutputValueClass(valueClass)
    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
    conf.set("mapred.output.format.class", outputFormatClass.getName)
    for (c <- codec) {
      conf.setCompressMapOutput(true)
      conf.set("mapred.output.compress", "true")
      conf.setMapOutputCompressorClass(c)
      conf.set("mapred.output.compression.codec", c.getCanonicalName)
      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }
    conf.setOutputCommitter(classOf[FileOutputCommitter])
    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
    saveAsHadoopDataset(conf)
  }
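
To make the effect of the default codec = None concrete, here is a minimal sketch in plain Scala. It does not use Spark or Hadoop: the Map is only a stand-in for a JobConf, and CodecOptionSketch and outputSettings are hypothetical names made up for illustration. Only the property keys are taken from the saveAsHadoopFile source.

```scala
object CodecOptionSketch {
  // Hypothetical stand-in for a Hadoop JobConf: an immutable key/value map.
  def outputSettings(codec: Option[String] = None): Map[String, String] = {
    var conf = Map("mapred.output.format.class" -> "SequenceFileOutputFormat")
    // Same shape as the for (c <- codec) block: the body runs only for Some(c).
    for (c <- codec) {
      conf = conf ++ Map(
        "mapred.output.compress" -> "true",
        "mapred.output.compression.codec" -> c,
        "mapred.output.compression.type" -> "BLOCK")
    }
    conf
  }

  def main(args: Array[String]): Unit = {
    // Default argument, as when saveAsObjectFile calls saveAsSequenceFile(path).
    println(outputSettings())
    // Explicit codec, as when a caller passes Some(classOf[GzipCodec]).
    println(outputSettings(Some("org.apache.hadoop.io.compress.GzipCodec")))
  }
}
```

With the default argument, none of the compression keys are ever set, which matches the behaviour I am seeing with saveAsObjectFile.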



On Fri, Jan 3, 2014 at 9:49 PM, Aureliano Buendia <bu...@gmail.com>wrote:

>
>
>
> On Fri, Jan 3, 2014 at 8:25 PM, Guillaume Pitel <
> guillaume.pitel@exensa.com> wrote:
>
>>  Have you tried with the mapred.* properties ? If saveAsObjectFile uses
>> saveAsSequenceFile, maybe it uses the old API ?
>>
>
> None of spark.hadoop.mapred.* and spark.hadoop.mapreduce.* approaches
> cause compression with saveAsObject. (Using spark 0.8.1)
>
>
>>
>> Guillaume
>>
>>   But why is that hadoop compression doesn't work for saveAsObject(),
>> but it does work (according to Guillaume) for saveAsHadoopFile()?
>>
>>
>>
>
>

Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
On Fri, Jan 3, 2014 at 8:25 PM, Guillaume Pitel
<gu...@exensa.com>wrote:

>  Have you tried with the mapred.* properties ? If saveAsObjectFile uses
> saveAsSequenceFile, maybe it uses the old API ?
>

Neither the spark.hadoop.mapred.* nor the spark.hadoop.mapreduce.* approach
causes compression with saveAsObjectFile. (Using Spark 0.8.1.)


>
> Guillaume
>
>   But why is that hadoop compression doesn't work for saveAsObject(), but
> it does work (according to Guillaume) for saveAsHadoopFile()?
>
>
>

Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
Have you tried with the mapred.* properties? If saveAsObjectFile uses
saveAsSequenceFile, maybe it uses the old API?

Guillaume
> But why is that hadoop compression doesn't work for saveAsObject(), but it
> does work (according to Guillaume) for saveAsHadoopFile()?
>

-- 
eXenSa

	
*Guillaume PITEL, Président*
+33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
On Fri, Jan 3, 2014 at 7:41 PM, Imran Rashid <im...@quantifind.com> wrote:

> I think a lot of the confusion is cleared up with a quick look at the code:
>
>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L901
>
> saveAsObjectFile is just a thin wrapper around saveAsSequenceFile, which
> makes a null key and calls the java serializer.
>
> if you want to use kryo, just do the same thing yourself, but use the kryo
> serializer in place of the java one.
>

Thanks!

But why is it that Hadoop compression doesn't work for saveAsObjectFile(), but
does work (according to Guillaume) for saveAsHadoopFile()?


>
>
>
>
> On Fri, Jan 3, 2014 at 1:33 PM, Aureliano Buendia <bu...@gmail.com>wrote:
>
>>
>>
>>
>> On Fri, Jan 3, 2014 at 7:26 PM, Guillaume Pitel <
>> guillaume.pitel@exensa.com> wrote:
>>
>>>  Actually, the interesting part in hadoop files is the sequencefile
>>> format which allows to split the data in various blocks. Other files in
>>> HDFS are single-blocks. They do not scale
>>>
>>
>> But the output of saveAsObjectFile looks like: part-00000, part-00001,
>> part-00002,... . It does output split data, making it scalable, no?
>>
>>
>>>
>>> An ObjectFile cannot be naturally splitted.
>>>
>>> Usually, in Hadoop when storing a sequence of elements instead of a
>>> sequence of key,value the trick is to store key,null
>>>
>>> I don't know what's the most effective way to do that in scala/spark.
>>> Actually that would be a good thing to add it to RDD[U]
>>>
>>> Guillaume
>>>
>>>
>>>
>>>
>>> On Fri, Jan 3, 2014 at 7:10 PM, Andrew Ash <an...@andrewash.com> wrote:
>>>
>>>> saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions
>>>> which uses some Scala magic to become available when you have an that's
>>>> RDD[Key, Value]
>>>>
>>>>
>>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
>>>>
>>>
>>>  I see. So if my data is of RDD[Value] type, I cannot use compression?
>>> Why does it have to be of RDD[Key, Value] in order to save it in hadoop?
>>>
>>>  Also, doesn't saveAsObjectFile("hdfs://...") save data in hadoop? This
>>> is confusing.
>>>
>>>  I'm only interested in saving data on s3 ("s3n://..."), does it matter
>>> if I use saveAsHadoopFile, or saveAsObjectFile?
>>>
>>>
>>>>
>>>>
>>>
>>
>>
>

Re: Turning kryo on does not decrease binary output

Posted by Imran Rashid <im...@quantifind.com>.
I think a lot of the confusion is cleared up with a quick look at the code:

https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L901

saveAsObjectFile is just a thin wrapper around saveAsSequenceFile, which
makes a null key and calls the java serializer.

if you want to use kryo, just do the same thing yourself, but use the kryo
serializer in place of the java one.
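
A rough sketch of the "do the same thing yourself" idea, in plain Scala so it runs without Spark: group the elements into batches of 10 as saveAsObjectFile does, and let the caller supply the serializer. BatchedSerialization, toRecords and javaSerialize are hypothetical names for illustration only. In a real job, each byte array would be wrapped as (NullWritable.get(), new BytesWritable(bytes)) and written with saveAsSequenceFile, and a Kryo-backed function would be passed in place of javaSerialize (not shown here, since it depends on how Kryo is set up).

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object BatchedSerialization {
  // Plain Java serialization, standing in for Spark's Utils.serialize.
  def javaSerialize[T](batch: Seq[T]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(batch.toList) // List is Serializable
    out.close()
    bytes.toByteArray
  }

  // Group elements into batches and serialize each batch with the supplied function.
  def toRecords[T](iter: Iterator[T], batchSize: Int = 10)(
      serialize: Seq[T] => Array[Byte]): Iterator[Array[Byte]] =
    iter.grouped(batchSize).map(batch => serialize(batch))

  def main(args: Array[String]): Unit = {
    val records = toRecords(Iterator.range(0, 25))(javaSerialize).toList
    // One serialized byte array per batch.
    println(records.map(_.length))
  }
}
```

Keeping the serializer as a parameter means the batching and the file format stay the same; only the bytes inside each record change.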




On Fri, Jan 3, 2014 at 1:33 PM, Aureliano Buendia <bu...@gmail.com>wrote:

>
>
>
> On Fri, Jan 3, 2014 at 7:26 PM, Guillaume Pitel <
> guillaume.pitel@exensa.com> wrote:
>
>>  Actually, the interesting part in hadoop files is the sequencefile
>> format which allows to split the data in various blocks. Other files in
>> HDFS are single-blocks. They do not scale
>>
>
> But the output of saveAsObjectFile looks like: part-00000, part-00001,
> part-00002,... . It does output split data, making it scalable, no?
>
>
>>
>> An ObjectFile cannot be naturally splitted.
>>
>> Usually, in Hadoop when storing a sequence of elements instead of a
>> sequence of key,value the trick is to store key,null
>>
>> I don't know what's the most effective way to do that in scala/spark.
>> Actually that would be a good thing to add it to RDD[U]
>>
>> Guillaume
>>
>>
>>
>>
>> On Fri, Jan 3, 2014 at 7:10 PM, Andrew Ash <an...@andrewash.com> wrote:
>>
>>> saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions
>>> which uses some Scala magic to become available when you have an that's
>>> RDD[Key, Value]
>>>
>>>
>>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
>>>
>>
>>  I see. So if my data is of RDD[Value] type, I cannot use compression?
>> Why does it have to be of RDD[Key, Value] in order to save it in hadoop?
>>
>>  Also, doesn't saveAsObjectFile("hdfs://...") save data in hadoop? This
>> is confusing.
>>
>>  I'm only interested in saving data on s3 ("s3n://..."), does it matter
>> if I use saveAsHadoopFile, or saveAsObjectFile?
>>
>>
>>>
>>>
>>
>
>

Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
I thought it didn't split the files. It seems I was wrong. Maybe it's a matter
of size, then.

In that case, yes, it's scalable. After all, it's an RDD initially.


> On Fri, Jan 3, 2014 at 7:26 PM, Guillaume Pitel <guillaume.pitel@exensa.com
> <ma...@exensa.com>> wrote:
>
>     Actually, the interesting part in hadoop files is the sequencefile format
>     which allows to split the data in various blocks. Other files in HDFS are
>     single-blocks. They do not scale
>
>
> But the output of saveAsObjectFile looks like: part-00000, part-00001,
> part-00002,... . It does output split data, making it scalable, no?
>  
>


Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
On Fri, Jan 3, 2014 at 7:26 PM, Guillaume Pitel
<gu...@exensa.com>wrote:

>  Actually, the interesting part in hadoop files is the sequencefile
> format which allows to split the data in various blocks. Other files in
> HDFS are single-blocks. They do not scale
>

But the output of saveAsObjectFile looks like: part-00000, part-00001,
part-00002,... . It does output split data, making it scalable, no?


>
> An ObjectFile cannot be naturally splitted.
>
> Usually, in Hadoop when storing a sequence of elements instead of a
> sequence of key,value the trick is to store key,null
>
> I don't know what's the most effective way to do that in scala/spark.
> Actually that would be a good thing to add it to RDD[U]
>
> Guillaume
>
>
>
>
> On Fri, Jan 3, 2014 at 7:10 PM, Andrew Ash <an...@andrewash.com> wrote:
>
>> saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions which
>> uses some Scala magic to become available when you have an that's RDD[Key,
>> Value]
>>
>>
>> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
>>
>
>  I see. So if my data is of RDD[Value] type, I cannot use compression?
> Why does it have to be of RDD[Key, Value] in order to save it in hadoop?
>
>  Also, doesn't saveAsObjectFile("hdfs://...") save data in hadoop? This
> is confusing.
>
>  I'm only interested in saving data on s3 ("s3n://..."), does it matter
> if I use saveAsHadoopFile, or saveAsObjectFile?
>
>
>>
>>
>

Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
Actually, the interesting part of Hadoop files is the SequenceFile format, which
allows the data to be split into multiple blocks. Other files in HDFS are
single-block; they do not scale.

An ObjectFile cannot be split naturally.

Usually, in Hadoop, when storing a sequence of elements instead of a sequence of
(key, value) pairs, the trick is to store (key, null).

I don't know the most effective way to do that in Scala/Spark. Actually,
that would be a good thing to add to RDD[U].

Guillaume
>
>
>
> On Fri, Jan 3, 2014 at 7:10 PM, Andrew Ash <andrew@andrewash.com
> <ma...@andrewash.com>> wrote:
>
>     saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions which
>     uses some Scala magic to become available when you have an that's RDD[Key,
>     Value]
>
>     https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
>
>
> I see. So if my data is of RDD[Value] type, I cannot use compression? Why does
> it have to be of RDD[Key, Value] in order to save it in hadoop?
>
> Also, doesn't saveAsObjectFile("hdfs://...") save data in hadoop? This is
> confusing.
>
> I'm only interested in saving data on s3 ("s3n://..."), does it matter if I
> use saveAsHadoopFile, or saveAsObjectFile?
>  
>
>


Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
On Fri, Jan 3, 2014 at 7:10 PM, Andrew Ash <an...@andrewash.com> wrote:

> saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions which
> uses some Scala magic to become available when you have an that's RDD[Key,
> Value]
>
>
> https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
>

I see. So if my data is of RDD[Value] type, I cannot use compression? Why
does it have to be of RDD[Key, Value] in order to save it in hadoop?

Also, doesn't saveAsObjectFile("hdfs://...") save data in hadoop? This is
confusing.

I'm only interested in saving data on s3 ("s3n://..."), does it matter if I
use saveAsHadoopFile, or saveAsObjectFile?


>
> Agreed, something like Chill would make this much easier for the default
> cases.
>

It seems chill is already in use:

https://github.com/apache/incubator-spark/blob/3713f8129a618a633a7aca8c944960c3e7ac9d3b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L26

But what we need is something like chill-hadoop:

https://github.com/twitter/chill/tree/develop/chill-hadoop


>
>
> On Fri, Jan 3, 2014 at 2:04 PM, Aureliano Buendia <bu...@gmail.com>wrote:
>
>> RDD only defines saveAsTextFile and saveAsObjectFile. I think
>> saveAsHadoopFile and saveAsNewAPIHadoopFile belong to the older versions.
>>
>> saveAsObjectFile definitely outputs hadoop format.
>>
>> I'm not trying to save big objects by saveAsObjectFile, I'm just trying
>> to minimize the java serialization overhead when saving to a binary file.
>>
>> I can see spark can benefit from something like
>> https://github.com/twitter/chill in this matter.
>>
>>
>> On Fri, Jan 3, 2014 at 6:42 PM, Guillaume Pitel <
>> guillaume.pitel@exensa.com> wrote:
>>
>>>  Hi,
>>>
>>> After a little bit of thinking, I'm not sure anymore if saveAsObjectFile
>>> uses the spark.hadoop.*
>>>
>>> Also, I did write a mistake. The use of *.mapred.* or *.mapreduce.* does
>>> not depend on the hadoop version you use, but onthe API version you use
>>>
>>> So, I can assure you that if you use the saveAsNewAPIHadoopFile, with
>>> the spark.hadoop.mapreduce.* properties, the compression will be used.
>>>
>>> If you use the saveAsHadoopFile, it should be used with mapred.*
>>>
>>> If you use the saveAsObjectFile to a hdfs path, I'm not sure if the
>>> output is compressed.
>>>
>>> Anyway, saveAsObjectFile should be used for small objects, in my opinion.
>>>
>>> Guillaume
>>>
>>>   Even
>>>
>>> someMap.saveAsTextFile("out", classOf[GzipCodec])
>>>
>>>  has no effect.
>>>
>>>  Also, I notices that saving sequence files has no compression option
>>> (my original question was about compressing binary output).
>>>
>>>  Having said this, I still do not understand why kryo cannot be helpful
>>> when saving binary output. Binary output uses java serialization, which has
>>> a pretty hefty overhead.
>>>
>>>  How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
>>>
>>>
>>>
>>
>>
>

Re: Turning kryo on does not decrease binary output

Posted by Andrew Ash <an...@andrewash.com>.
saveAsHadoopFile and saveAsNewAPIHadoopFile are on PairRDDFunctions, which
uses some Scala magic (an implicit conversion) to become available when you
have an RDD of key-value pairs, i.e. RDD[(Key, Value)].

https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
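
For anyone unfamiliar with that "magic", here is a small self-contained sketch of the pattern, with no Spark involved. Dataset, PairFunctions and withPlaceholderKey are hypothetical stand-ins (for RDD, PairRDDFunctions, and the null-key trick respectively); this is not Spark's actual code.

```scala
import scala.language.implicitConversions

object PairFunctionsSketch {
  // Hypothetical stand-in for an RDD: it just wraps a sequence.
  class Dataset[T](val items: Seq[T])

  object Dataset {
    // The "magic": an implicit conversion that applies only to Dataset[(K, V)].
    implicit def toPairFunctions[K, V](ds: Dataset[(K, V)]): PairFunctions[K, V] =
      new PairFunctions(ds)
  }

  // Extra methods that only make sense for key/value data.
  class PairFunctions[K, V](self: Dataset[(K, V)]) {
    def keys: Seq[K] = self.items.map(_._1)
    def values: Seq[V] = self.items.map(_._2)
  }

  // Turn a Dataset[T] into key/value form by attaching a placeholder key.
  def withPlaceholderKey[T](ds: Dataset[T]): Dataset[(Unit, T)] =
    new Dataset(ds.items.map(x => ((), x)))

  def main(args: Array[String]): Unit = {
    val plain = new Dataset(Seq("a", "b"))
    // plain.values would not compile: no conversion exists for Dataset[String].
    println(withPlaceholderKey(plain).values)
  }
}
```

So a plain RDD[Value] can still reach the pair-only save methods once it is mapped into key/value form with a dummy key.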

Agreed, something like Chill would make this much easier for the default
cases.


On Fri, Jan 3, 2014 at 2:04 PM, Aureliano Buendia <bu...@gmail.com>wrote:

> RDD only defines saveAsTextFile and saveAsObjectFile. I think
> saveAsHadoopFile and saveAsNewAPIHadoopFile belong to the older versions.
>
> saveAsObjectFile definitely outputs hadoop format.
>
> I'm not trying to save big objects by saveAsObjectFile, I'm just trying to
> minimize the java serialization overhead when saving to a binary file.
>
> I can see spark can benefit from something like
> https://github.com/twitter/chill in this matter.
>
>
> On Fri, Jan 3, 2014 at 6:42 PM, Guillaume Pitel <
> guillaume.pitel@exensa.com> wrote:
>
>>  Hi,
>>
>> After a little bit of thinking, I'm not sure anymore if saveAsObjectFile
>> uses the spark.hadoop.*
>>
>> Also, I did write a mistake. The use of *.mapred.* or *.mapreduce.* does
>> not depend on the hadoop version you use, but onthe API version you use
>>
>> So, I can assure you that if you use the saveAsNewAPIHadoopFile, with the
>> spark.hadoop.mapreduce.* properties, the compression will be used.
>>
>> If you use the saveAsHadoopFile, it should be used with mapred.*
>>
>> If you use the saveAsObjectFile to a hdfs path, I'm not sure if the
>> output is compressed.
>>
>> Anyway, saveAsObjectFile should be used for small objects, in my opinion.
>>
>> Guillaume
>>
>>   Even
>>
>> someMap.saveAsTextFile("out", classOf[GzipCodec])
>>
>>  has no effect.
>>
>>  Also, I notices that saving sequence files has no compression option (my
>> original question was about compressing binary output).
>>
>>  Having said this, I still do not understand why kryo cannot be helpful
>> when saving binary output. Binary output uses java serialization, which has
>> a pretty hefty overhead.
>>
>>  How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
>>
>>
>>
>
>

Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
RDD only defines saveAsTextFile and saveAsObjectFile. I think
saveAsHadoopFile and saveAsNewAPIHadoopFile belong to the older versions.

saveAsObjectFile definitely outputs hadoop format.

I'm not trying to save big objects by saveAsObjectFile, I'm just trying to
minimize the java serialization overhead when saving to a binary file.
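
To give a feel for that overhead, here is a small stand-alone sketch (no Spark needed) that Java-serializes a batch of ten tuples and compares the result with the raw payload size. JavaSerializationOverhead and its members are hypothetical names for illustration; the exact serialized size depends on the JVM and Scala version, so no figure is quoted.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object JavaSerializationOverhead {
  type Record = (Int, Int, Double, Double)

  // Raw payload of one record: two 4-byte Ints plus two 8-byte Doubles.
  val rawBytesPerRecord: Int = 2 * 4 + 2 * 8

  // Serialize a batch with plain Java serialization.
  def javaSerialize(batch: Array[Record]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(batch)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val batch: Array[Record] = Array.tabulate(10)(i => (i, i * 2, i * 0.5, i * 1.5))
    val serialized = javaSerialize(batch)
    println(s"raw payload:     ${batch.length * rawBytesPerRecord} bytes")
    println(s"Java-serialized: ${serialized.length} bytes")
  }
}
```

The gap comes from class descriptors and per-object headers for the boxed tuple fields, which is the part a more compact serializer is meant to shrink.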

I can see spark can benefit from something like
https://github.com/twitter/chill in this matter.


On Fri, Jan 3, 2014 at 6:42 PM, Guillaume Pitel
<gu...@exensa.com>wrote:

>  Hi,
>
> After a little bit of thinking, I'm not sure anymore if saveAsObjectFile
> uses the spark.hadoop.*
>
> Also, I did write a mistake. The use of *.mapred.* or *.mapreduce.* does
> not depend on the hadoop version you use, but onthe API version you use
>
> So, I can assure you that if you use the saveAsNewAPIHadoopFile, with the
> spark.hadoop.mapreduce.* properties, the compression will be used.
>
> If you use the saveAsHadoopFile, it should be used with mapred.*
>
> If you use the saveAsObjectFile to a hdfs path, I'm not sure if the output
> is compressed.
>
> Anyway, saveAsObjectFile should be used for small objects, in my opinion.
>
> Guillaume
>
>   Even
>
> someMap.saveAsTextFile("out", classOf[GzipCodec])
>
>  has no effect.
>
>  Also, I notices that saving sequence files has no compression option (my
> original question was about compressing binary output).
>
>  Having said this, I still do not understand why kryo cannot be helpful
> when saving binary output. Binary output uses java serialization, which has
> a pretty hefty overhead.
>
>  How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
>
>
>

Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
Hi,

After a little bit of thinking, I'm no longer sure whether saveAsObjectFile uses
the spark.hadoop.* properties.

Also, I made a mistake. Whether to use *.mapred.* or *.mapreduce.* does not
depend on the Hadoop version you use, but on the API version you use.

So, I can assure you that if you use saveAsNewAPIHadoopFile with the
spark.hadoop.mapreduce.* properties, compression will be used.

If you use saveAsHadoopFile, it should be used with the mapred.* properties.

If you use saveAsObjectFile with an HDFS path, I'm not sure whether the output is
compressed.

Anyway, saveAsObjectFile should be used for small objects, in my opinion.
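
As a naming illustration only, the two families of keys can be written down side by side. This sketch is plain Scala and sets nothing in Spark; CompressionProperties, HadoopApi and forApi are hypothetical names. The key names are the ones quoted in this thread, and whether a given save method actually honours them depends on the method and the Spark version.

```scala
object CompressionProperties {
  sealed trait HadoopApi
  case object OldApi extends HadoopApi // mapred.*, used by saveAsHadoopFile
  case object NewApi extends HadoopApi // mapreduce.*, used by saveAsNewAPIHadoopFile

  // Prefix used in this thread to pass Hadoop properties through Spark.
  val sparkPrefix = "spark.hadoop."

  // Build the compression properties for the chosen API and codec class name.
  def forApi(api: HadoopApi, codec: String): Map[String, String] = {
    val keys = api match {
      case OldApi => Map(
        "mapred.output.compress" -> "true",
        "mapred.output.compression.codec" -> codec,
        "mapred.output.compression.type" -> "BLOCK")
      case NewApi => Map(
        "mapreduce.output.fileoutputformat.compress" -> "true",
        "mapreduce.output.fileoutputformat.compress.codec" -> codec,
        "mapreduce.output.fileoutputformat.compress.type" -> "BLOCK")
    }
    keys.map { case (k, v) => (sparkPrefix + k, v) }
  }

  def main(args: Array[String]): Unit = {
    val gzip = "org.apache.hadoop.io.compress.GzipCodec"
    forApi(OldApi, gzip).foreach(println)
    forApi(NewApi, gzip).foreach(println)
  }
}
```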

Guillaume
> Even 
>
> someMap.saveAsTextFile("out", classOf[GzipCodec])
>
> has no effect.
>
> Also, I notices that saving sequence files has no compression option (my
> original question was about compressing binary output).
>
> Having said this, I still do not understand why kryo cannot be helpful when
> saving binary output. Binary output uses java serialization, which has a
> pretty hefty overhead.
>
> How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?


Re: Turning kryo on does not decrease binary output

Posted by Andrew Ash <an...@andrewash.com>.
Hi Aureliano,

First, check out the documentation the team has written up on using Kryo
here: http://spark.incubator.apache.org/docs/latest/tuning.html
specifically the Data Serialization and Serialized RDD Storage sections.

If you want RDDs to spill over to disk if they don't fit in memory (rather
than be recalculated), then you must use the MEMORY_AND_DISK storage level
-- 
http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence

That documentation focuses on using Kryo for temporary RDD serialization,
though, not so much on storing long-term binary output.  It sounds like you're
going to need to touch the Hadoop APIs a little to get this working.

Check out how the saveAsTextFile(path, codec) method works:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L893

It calls the saveAsHadoopFile method in PairRDDFunctions:
https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L648
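
For reference, a minimal sketch of passing a codec directly to the save methods might look like the following. The output paths, the local master, and the application name are placeholders, and the optional codec argument to saveAsSequenceFile is an assumption about the Spark version in use, so check it against your release.

```scala
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversions for pair RDDs and Writable types

object CompressedOutputSketch {
  def main(args: Array[String]): Unit = {
    // "local" master and the app name are placeholders for this sketch.
    val sc = new SparkContext("local", "compressed-output-sketch")
    val pairs = sc.parallelize(1 to 1000).map(i => (i, i.toDouble))

    // Text output: the codec class is handed on to the underlying Hadoop output format.
    pairs.saveAsTextFile("out-text-gz", classOf[GzipCodec])

    // Sequence-file output: assumed to accept an optional codec in the same way.
    pairs.saveAsSequenceFile("out-seq-gz", Some(classOf[GzipCodec]))

    sc.stop()
  }
}
```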

Hope that helps you down the right path,
Andrew






Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
Even

someMap.saveAsTextFile("out", classOf[GzipCodec])

has no effect.

Also, I noticed that saving sequence files has no compression option (my
original question was about compressing binary output).

Having said this, I still do not understand why kryo cannot be helpful when
saving binary output. Binary output uses java serialization, which has a
pretty hefty overhead.

How can kryo be applied to T when calling RDD[T]#saveAsObjectFile()?
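
To put a rough number on the overhead mentioned above, here is a small sketch that uses only the JDK (no Spark, no Kryo). It compares one tuple written with standard Java serialization against the same four fields written as raw primitives. The object and method names are made up for illustration, and a single object overstates the per-record cost compared with a large batch, since the class descriptors are written once per stream.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream, ObjectOutputStream}

object SerializationOverhead {
  /** Size in bytes of one tuple written with standard Java serialization. */
  def javaSerializedSize(t: (Int, Int, Double, Double)): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(t)
    out.close()
    bytes.size()
  }

  /** Size in bytes of the same four fields written as raw primitives (4 + 4 + 8 + 8). */
  def rawSize(t: (Int, Int, Double, Double)): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(t._1)
    out.writeInt(t._2)
    out.writeDouble(t._3)
    out.writeDouble(t._4)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val t = (1, 2, 3.0, 4.0)
    println(s"java serialization: ${javaSerializedSize(t)} bytes, raw fields: ${rawSize(t)} bytes")
  }
}
```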




Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
That's the right place. Maybe try with the HDP1 properties:

http://stackoverflow.com/questions/17241185/spark-standalone-mode-how-to-compress-spark-output-written-to-hdfs

About your Kryo error: if you want coverage of the Scala types, you can use this:
https://github.com/romix/scala-kryo-serialization
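
If pulling in an extra library is not an option, a hand-written registrator along these lines may be enough for this particular case. This is only a sketch: it reuses the MyRegistrator name from the original post, registers the one class named in the exception, and leaves registration optional (Kryo's default) so that other Scala internals do not trigger the same error.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Note the comma between the two Double fields; the type erases to scala.Tuple4.
    kryo.register(classOf[(Int, Int, Double, Double)])

    // The class named in the exception is a Scala singleton object,
    // so it is looked up here by its JVM class name.
    kryo.register(Class.forName("scala.math.Numeric$IntIsIntegral$"))

    // setRegistrationRequired(true) is deliberately not called: with the default (false),
    // unregistered classes are still serialized, just with their full class name written out.
  }
}
```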

Guillaume
> Thanks for clarifying this.
>
> I tried setting hadoop properties before constructing SparkContext, but it had
> no effect.
>
> Where is the right place to set these properties?
>
>




Re: Turning kryo on does not decrease binary output

Posted by Aureliano Buendia <bu...@gmail.com>.
Thanks for clarifying this.

I tried setting hadoop properties before constructing SparkContext, but it
had no effect.

Where is the right place to set these properties?



Re: Turning kryo on does not decrease binary output

Posted by Guillaume Pitel <gu...@exensa.com>.
Hi,

I believe Kryo is only used during RDD serialization (i.e. communication between
nodes), not for saving. If you want to compress the output, you can use the GZip
or Snappy codec like this:

// Pick one codec:
val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
// val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip

// Set these before constructing the SparkContext.
System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", codec)
System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")

(That's for HDP2; for HDP1, the keys are different.)
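
An alternative to system properties, sketched below, is to set the same keys on a Hadoop Configuration and pass it explicitly to the save call. The output path, the local master, and the choice of TextOutputFormat with IntWritable/Text are placeholders for illustration, and the key names assume the new (mapreduce) API on a Hadoop version that recognizes them.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversion to PairRDDFunctions

object NewApiCompressedSave {
  def main(args: Array[String]): Unit = {
    // "local" master and the app name are placeholders for this sketch.
    val sc = new SparkContext("local", "new-api-compressed-save")

    // Copy the context's Hadoop configuration so the settings apply to this save only.
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.codec",
      "org.apache.hadoop.io.compress.GzipCodec")
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

    val pairs = sc.parallelize(1 to 1000)
      .map(i => (new IntWritable(i), new Text(i.toString)))

    // New-API save: key class, value class, output format, then the explicit configuration.
    pairs.saveAsNewAPIHadoopFile(
      "out-new-api",
      classOf[IntWritable],
      classOf[Text],
      classOf[TextOutputFormat[IntWritable, Text]],
      conf)

    sc.stop()
  }
}
```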
Regards
Guillaume   
> Hi,
>
> I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double, Double)],
> expecting the output binary to be smaller, but it is exactly the same size as
> when kryo is not on.
>
> I've checked the log, and there is no trace of kryo related errors.
>
> The code looks something like:
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.setRegistrationRequired(true)
>     kryo.register(classOf[(Int, Int, Double, Double)])
>   }
> }
> System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>
> At the end, I tried to call:
>
> kryo.setRegistrationRequired(true)
>
> to make sure my class gets registered. But I found errors like:
>
> Exception in thread "DAGScheduler" com.esotericsoftware.kryo.KryoException:
> java.lang.IllegalArgumentException: Class is not registered:
> scala.math.Numeric$IntIsIntegral$
> Note: To register this class use:
> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>
> It appears many scala internal types have to be registered in order to have
> full kryo support.
>
> Any idea why my simple tuple type should not get kryo benefits?
>

