Posted to user@spark.apache.org by "Kulkarni, Vikram" <vi...@hp.com> on 2014/04/28 14:46:11 UTC

Java Spark Streaming - SparkFlumeEvent

Hi Spark-users,
  Within my Spark Streaming program, I am able to ingest data sent by my Flume Avro Client. I configured a 'spooling directory source' to write data to a Flume Avro Sink (the Spark Streaming Driver program in this case). The default deserializer, LINE, is used to parse the file into events, so I expect one event (SparkFlumeEvent) for every line in the log file.

My Spark Streaming Code snippet here:

       System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);

       //JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
       JavaDStream<SparkFlumeEvent> flumeStream =
                     FlumeUtils.createStream(ssc, avroServer, avroPort);

       flumeStream.count();

       flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
                     @Override
                     public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
                            List<SparkFlumeEvent> events = eventsData.collect();
                            Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

                            System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
                            while(batchedEvents.hasNext()) {
                                   SparkFlumeEvent flumeEvent = batchedEvents.next();
                                   //System.out.println("SparkFlumeEvent = " + flumeEvent);
                                   //System.out.println(">>>>>>>>" + flumeEvent.toString());

//TODO: How to build each line in the file using this SparkFlumeEvent object?
                            }
                           return null;
                     }
             });

Within this while loop, how do I extract each line that was streamed using the SparkFlumeEvent object? I intend to then parse this line, extract various fields and then persist it to memory.

Regards,
Vikram


RE: Java Spark Streaming - SparkFlumeEvent

Posted by "Kulkarni, Vikram" <vi...@hp.com>.
Thanks Tathagata. Here’s the code snippet:

       // insert the records read in this batch interval into DB
       flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
              @Override
              public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
                     String logRecord = null;
                     List<SparkFlumeEvent> events = eventsData.collect();
                     Iterator<SparkFlumeEvent> batchedEvents = events.iterator();

                     System.out.println(">>>>>> Received Spark Flume Events: " + events.size());

                     long t1 = System.currentTimeMillis();
                     AvroFlumeEvent avroEvent = null;
                     ByteBuffer bytePayload = null;

                     // All the user level data is carried as payload in Flume Event
                     while(batchedEvents.hasNext()) {
                            SparkFlumeEvent flumeEvent = batchedEvents.next();

                            avroEvent = flumeEvent.event();
                            bytePayload = avroEvent.getBody();
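                            // Decode only the readable bytes of the buffer; UTF-8 is assumed for the log file
                            logRecord = new String(bytePayload.array(), bytePayload.arrayOffset() + bytePayload.position(), bytePayload.remaining(), "UTF-8");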

                            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
                            // split() takes a regex, so the pipe must be escaped to match literally
                            String[] fields = logRecord.split("\\|");

                            //TODO: Insert a record of format: [msisdn|ip_addr|start_time|end_time] into DB
                     }
                     System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds");

                     return null;
              }
         });
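
For the remaining TODO, here is a minimal sketch of turning one log line into a typed record, assuming the four-field layout [msisdn|ip_addr|start_time|end_time] from the comment above. SessionRecord and its field names are hypothetical and not part of Spark or Flume, and the sample values in main are made up. Since String.split treats its argument as a regular expression, the sketch quotes the pipe with Pattern.quote so it is matched literally.

```java
import java.util.regex.Pattern;

/** Hypothetical value class for one pipe-delimited log line. */
final class SessionRecord {
    // Pattern.quote turns "|" into a literal match instead of regex alternation.
    private static final Pattern PIPE = Pattern.compile(Pattern.quote("|"));

    final String msisdn;
    final String ipAddr;
    final String startTime;
    final String endTime;

    private SessionRecord(String msisdn, String ipAddr, String startTime, String endTime) {
        this.msisdn = msisdn;
        this.ipAddr = ipAddr;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    /** Parses "msisdn|ip_addr|start_time|end_time"; rejects any other field count. */
    static SessionRecord parse(String line) {
        // A limit of -1 keeps trailing empty fields, so a missing value is not silently dropped.
        String[] fields = PIPE.split(line.trim(), -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 fields but found " + fields.length + ": " + line);
        }
        return new SessionRecord(fields[0], fields[1], fields[2], fields[3]);
    }

    public static void main(String[] args) {
        // Made-up sample line in the assumed format.
        SessionRecord r = SessionRecord.parse("919800000001|10.0.0.1|1398690000|1398690060");
        System.out.println(r.msisdn + " used " + r.ipAddr);
    }
}
```

Inside the while loop, SessionRecord.parse(logRecord) would replace the raw split call, and malformed lines would surface as an IllegalArgumentException rather than as an array of unexpected length.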


Regards,
Vikram

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Tuesday, April 29, 2014 2:41 AM
To: user@spark.apache.org
Subject: Re: Java Spark Streaming - SparkFlumeEvent

You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using SparkFlumeEvent.event. That should probably give you all the original text data.





Re: Java Spark Streaming - SparkFlumeEvent

Posted by Tathagata Das <ta...@gmail.com>.
You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using
SparkFlumeEvent.event. That should probably give you all the original text
data.
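
As a rough illustration of that last step: the body of the AvroFlumeEvent comes back as a java.nio.ByteBuffer, so the text still has to be decoded from bytes. The sketch below assumes the log file is UTF-8 encoded. FlumeBodyDecoder is a hypothetical helper, not part of Spark or Flume, and main uses a hand-built buffer with made-up sample data in place of a real event body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Spark or Flume. */
final class FlumeBodyDecoder {
    private FlumeBodyDecoder() {}

    /**
     * Decodes the readable bytes of an event body as text.
     * UTF-8 is an assumption about how the source log file was written.
     */
    static String decode(ByteBuffer body) {
        // duplicate() shares the bytes but has its own position and limit,
        // so the caller's buffer is not consumed by decoding.
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }

    public static void main(String[] args) {
        // Stand-in for the buffer returned by flumeEvent.event().getBody().
        byte[] bytes = "919800000001|10.0.0.1|1398690000|1398690060"
                .getBytes(StandardCharsets.UTF_8);
        ByteBuffer body = ByteBuffer.wrap(bytes);
        System.out.println(decode(body));
    }
}
```

Decoding from the buffer's position to its limit, rather than from the whole backing array, avoids picking up stray bytes when the buffer is a view onto a larger array, and it also works for buffers that have no accessible backing array.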


