You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Vipul Pandey <vi...@gmail.com> on 2014/03/13 01:10:14 UTC

Re: Lzo + Protobuf

Extending this discussion further : 
Anyone able to write out Lzo compressed Protobuf to hdfs (using Elephant Bird - or any other way)? 

I have an RDD that I want written out as it is - but I'm unable to figure out a direct way of doing that. I can convert it to a "PairRDD" or Rdd of "Key" and "Value" instead of just "Value" by force injecting a long and then using the PairRDDFunctions.saveAsNewAPIHadoopFile function 


e.g the RDD I want to write out is 
   
    myRDD : org.apache.spark.rdd.RDD[com.xyz.MyProto] = MappedRDD[14] at map at <console>:42

    val conf = new Job().getConfiguration
    conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")
	//wrap it around with ProtobufWritable 
    val protoToWrite = myRDD.map(x => new ProtobufWritable[MyProto](x,new TypeRef[MyProto](x.getClass){})    
	//now add an extra long to make it a KeyValue pair to be able to use PairRDDFunctions
     protoToWrite.map(x => (1L,x)).saveAsNewAPIHadoopFile("/tmp/vipul/temp/proto",classOf[LongWritable],classOf[BinaryWritable[MyProto]],classOf[LzoProtobufBlockOutputFormat[MyProto]],conf);

As you can see this is just a kluge to get things running. Is there a neater way to write out the original "myRDD" as block compressed lzo?

Thanks,
Vipul



On Jan 29, 2014, at 9:40 AM, Issac Buenrostro <bu...@ooyala.com> wrote:

> Good! I'll keep your experience in mind in case we have problems in the future :)
> 
> 
> On Tue, Jan 28, 2014 at 5:55 PM, Vipul Pandey <vi...@gmail.com> wrote:
> I got this to run, maybe in a tad twisted way. Here is what I did to get to read Lzo compressed Protobufs in spark (I'm on 0.8.0) : 
> 
> - I added hadoop's conf folder to spark classpath (in spark-env.sh) in all the nodes and the shell as well - but that didn't help either. So I just added the property in configuration manually : 
>     val conf = new Job().getConfiguration
>     conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")
>     val logRecord = sc.newAPIHadoopFile(   filepath,classOf[...],classOf[...],classOf[...], conf)
> This seem to resolve the "No codec found" problem below
>  
> - I use twitter's ElephantBird to read lzo compressed protobufs using MultiInputFormat and read the data out as BinaryWritable. The only additional thing I had to do was to set the classConf in MutiInputFormat class. 
> 
> import com.twitter.elephantbird.mapreduce.input.MultiInputFormat
> import com.twitter.elephantbird.mapreduce.io.BinaryWritable
> 
>     MultiInputFormat.setClassConf(classOf[MyProtoClass],conf)
>     val record = sc.newAPIHadoopFile(   inputpath,classOf[MultiInputFormat[MyProtoClass]],classOf[LongWritable],classOf[BinaryWritable[MyProtoClass]], conf)
> 
> //this gets you the protobuf from BinaryWritable - thereafter you just follow your class structure
>     val protobuf = record.map(_._2.get.getProtobuf)  
> 
> 
> Hope this helps whoever is working with lzo compressed protobufs 
> 
> ~Vipul
> 
> 
> 
> 
> On Jan 22, 2014, at 2:09 PM, Vipul Pandey <vi...@gmail.com> wrote:
> 
>> Issac,
>> 
>> I have all these entries in my core-site.xml and as I mentioned before my Pig jobs are running just fine. And the JAVA_LIBRARY_PATH already points to the lzo lib directory. 
>> Not sure what to change/add and where.
>> 
>> Thanks,
>> Vipul
>> 
>> 
>> 
>> On Jan 22, 2014, at 1:37 PM, Issac Buenrostro <bu...@ooyala.com> wrote:
>> 
>>> You need a core-site.xml file in the classpath with these lines
>>> 
>>> <?xml version="1.0"?>
>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>> 
>>> <configuration>
>>> 
>>>   <property>
>>>     <name>io.compression.codecs</name>
>>>     <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
>>>   </property>
>>>   <property>
>>>     <name>io.compression.codec.lzo.class</name>
>>>     <value>com.hadoop.compression.lzo.LzoCodec</value>
>>>   </property>
>>> 
>>> </configuration>
>>> 
>>> 
>>> I also added both the native libraries path and the path to lzoc library to JAVA_LIBRARY_PATH, but I don't know if this is necessary. This is the command I used in mac:
>>> 
>>> export JAVA_LIBRARY_PATH=/Users/*/hadoop-lzo/target/native/Mac_OS_X-x86_64-64/lib:/usr/local/Cellar/lzo/2.06/lib
>>> 
>>> 
>>> On Wed, Jan 22, 2014 at 12:28 PM, Vipul Pandey <vi...@gmail.com> wrote:
>>> 
>>>> Have you tried looking at the HBase and Cassandra examples under the spark example project? These use custom InputFormats and may provide guidance as to how to go about using the relevant Protobuf inputformat.
>>> 
>>> 
>>> Thanks for the pointer Nick, I will look at it once I get past the LZO stage. 
>>> 
>>> 
>>> Issac,
>>> 
>>> How did you get Spark to use the LZO native libraries. I have a fully functional hadoop deployment with pig and scalding crunching the lzo files. But even after adding the lzo library folder to SPARK_CLASSPATH I get the following error : 
>>> 
>>> java.io.IOException: No codec for file hdfs://abc.xxx.com:8020/path/to/lzo/file.lzo found, cannot run
>>> 	at com.twitter.elephantbird.mapreduce.input.LzoRecordReader.initialize(LzoRecordReader.java:80)
>>> 	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:86)
>>> 
>>> 
>>> 
>>> Thanks
>>> Vipul
>>> 
>>> On Jan 21, 2014, at 9:32 AM, Issac Buenrostro <bu...@ooyala.com> wrote:
>>> 
>>>> Hi Vipul,
>>>> 
>>>> I use something like this to read from LZO compressed text files, it may be helpful:
>>>> 
>>>> import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat
>>>> import org.apache.hadoop.io.{LongWritable, Text}
>>>> import org.apache.hadoop.mapreduce.Job
>>>> 
>>>> val sc = new SparkContext(sparkMaster, "lzoreader", sparkDir, List(config.getString("spark.jar")))
>>>> sc.newAPIHadoopFile(logFile,classOf[LzoTextInputFormat],classOf[LongWritable],classOf[Text], new Job().getConfiguration()).map(line => line._2)
>>>> 
>>>> Additionally I had to compile LZO native libraries, so keep that in mind.
>>>> 
>>>> 
>>>> On Tue, Jan 21, 2014 at 6:57 AM, Nick Pentreath <ni...@gmail.com> wrote:
>>>> Hi Vipul
>>>> 
>>>> Have you tried looking at the HBase and Cassandra examples under the spark example project? These use custom InputFormats and may provide guidance as to how to go about using the relevant Protobuf inputformat.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Jan 20, 2014 at 11:48 PM, Vipul Pandey <vi...@gmail.com> wrote:
>>>> Any suggestions, anyone? 
>>>> Core team / contributors / spark-developers - any thoughts?
>>>> 
>>>> On Jan 17, 2014, at 4:45 PM, Vipul Pandey <vi...@gmail.com> wrote:
>>>> 
>>>>> Hi All,
>>>>> 
>>>>> Can someone please share (sample) code to read lzo compressed protobufs from hdfs (using elephant bird)? I'm trying whatever I see in the forum and on the web but it doesn't seem comprehensive to me. 
>>>>> 
>>>>> I'm using Spark0.8.0 . My pig scripts are able to read protobuf just fine so the hadoop layer is setup alright.  It will be really helpful if someone can list out what needs to be done with/in spark. 
>>>>> 
>>>>> ~Vipul
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> --
>>>> Issac Buenrostro
>>>> Software Engineer | 
>>>> buenrostro@ooyala.com | (617) 997-3350
>>>> www.ooyala.com | blog | @ooyala
>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> --
>>> Issac Buenrostro
>>> Software Engineer | 
>>> buenrostro@ooyala.com | (617) 997-3350
>>> www.ooyala.com | blog | @ooyala
>> 
> 
> 
> 
> 
> -- 
> --
> Issac Buenrostro
> Software Engineer | 
> buenrostro@ooyala.com | (617) 997-3350
> www.ooyala.com | blog | @ooyala