Posted to user@spark.apache.org by "Kulkarni, Vikram" <vi...@hp.com> on 2014/04/28 14:46:11 UTC
Java Spark Streaming - SparkFlumeEvent
Hi Spark-users,
Within my Spark Streaming program, I am able to ingest data sent by my Flume Avro Client. I configured a 'spooling directory source' to write data to a Flume Avro Sink (the Spark Streaming driver program in this case). The default deserializer, LINE, parses each file into events, so I expect one event (SparkFlumeEvent) per line of the log file.
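For reference, the agent configuration I'm describing looks roughly like this (the agent name, spool directory, host, and port are placeholders, not my actual values):

# Sketch of the Flume agent described above; names and paths are placeholders.
agent.sources = spoolSrc
agent.channels = memCh
agent.sinks = avroSink

# Spooling directory source; the deserializer defaults to LINE,
# producing one Flume event per line of each spooled file.
agent.sources.spoolSrc.type = spooldir
agent.sources.spoolSrc.spoolDir = /var/log/incoming
agent.sources.spoolSrc.channels = memCh

agent.channels.memCh.type = memory

# Avro sink pointing at the host/port the Spark Streaming receiver listens on
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = <avroServer>
agent.sinks.avroSink.port = 41414
agent.sinks.avroSink.channel = memCh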
My Spark Streaming code snippet:
System.out.println("Setting up Flume Stream using Avro Sink at: " + avroServer + ":" + avroPort);
//JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("XXX.YYY.XXX.YYY", port);
JavaDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, avroServer, avroPort);
flumeStream.count();
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>,Void> () {
@Override
public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
List<SparkFlumeEvent> events = eventsData.collect();
Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
while(batchedEvents.hasNext()) {
SparkFlumeEvent flumeEvent = batchedEvents.next();
//System.out.println("SparkFlumeEvent = " + flumeEvent);
//System.out.println(">>>>>>>>" + flumeEvent.toString());
//TODO: How to build each line in the file using this SparkFlumeEvent object?
}
return null;
}
});
Within this while loop, how do I extract each streamed line from the SparkFlumeEvent object? I intend to parse each line, extract the various fields, and then persist them to memory.
Regards,
Vikram
RE: Java Spark Streaming - SparkFlumeEvent
Posted by "Kulkarni, Vikram" <vi...@hp.com>.
Thanks Tathagata. Here’s the code snippet:
// insert the records read in this batch interval into DB
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        String logRecord = null;
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        System.out.println(">>>>>> Received Spark Flume Events: " + events.size());
        long t1 = System.currentTimeMillis();
        AvroFlumeEvent avroEvent = null;
        ByteBuffer bytePayload = null;
        // All the user-level data is carried as the payload of the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            avroEvent = flumeEvent.event();
            bytePayload = avroEvent.getBody();
            // Copy only the readable bytes: ByteBuffer.array() can expose
            // data outside the buffer's position/limit window.
            byte[] body = new byte[bytePayload.remaining()];
            bytePayload.duplicate().get(body);
            logRecord = new String(body, StandardCharsets.UTF_8);
            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
            // '|' is a regex metacharacter and must be escaped;
            // split("|") would split between every character.
            String[] fields = logRecord.split("\\|");
            //TODO: Insert a record of format: [msisdn|ip_addr|start_time|end_time] into DB
        }
        System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1) / 1000 + " seconds");
        return null;
    }
});
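For the TODO above, a minimal sketch of the insert step over plain JDBC; the table name session_records and its columns are assumptions for illustration, not anything decided yet:

import java.sql.Connection;
import java.sql.PreparedStatement;

// Hypothetical helper: inserts one parsed record. Table and column
// names are assumptions; fields[] comes from splitting the log record.
static void insertRecord(Connection conn, String[] fields) throws Exception {
    String sql = "INSERT INTO session_records (msisdn, ip_addr, start_time, end_time) "
               + "VALUES (?, ?, ?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, fields[0]);  // msisdn
        ps.setString(2, fields[1]);  // ip_addr
        ps.setString(3, fields[2]);  // start_time
        ps.setString(4, fields[3]);  // end_time
        ps.executeUpdate();
    }
}

The Connection would be opened once per batch (outside the while loop) and reused, or the inserts batched, to keep the per-record cost down.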
Regards,
Vikram
Re: Java Spark Streaming - SparkFlumeEvent
Posted by Tathagata Das <ta...@gmail.com>.
You can get the internal AvroFlumeEvent inside the SparkFlumeEvent using SparkFlumeEvent.event. That should probably give you all the original text data.
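Concretely, a minimal sketch of that extraction, assuming the event body is UTF-8 text:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

// Recover the original text line carried in a SparkFlumeEvent's payload.
static String toLine(SparkFlumeEvent sparkFlumeEvent) {
    AvroFlumeEvent avroEvent = sparkFlumeEvent.event();
    ByteBuffer body = avroEvent.getBody();
    byte[] bytes = new byte[body.remaining()];
    body.duplicate().get(bytes);  // copy without moving the buffer's position
    return new String(bytes, StandardCharsets.UTF_8);
}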