Posted to user@spark.apache.org by Daniel Mahler <dm...@gmail.com> on 2014/11/04 05:04:51 UTC

Cleaning/transforming JSON before converting to SchemaRDD

I am trying to convert terabytes of JSON log files into Parquet files,
but I need to clean them a little first.
I end up doing the following:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val txt = sc.textFile(inpath).coalesce(800)

val json = for {
  line <- txt
  JObject(child) = parse(line)
  child2 = for {
    JField(name, value) <- child
    _ <- patt(name) // filter fields with invalid names
  } yield JField(name.toLowerCase, value)
} yield compact(render(JObject(child2)))

sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
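For readers unfamiliar with the `_ <- patt(name)` idiom: a function returning Option[String] (e.g. built on Regex.findFirstIn) acts as a filter inside a for-comprehension. The pattern and sample data below are hypothetical, a minimal pure-Scala sketch of that one line:

```scala
import scala.util.matching.Regex

// Hypothetical name-validity pattern; findFirstIn returns Option[String],
// so a non-matching name yields None and the field is dropped.
val valid: Regex = "^[A-Za-z_][A-Za-z0-9_]*$".r
def patt(name: String): Option[String] = valid.findFirstIn(name)

val fields = List("UserId" -> "42", "bad-name!" -> "x", "Event" -> "click")

val cleaned = for {
  (name, value) <- fields
  _ <- patt(name) // drops "bad-name!"
} yield (name.toLowerCase, value)
// cleaned == List(("userid", "42"), ("event", "click"))
```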

One glaring inefficiency is that after parsing and cleaning the data I
reserialize it by calling compact(render(JObject(child2))), only to pass
the text to jsonRDD to be parsed again. However, I see no way to turn an
RDD of json4s objects directly into a SchemaRDD without turning it back
into text first.

Is there any way to do this?

I am also open to other suggestions for speeding up the above code;
it is very slow in its current form.

I would also like to make jsonFile drop invalid json records rather than
failing the entire job. Is that possible?
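One workaround for the invalid-record problem, sketched below under the assumption that the parser throws on malformed input: wrap the parse in Try and flatMap the failures away. `keepParseable` and `parseLine` are illustrative names, not part of any Spark API; with an RDD the same flatMap applies, e.g. txt.flatMap(l => Try(parse(l)).toOption).

```scala
import scala.util.Try

// Keep only the lines that parse; Try catches the exception a bad
// record would otherwise raise, and toOption turns it into None,
// which flatMap discards.
def keepParseable[A](lines: Seq[String], parseLine: String => A): Seq[A] =
  lines.flatMap(line => Try(parseLine(line)).toOption)

// Illustrated with toInt standing in for a JSON parse:
val parsed = keepParseable(Seq("1", "oops", "3"), _.toInt)
// parsed == Seq(1, 3)
```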

thanks
Daniel

Re: Cleaning/transforming JSON before converting to SchemaRDD

Posted by Yin Huai <hu...@gmail.com>.
Hi Daniel,

Right now, you need to do the transformation manually. The feature you need
is under development (https://issues.apache.org/jira/browse/SPARK-4190).

Thanks,

Yin


Re: Cleaning/transforming JSON before converting to SchemaRDD

Posted by Gerard Maas <ge...@gmail.com>.
You could transform the json to a case class instead of serializing it back
to a String. The resulting RDD[MyCaseClass] is then directly usable as a
SchemaRDD using the register function implicitly provided by 'import
sqlContext.schemaRDD'. Then the rest of your pipeline will remain the same.
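A minimal sketch of that suggestion; `LogRecord`, `toRecord`, and the field names are assumptions about the data, not part of the thread:

```scala
// Hypothetical case class capturing the cleaned record schema.
case class LogRecord(userid: String, event: String)

// Build a record from cleaned (name -> value) fields; None if a
// required field is missing, so bad records can be flatMapped away.
def toRecord(fields: Map[String, String]): Option[LogRecord] =
  for {
    u <- fields.get("userid")
    e <- fields.get("event")
  } yield LogRecord(u, e)

// With Spark, an RDD[LogRecord] built this way is then usable as a
// SchemaRDD via the implicit conversion (createSchemaRDD in Spark 1.x),
// and the rest of the pipeline stays the same:
//   records.saveAsParquetFile(outpath)

val r = toRecord(Map("userid" -> "42", "event" -> "click"))
// r == Some(LogRecord("42", "click"))
```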

-kr, Gerard