Posted to user@spark.apache.org by Daniel Mahler <dm...@gmail.com> on 2014/11/04 05:04:51 UTC
Cleaning/transforming JSON before converting to SchemaRDD
I am trying to convert terabytes of JSON log files into Parquet files,
but I need to clean them a little first.
I end up doing the following:
val txt = sc.textFile(inpath).coalesce(800)

val json = (for {
  line <- txt
  JObject(child) = parse(line)
  child2 = for {
    JField(name, value) <- child
    _ <- patt(name) // filter fields with invalid names
  } yield JField(name.toLowerCase, value)
} yield compact(render(JObject(child2))))

sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
One glaring inefficiency is that after parsing and cleaning the data I
reserialize it by calling compact(render(JObject(child2))), only to pass
the text to jsonRDD to be parsed again. However, I see no way to turn an
RDD of json4s objects directly into a SchemaRDD without first turning it
back into text.
Is there any way to do this?
I am also open to other suggestions for speeding up the above code;
it is very slow in its current form.
I would also like jsonFile to drop invalid JSON records rather than
fail the entire job. Is that possible?
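For now I could pre-filter unparseable lines myself with scala.util.Try,
something like this sketch (assuming json4s's parse, which throws on
malformed input):

```scala
import scala.util.Try
import org.json4s.native.JsonMethods.parse

val txt = sc.textFile(inpath).coalesce(800)
// Keep only lines that json4s can parse, so jsonRDD never sees
// malformed records.
val parseable = txt.filter(line => Try(parse(line)).isSuccess)
```

But that parses every line twice (once to filter, once again in jsonRDD),
which only compounds the reserialization cost above.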
thanks
Daniel
Re: Cleaning/transforming JSON before converting to SchemaRDD
Posted by Yin Huai <hu...@gmail.com>.
Hi Daniel,
Right now, you need to do the transformation manually. The feature you need
is under development (https://issues.apache.org/jira/browse/SPARK-4190).
Thanks,
Yin
Re: Cleaning/transforming JSON before converting to SchemaRDD
Posted by Gerard Maas <ge...@gmail.com>.
You could transform the json to a case class instead of serializing it back
to a String. The resulting RDD[MyCaseClass] is then directly usable as a
SchemaRDD using the register function implicitly provided by 'import
sqlContext.schemaRDD'. Then the rest of your pipeline will remain the same.
-kr, Gerard
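A minimal sketch of Gerard's suggestion follows. The field names are
illustrative assumptions (the real schema depends on the log format), and
in Spark 1.x the implicit conversion from RDD[Product] to SchemaRDD comes
from `import sqlContext.createSchemaRDD`:

```scala
import scala.util.Try
import org.json4s._
import org.json4s.native.JsonMethods.parse

// Illustrative record type; replace fields with the actual log schema.
case class LogRecord(timestamp: String, level: String, message: String)

implicit val formats = DefaultFormats  // needed by json4s extract

val records = txt.flatMap { line =>
  Try {
    val j = parse(line)
    LogRecord(
      (j \ "timestamp").extract[String],
      (j \ "level").extract[String],
      (j \ "message").extract[String])
  }.toOption  // also silently drops lines that fail to parse or extract
}

import sqlContext.createSchemaRDD  // implicit RDD[Product] => SchemaRDD
records.saveAsParquetFile(outpath) // no re-serialization to JSON text
```

This skips both the render/compact step and the second parse inside
jsonRDD, at the cost of committing to a fixed schema up front.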