Posted to user@spark.apache.org by Vijay Innamuri <vi...@gmail.com> on 2015/03/16 06:58:20 UTC

Spark Streaming with compressed xml files

Hi All,

Processing streaming JSON files with Spark (Spark Streaming and
Spark SQL) is very efficient and works like a charm.

Below is the code snippet to process JSON files.

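        // For each windowed batch, load the JSON lines as a table and query it.
        windowDStream.foreachRDD(incomingFiles => {
          val incomingFilesTable = sqlContext.jsonRDD(incomingFiles)
          incomingFilesTable.registerAsTable("IncomingFilesTable")
          val result = sqlContext.sql("select text from IncomingFilesTable").collect
          // Note: collect pulls every row to the driver before it is re-distributed.
          sc.parallelize(result).saveAsTextFile("filepath")
        })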


But I feel it's difficult to use Spark features efficiently with streaming
XML files (each compressed file would be 4 MB).

What is the best approach for processing compressed XML files?

Regards
Vijay

Re: Spark Streaming with compressed xml files

Posted by Tathagata Das <td...@databricks.com>.
That's why the XMLInputFormat suggested by Akhil is a good idea. It should give
you a full XML object as one record (as opposed to an XML record spread
across multiple line records in textFileStream). Then you could convert
each record into JSON, thereby making it a JSON RDD. Then you can save it
as a JSON file and use SQLContext.jsonFile() to read it and process it with
SQL / DataFrames.
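
The "convert each record into JSON" step could look roughly like the sketch below. It is only an illustration in plain Java with no Spark dependency: the class name XmlRecordToJson, the method toJson, and the sample element names (doc, text, lang) are all made up for this example, and it assumes each record is a flat element whose children hold simple text. It emits one JSON object per line, which is the layout jsonRDD / jsonFile expect.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper (not part of Spark or Mahout): flattens one XML record
// into a single-line JSON object, one key per direct child element.
public class XmlRecordToJson {

    public static String toJson(String xmlRecord) {
        try {
            Element root = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xmlRecord)))
                    .getDocumentElement();
            StringBuilder json = new StringBuilder("{");
            NodeList children = root.getChildNodes();
            boolean first = true;
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                // Skip whitespace and other non-element nodes between child elements.
                if (child.getNodeType() != Node.ELEMENT_NODE) {
                    continue;
                }
                if (!first) {
                    json.append(",");
                }
                json.append(quote(child.getNodeName()))
                    .append(":")
                    .append(quote(child.getTextContent()));
                first = false;
            }
            return json.append("}").toString();
        } catch (Exception e) {
            throw new IllegalArgumentException("not a well-formed XML record", e);
        }
    }

    // Minimal JSON string escaping for quotes, backslashes and control characters.
    static String quote(String s) {
        StringBuilder out = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.append('"').toString();
    }

    public static void main(String[] args) {
        // Assumed sample record; real records will have different element names.
        System.out.println(toJson("<doc><text>hello</text><lang>en</lang></doc>"));
    }
}
```

Applied to every record of the stream, a function like this would give an RDD of JSON strings that the existing "select text from ..." query can run against. Nested elements and attributes are not handled here and would need a real XML-to-JSON mapping.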

TD


Re: Spark Streaming with compressed xml files

Posted by Vijay Innamuri <vi...@gmail.com>.
textFileStream and the default fileStream recognize the compressed
XML (.xml.gz) files.

Each line in the XML file is an element in an RDD[String].

Then the whole RDD is converted to properly formatted XML data and stored in a *Scala
variable*.

   - I believe storing huge data in a *Scala variable* is inefficient. Is
   there an alternative way to process XML files?
   - How do I create a Spark SQL table with the above XML data?

Regards
Vijay Innamuri



Re: Spark Streaming with compressed xml files

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
One approach would be: if you are using fileStream, you can access the
individual filenames from the partitions, and with each filename you can
apply your decompression/parsing logic.


Like:

        UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i];
        NewHadoopPartition npp = (NewHadoopPartition) upp.split();
        String fPath = npp.serializableHadoopSplit().value().toString();


Another approach would be to create a custom input reader and InputFormat,
then pass them along with your fileStream; within the reader, you do your
decompression/parsing etc. You can also look into the XMLInputFormat
<https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java>
of Mahout.
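
To make the second approach more concrete, here is a rough sketch of the core logic such a reader would carry: decompress the .gz stream, then emit everything between a start tag and an end tag as one record. It is plain Java, not an actual Hadoop InputFormat/RecordReader. The class name GzipXmlRecords, its methods, and the <doc> tags are assumptions made up for this example. For simplicity it buffers the whole decompressed file in memory, which should be tolerable for files of a few MB, whereas a real reader would scan the stream incrementally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper (not a Hadoop class): splits a gzip-compressed XML
// stream into records delimited by a start tag and an end tag.
public class GzipXmlRecords {

    public static List<String> readRecords(InputStream gzipped, String startTag, String endTag) {
        try (Reader in = new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8)) {
            // Decompress everything first (assumes small files, see the note above).
            StringBuilder text = new StringBuilder();
            char[] buf = new char[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                text.append(buf, 0, n);
            }

            List<String> records = new ArrayList<>();
            int from = 0;
            while (true) {
                int start = text.indexOf(startTag, from);
                if (start < 0) {
                    break;              // no more records
                }
                int end = text.indexOf(endTag, start + startTag.length());
                if (end < 0) {
                    break;              // unterminated record: ignore the tail
                }
                end += endTag.length();
                records.add(text.substring(start, end));
                from = end;
            }
            return records;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Demo helper: gzip a string in memory so the example needs no files.
    public static byte[] gzip(String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        String xml = "<docs>\n<doc>\n  <text>a</text>\n</doc>\n<doc>\n  <text>b</text>\n</doc>\n</docs>";
        List<String> records = readRecords(new ByteArrayInputStream(gzip(xml)), "<doc>", "</doc>");
        for (String record : records) {
            System.out.println(record.replace("\n", " "));
        }
    }
}
```

Each record comes back as one complete element even when it spans several lines, so downstream code no longer has to stitch line records together. Plain tag matching like this does not cope with start tags that carry attributes or with nested elements of the same name.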




Thanks
Best Regards
