Posted to user@spark.apache.org by Ashok Kumar <as...@yahoo.com.INVALID> on 2016/09/06 20:31:08 UTC
Getting figures from spark streaming
Hello Gurus,
I am creating some figures, feeding them into Kafka, and then consuming them with Spark Streaming.
It works OK but I have the following issue.
For now, as a test, I send 5 prices in each batch interval. In the loop code this is what is happening:
dstream.foreachRDD { rdd =>
  val x = rdd.count
  i += 1
  println(s"====> rdd loop i is ${i}, number of lines is ${x} <======")
  if (x > 0) {
    println(s"================processing ${x} records=================")
    var words1 = rdd.map(_._2).map(_.split(',').view(0)).map(_.toInt).collect.apply(0)
    println(words1)
    var words2 = rdd.map(_._2).map(_.split(',').view(1)).map(_.toString).collect.apply(0)
    println(words2)
    var price = rdd.map(_._2).map(_.split(',').view(2)).map(_.toFloat).collect.apply(0)
    println(price)
    rdd.collect.foreach(println)
  }
}
My tuple looks like this
// (null, "ID TIMESTAMP PRICE")
// (null, "40,20160426-080924, 67.55738301621814598514")
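Each value is a single comma-separated string, so splitting it on ',' should give the three columns. For example, a quick sketch using the sample value above:

val value = "40,20160426-080924, 67.55738301621814598514"
val cols  = value.split(',')       // Array("40", "20160426-080924", " 67.55738301621814598514")
val id    = cols(0).trim.toInt     // 40
val ts    = cols(1).trim           // "20160426-080924"
val price = cols(2).trim.toFloat   // the price, truncated to Float precision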
And this is the sample output from the run:
================processing 5 records=================
3
20160906-212509
80.224686
(null,3,20160906-212509,80.22468448052631637099)
(null,1,20160906-212509,60.40695324215582386153)
(null,4,20160906-212509,61.95159400693415572125)
(null,2,20160906-212509,93.05912099305473237788)
(null,5,20160906-212509,81.08637370113427387121)
Now it does process the first record's values, 3, 20160906-212509, 80.224686, for record (null,3,20160906-212509,80.22468448052631637099), but it ignores the remaining 4 records. How can I make it go through all records here? I want the third column from all records!
Greetings
Re: Getting figures from spark streaming
Posted by Thunder Stumpges <th...@gmail.com>.
Just a guess, but doesn't the `.apply(0)` at the end of each of your print
statements take just the first one of the returned list?
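If that is the issue, something along these lines might work. This is an untested sketch that parses each value once and keeps every record, assuming the values are the comma-separated "id,timestamp,price" strings from your example:

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // split each value once into (id, timestamp, price)
    val parsed = rdd.map(_._2).map { line =>
      val cols = line.split(',')
      (cols(0).trim.toInt, cols(1).trim, cols(2).trim.toFloat)
    }
    // collect() brings the whole batch to the driver, which is fine for a small test
    parsed.collect().foreach { case (id, ts, price) =>
      println(s"$id $ts $price")
    }
    // just the third column (the price) from all records
    println(parsed.map(_._3).collect().mkString(", "))
  }
}

That way you collect the parsed RDD once instead of mapping and collecting it three separate times and indexing into the result with .apply(0).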
On Wed, Sep 7, 2016 at 12:36 AM Ashok Kumar <as...@yahoo.com.invalid>
wrote:
> Any help on this warmly appreciated.
Re: Getting figures from spark streaming
Posted by Ashok Kumar <as...@yahoo.com.INVALID>.
Any help on this warmly appreciated.