Posted to dev@spark.apache.org by "Sundaram, Muthu X." <Mu...@sabre.com> on 2014/07/22 17:24:11 UTC

Transforming Flume events using Spark transformation functions

Hi All,
  I am receiving events from Flume using the following line:

  JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

Each event is a delimited record. I would like to use transformation functions such as map and reduce on this stream. Do I need to convert the JavaDStream<SparkFlumeEvent> to a JavaDStream<String>, or can I apply these functions directly to it?

I need to do the following kind of operations on records like these:

XXXX                     AA
YYYYY                    Delta
TTTTT                    AA
CCCC                     Southwest
XXXX                     AA

Unique tickets are XXXX, YYYYY, TTTTT, CCCC.
Count is XXXX 2, YYYYY 1, TTTTT 1 and so on...
AA - 2 tickets (duplicates should not be counted), Delta - 1 ticket, Southwest - 1 ticket.
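
A minimal sketch of the results described above (unique tickets, records per ticket, distinct tickets per airline), written with plain Java collections on an in-memory list rather than with Spark. The class name TicketStats, its method names, and the whitespace delimiter are assumptions made for illustration only. In Spark the same shape is typically expressed as a pair mapping followed by reduceByKey, with distinct() for de-duplication.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Spark or Flume.
final class TicketStats {

    // Splits "XXXX      AA" into {"XXXX", "AA"}; assumes whitespace-delimited records.
    static String[] parse(String record) {
        return record.trim().split("\\s+", 2);
    }

    // Number of records seen for each ticket (duplicates are counted).
    static Map<String, Integer> countPerTicket(List<String> records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String record : records) {
            counts.merge(parse(record)[0], 1, Integer::sum);
        }
        return counts;
    }

    // Number of distinct tickets for each airline (duplicates are not counted).
    static Map<String, Integer> distinctTicketsPerAirline(List<String> records) {
        Map<String, Set<String>> ticketsByAirline = new LinkedHashMap<>();
        for (String record : records) {
            String[] fields = parse(record);
            ticketsByAirline
                .computeIfAbsent(fields[1], k -> new LinkedHashSet<>())
                .add(fields[0]);
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> e : ticketsByAirline.entrySet()) {
            result.put(e.getKey(), e.getValue().size());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList(
            "XXXX                     AA",
            "YYYYY                    Delta",
            "TTTTT                    AA",
            "CCCC                     Southwest",
            "XXXX                     AA");

        Map<String, Integer> perTicket = countPerTicket(records);
        System.out.println(perTicket.keySet());                 // [XXXX, YYYYY, TTTTT, CCCC]
        System.out.println(perTicket);                          // {XXXX=2, YYYYY=1, TTTTT=1, CCCC=1}
        System.out.println(distinctTicketsPerAirline(records)); // {AA=2, Delta=1, Southwest=1}
    }
}
```

The key set of the per-ticket counts doubles as the list of unique tickets, so no separate pass is needed for it.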

I have to do transformations like this. Right now I am able to receive records, but I am struggling to transform them using Spark transformation functions since they are not of type JavaRDD<String>.

Can I create a new JavaRDD<String>? How do I create a new JavaRDD?

I loop through the events as shown below:

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        // collect() brings every event of the batch back to the driver
        List<SparkFlumeEvent> events = eventsData.collect();
        Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
        // All the user-level data is carried as payload in the Flume event
        while (batchedEvents.hasNext()) {
            SparkFlumeEvent flumeEvent = batchedEvents.next();
            AvroFlumeEvent avroEvent = flumeEvent.event();
            ByteBuffer bytePayload = avroEvent.getBody();
            String logRecord = new String(bytePayload.array());

            System.out.println(">>>>>>>>LOG RECORD = " + logRecord);
        }
        return null;  // a Function<..., Void> still has to return a value
    }
});

Where do I create the new JavaRDD<String>? Do I do it before this loop? How do I create this JavaRDD<String>?
In the loop I am able to get every record and print it.
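
A side note on the payload decoding used in the loop: ByteBuffer.array() returns the whole backing array, ignoring the buffer's position and limit, and new String(byte[]) uses the platform default charset. The sketch below shows one way to decode only the readable bytes. The class name PayloadDecoder and the choice of UTF-8 are assumptions for illustration; whether the Flume body buffer actually carries an offset depends on how it was produced.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spark or Flume.
final class PayloadDecoder {

    // Decodes only the bytes between position and limit.
    // Works on a duplicate so the caller's buffer position is left unchanged.
    static String decode(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8); // UTF-8 is an assumption
    }

    public static void main(String[] args) {
        byte[] backing = "HEADERXXXX AA".getBytes(StandardCharsets.UTF_8);
        // Expose only the 7 bytes of "XXXX AA", starting at offset 6.
        ByteBuffer payload = ByteBuffer.wrap(backing, 6, 7);

        System.out.println(decode(payload));                                      // XXXX AA
        System.out.println(new String(payload.array(), StandardCharsets.UTF_8)); // HEADERXXXX AA
    }
}
```

When the buffer covers its whole backing array, both approaches give the same string; they differ only when the buffer is a window into a larger array.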

I appreciate any help here.

Thanks,
Muthu



Re: Transforming Flume events using Spark transformation functions

Posted by Tathagata Das <ta...@gmail.com>.
This is because of the RDD's lazy evaluation! Unless you force a
transformed (mapped/filtered/etc.) RDD to give you back some data (like
RDD.count()) or output the data (like RDD.saveAsTextFile()), Spark will not
do anything.

So after the eventsData.map(...), if you do take(10) and then print the
result, you should see up to 10 items from each batch printed.

You can also do the same map operation on the DStream itself. FYI:

inputDStream.map(...).foreachRDD(...)     is equivalent to
 inputDStream.foreachRDD(     // call rdd.map(...) )

Either way, you have to call some RDD "action" (count, collect, take,
saveAsHadoopFile, etc.) that asks the system to do something concrete with
the data.

TD
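
The laziness described in this reply can be illustrated without a cluster. The sketch below uses java.util.stream as a stand-in and is not Spark code: a stream's map() plays the role of an RDD transformation and collect() plays the role of an action. The class name LazyDemo and the call counter are assumptions added for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: Java streams, like RDD transformations, are evaluated lazily.
final class LazyDemo {

    // Builds a mapped pipeline; the lambda bumps the counter each time it really runs.
    static Stream<String> mapped(List<String> records, AtomicInteger calls) {
        return records.stream().map(r -> {
            calls.incrementAndGet();
            return r.toUpperCase();
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<String> records = Arrays.asList("xxxx aa", "yyyyy delta");

        // Declaring the map does not execute it.
        Stream<String> pipeline = mapped(records, calls);
        System.out.println("calls after map: " + calls.get());      // calls after map: 0

        // The terminal operation forces evaluation, like an RDD action.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("calls after collect: " + calls.get());  // calls after collect: 2
        System.out.println(result);                                 // [XXXX AA, YYYYY DELTA]
    }
}
```

The print statement inside a map function behaves the same way: it appears only once something downstream demands the mapped data.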




On Tue, Jul 22, 2014 at 1:55 PM, Sundaram, Muthu X. <
Muthu.X.Sundaram.ctr@sabre.com> wrote:

> I tried to map the SparkFlumeEvents to an RDD of Strings like below, but the
> map and its call() are not executed at all. I might be doing this the wrong
> way. Any help would be appreciated.
>
> flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
>     @Override
>     public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
>         System.out.println("<<<<<<Inside for each...call>>>>");
>
>         JavaRDD<String> records = eventsData.map(
>             new Function<SparkFlumeEvent, String>() {
>                 @Override
>                 public String call(SparkFlumeEvent flume) throws Exception {
>                     System.out.println("<<<<<<Inside Map..call>>>>");
>                     AvroFlumeEvent avroEvent = flume.event();
>                     ByteBuffer bytePayload = avroEvent.getBody();
>                     String logRecord = new String(bytePayload.array());
>
>                     System.out.println("<<<<Record is" + logRecord);
>                     return logRecord;
>                 }
>             });
>         return null;
>     }
> });
>


RE: Transforming Flume events using Spark transformation functions

Posted by "Sundaram, Muthu X." <Mu...@sabre.com>.
I tried to map the SparkFlumeEvents to an RDD of Strings like below, but the map and its call() are not executed at all. I might be doing this the wrong way. Any help would be appreciated.

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
    @Override
    public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
        System.out.println("<<<<<<Inside for each...call>>>>");

        JavaRDD<String> records = eventsData.map(
            new Function<SparkFlumeEvent, String>() {
                @Override
                public String call(SparkFlumeEvent flume) throws Exception {
                    System.out.println("<<<<<<Inside Map..call>>>>");
                    AvroFlumeEvent avroEvent = flume.event();
                    ByteBuffer bytePayload = avroEvent.getBody();
                    String logRecord = new String(bytePayload.array());

                    System.out.println("<<<<Record is" + logRecord);
                    return logRecord;
                }
            });
        return null;
    }
});



