You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sivakumaran S <si...@me.com> on 2016/06/14 20:29:32 UTC

choice of RDD function

Dear friends,

I have set up Kafka 0.9.0.0, Spark 1.6.1 and Scala 2.10. My source is sending a json string periodically to a topic in kafka. I am able to consume this topic using Spark Streaming and print it. The schema of the source json is as follows: 

{ “id”: 121156, “ht”: 42, “rotor_rpm”: 180, “temp”: 14.2, “time”:146593512}
{ “id”: 121157, “ht”: 18, “rotor_rpm”: 110, “temp”: 12.2, “time”: 146593512}
{ “id”: 121156, “ht”: 36, “rotor_rpm”: 160, “temp”: 14.4, “time”: 146593513}
{ “id”: 121157, “ht”: 19, “rotor_rpm”: 120, “temp”: 12.0, “time”: 146593513}
and so on.


In Spark streaming, I want to find the average of “ht” (height), “rotor_rpm” and “temp” for each “id". I also want to find the max and min of the same fields in the time window (300 seconds in this case). 

Q1.	Can this be done using plain RDD and streaming functions or does it require Dataframes/SQL? There may be more fields added to the json at a later stage. There will be a lot of “id”s at a later stage.

Q2.	If it can be done using either, which one would render to be more efficient and fast?

As of now, the entire set up is in a single laptop. 

Thanks in advance.

Regards,

Siva

Re: choice of RDD function

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

That's one of my concerns with the code. What concerned me the most is
that the RDD(s) were converted to DataFrames only to registerTempTable
and execute SQLs. I think it'd have better performance if DataFrame
operators were used instead. Wish I had numbers.

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 15, 2016 at 11:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
> Doesn't that result in consuming each RDD twice, in order to infer the
> json schema?
>
> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
>> Of course :)
>>
>> object sparkStreaming {
>>   def main(args: Array[String]) {
>>     StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>> levels for streaming if the user has not configured log4j.
>>     val topics = "test"
>>     val brokers = "localhost:9092"
>>     val topicsSet = topics.split(",").toSet
>>     val sparkConf = new
>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>> //spark://localhost:7077
>>     val sc = new SparkContext(sparkConf)
>>     val ssc = new StreamingContext(sc, Seconds(30))
>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>     val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>     val lines = messages.map(_._2)
>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>     lines.foreachRDD( rdd => {
>>       val df = sqlContext.read.json(rdd)
>>       df.registerTempTable(“drone")
>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>     })
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>> I haven’t checked long running performance though.
>>
>> Regards,
>>
>> Siva
>>
>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> Good to hear so! Mind sharing a few snippets of your solution?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>>
>> Thanks Jacek,
>>
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>>
>> Siva
>>
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> mapWithState
>>
>>
>>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Sivakumaran S <si...@me.com>.
Dear Jacek and Cody,


I receive a stream of JSON (exactly this much: 4 json objects) once every 30 seconds from Kafka as follows (I have changed my data source to include more fields)
: 
{"windspeed":4.23,"pressure":1008.39,"location":"Dundee","latitude":56.5,"longitude":-2.96667,"id":2650752,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":4.23,"pressure":1008.39,"location":"Saint Andrews","latitude":56.338711,"longitude":-2.79902,"id":2638864,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":5.53,"pressure":1016.25,"location":"Arbroath","latitude":56.563171,"longitude":-2.58736,"id":2657215,"humidity":96.0,"temp":11.59,"winddirection":9.50031}
{"windspeed":4.73,"pressure":994.0,"location":"Aberdeen","latitude":57.143688,"longitude":-2.09814,"id":2657832,"humidity":1.0,"temp":0.0,"winddirection":357.0}
{"windspeed":6.13,"pressure":994.0,"location":"Peterhead","latitude":57.50584,"longitude":-1.79806,"id":2640351,"humidity":1.0,"temp":0.0,"winddirection":8.50031}

In my Spark app, I have set the batch duration as 60 seconds. Now, as per the 1.6.1 documentation, "Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using SQLContext.read.json() on either an RDD of String, or a JSON file.”. But what both of you pointed out is correct, it consumes the RDD twice, i do not understand why. Below is the snap of the DAG. 

I do not need stateful calculations and I need to write this result to a database at a later stage. Any input to improve this solution is appreciated. 




Regards,

Siva

> On 16-Jun-2016, at 12:48 PM, Sivakumaran S <si...@me.com> wrote:
> 
> Hi Jacek and Cody,
> 
> First of all, thanks for helping me out.
> 
> I started with using combineByKey while testing with just one field. Of course it worked fine, but I was worried that the code would become unreadable if there were many fields. Which is why I shifted to sqlContext because the code is comprehensible. Let me work out the stream statistics and update you in a while. 
> 
> 
> 
> Regards,
> 
> Siva
> 
> 
> 
>> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> Rather
>> 
>> val df = sqlContext.read.json(rdd)
>> 
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <si...@me.com> wrote:
>>> Cody,
>>> 
>>> Are you referring to the  val lines = messages.map(_._2)?
>>> 
>>> Regards,
>>> 
>>> Siva
>>> 
>>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>>> 
>>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>>> json schema?
>>>> 
>>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
>>>>> Of course :)
>>>>> 
>>>>> object sparkStreaming {
>>>>> def main(args: Array[String]) {
>>>>>  StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>>>>> levels for streaming if the user has not configured log4j.
>>>>>  val topics = "test"
>>>>>  val brokers = "localhost:9092"
>>>>>  val topicsSet = topics.split(",").toSet
>>>>>  val sparkConf = new
>>>>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>>>> //spark://localhost:7077
>>>>>  val sc = new SparkContext(sparkConf)
>>>>>  val ssc = new StreamingContext(sc, Seconds(30))
>>>>>  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>>  val messages = KafkaUtils.createDirectStream[String, String,
>>>>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>>>>  val lines = messages.map(_._2)
>>>>>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>  lines.foreachRDD( rdd => {
>>>>>    val df = sqlContext.read.json(rdd)
>>>>>    df.registerTempTable(“drone")
>>>>>    sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>>>>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>>>  })
>>>>>  ssc.start()
>>>>>  ssc.awaitTermination()
>>>>> }
>>>>> }
>>>>> 
>>>>> I haven’t checked long running performance though.
>>>>> 
>>>>> Regards,
>>>>> 
>>>>> Siva
>>>>> 
>>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>>> 
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://medium.com/@jaceklaskowski/
>>>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>> 
>>>>> 
>>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>>>>> 
>>>>> Thanks Jacek,
>>>>> 
>>>>> Job completed!! :) Just used data frames and sql query. Very clean and
>>>>> functional code.
>>>>> 
>>>>> Siva
>>>>> 
>>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>> 
>>>>> mapWithState
>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>> 
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
> 


Re: choice of RDD function

Posted by Sivakumaran S <si...@me.com>.
Hi Jacek and Cody,

First of all, thanks for helping me out.

I started with using combineByKey while testing with just one field. Of course it worked fine, but I was worried that the code would become unreadable if there were many fields. Which is why I shifted to sqlContext because the code is comprehensible. Let me work out the stream statistics and update you in a while. 



Regards,

Siva



> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Rather
> 
> val df = sqlContext.read.json(rdd)
> 
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <si...@me.com> wrote:
>> Cody,
>> 
>> Are you referring to the  val lines = messages.map(_._2)?
>> 
>> Regards,
>> 
>> Siva
>> 
>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>> 
>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>> json schema?
>>> 
>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
>>>> Of course :)
>>>> 
>>>> object sparkStreaming {
>>>> def main(args: Array[String]) {
>>>>   StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>>>> levels for streaming if the user has not configured log4j.
>>>>   val topics = "test"
>>>>   val brokers = "localhost:9092"
>>>>   val topicsSet = topics.split(",").toSet
>>>>   val sparkConf = new
>>>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>>> //spark://localhost:7077
>>>>   val sc = new SparkContext(sparkConf)
>>>>   val ssc = new StreamingContext(sc, Seconds(30))
>>>>   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>   val messages = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>>>   val lines = messages.map(_._2)
>>>>   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>   lines.foreachRDD( rdd => {
>>>>     val df = sqlContext.read.json(rdd)
>>>>     df.registerTempTable(“drone")
>>>>     sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>>>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>>   })
>>>>   ssc.start()
>>>>   ssc.awaitTermination()
>>>> }
>>>> }
>>>> 
>>>> I haven’t checked long running performance though.
>>>> 
>>>> Regards,
>>>> 
>>>> Siva
>>>> 
>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>> 
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> ----
>>>> https://medium.com/@jaceklaskowski/
>>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>> 
>>>> 
>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>>>> 
>>>> Thanks Jacek,
>>>> 
>>>> Job completed!! :) Just used data frames and sql query. Very clean and
>>>> functional code.
>>>> 
>>>> Siva
>>>> 
>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>> 
>>>> mapWithState
>>>> 
>>>> 
>>>> 
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Jacek Laskowski <ja...@japila.pl>.
Rather

val df = sqlContext.read.json(rdd)

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <si...@me.com> wrote:
> Cody,
>
> Are you referring to the  val lines = messages.map(_._2)?
>
> Regards,
>
> Siva
>
>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
>>
>> Doesn't that result in consuming each RDD twice, in order to infer the
>> json schema?
>>
>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
>>> Of course :)
>>>
>>> object sparkStreaming {
>>>  def main(args: Array[String]) {
>>>    StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>>> levels for streaming if the user has not configured log4j.
>>>    val topics = "test"
>>>    val brokers = "localhost:9092"
>>>    val topicsSet = topics.split(",").toSet
>>>    val sparkConf = new
>>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>> //spark://localhost:7077
>>>    val sc = new SparkContext(sparkConf)
>>>    val ssc = new StreamingContext(sc, Seconds(30))
>>>    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>    val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>>    val lines = messages.map(_._2)
>>>    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>    lines.foreachRDD( rdd => {
>>>      val df = sqlContext.read.json(rdd)
>>>      df.registerTempTable(“drone")
>>>      sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>    })
>>>    ssc.start()
>>>    ssc.awaitTermination()
>>>  }
>>> }
>>>
>>> I haven’t checked long running performance though.
>>>
>>> Regards,
>>>
>>> Siva
>>>
>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> Hi,
>>>
>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>>>
>>> Thanks Jacek,
>>>
>>> Job completed!! :) Just used data frames and sql query. Very clean and
>>> functional code.
>>>
>>> Siva
>>>
>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>> mapWithState
>>>
>>>
>>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Sivakumaran S <si...@me.com>.
Cody, 

Are you referring to the  val lines = messages.map(_._2)? 

Regards,

Siva

> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <co...@koeninger.org> wrote:
> 
> Doesn't that result in consuming each RDD twice, in order to infer the
> json schema?
> 
> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
>> Of course :)
>> 
>> object sparkStreaming {
>>  def main(args: Array[String]) {
>>    StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>> levels for streaming if the user has not configured log4j.
>>    val topics = "test"
>>    val brokers = "localhost:9092"
>>    val topicsSet = topics.split(",").toSet
>>    val sparkConf = new
>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>> //spark://localhost:7077
>>    val sc = new SparkContext(sparkConf)
>>    val ssc = new StreamingContext(sc, Seconds(30))
>>    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>    val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>    val lines = messages.map(_._2)
>>    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>    lines.foreachRDD( rdd => {
>>      val df = sqlContext.read.json(rdd)
>>      df.registerTempTable(“drone")
>>      sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>    })
>>    ssc.start()
>>    ssc.awaitTermination()
>>  }
>> }
>> 
>> I haven’t checked long running performance though.
>> 
>> Regards,
>> 
>> Siva
>> 
>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> Hi,
>> 
>> Good to hear so! Mind sharing a few snippets of your solution?
>> 
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>> 
>> Thanks Jacek,
>> 
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>> 
>> Siva
>> 
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> mapWithState
>> 
>> 
>> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Cody Koeninger <co...@koeninger.org>.
Doesn't that result in consuming each RDD twice, in order to infer the
json schema?

On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <si...@me.com> wrote:
> Of course :)
>
> object sparkStreaming {
>   def main(args: Array[String]) {
>     StreamingExamples.setStreamingLogLevels() //Set reasonable logging
> levels for streaming if the user has not configured log4j.
>     val topics = "test"
>     val brokers = "localhost:9092"
>     val topicsSet = topics.split(",").toSet
>     val sparkConf = new
> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
> //spark://localhost:7077
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(30))
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>     val lines = messages.map(_._2)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     lines.foreachRDD( rdd => {
>       val df = sqlContext.read.json(rdd)
>       df.registerTempTable(“drone")
>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> I haven’t checked long running performance though.
>
> Regards,
>
> Siva
>
> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi,
>
> Good to hear so! Mind sharing a few snippets of your solution?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>
> Thanks Jacek,
>
> Job completed!! :) Just used data frames and sql query. Very clean and
> functional code.
>
> Siva
>
> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> mapWithState
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Sivakumaran S <si...@me.com>.
Of course :)

object sparkStreaming {
  def main(args: Array[String]) {
    StreamingExamples.setStreamingLogLevels() //Set reasonable logging levels for streaming if the user has not configured log4j.
    val topics = "test"
    val brokers = "localhost:9092"
    val topicsSet = topics.split(",").toSet
    val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local") //spark://localhost:7077
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(30))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    lines.foreachRDD( rdd => {
      val df = sqlContext.read.json(rdd)
      df.registerTempTable(“drone")
      sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
I haven’t checked long running performance though. 

Regards,

Siva

> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Hi,
> 
> Good to hear so! Mind sharing a few snippets of your solution?
> 
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
>> Thanks Jacek,
>> 
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>> 
>> Siva
>> 
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> mapWithState
>> 
>> 


Re: choice of RDD function

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Good to hear so! Mind sharing a few snippets of your solution?

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <si...@me.com> wrote:
> Thanks Jacek,
>
> Job completed!! :) Just used data frames and sql query. Very clean and
> functional code.
>
> Siva
>
> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> mapWithState
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: choice of RDD function

Posted by Sivakumaran S <si...@me.com>.
Thanks Jacek,

Job completed!! :) Just used data frames and sql query. Very clean and functional code.

Siva

> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> mapWithState


Re: choice of RDD function

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Ad Q1, yes. See stateful operators like mapWithState and windows.

Ad Q2, RDDs should be fine (and available out of the box), but I'd give
Datasets a try too since they're .toDF away.

Jacek
On 14 Jun 2016 10:29 p.m., "Sivakumaran S" <si...@me.com> wrote:

Dear friends,

I have set up Kafka 0.9.0.0, Spark 1.6.1 and Scala 2.10. My source is
sending a json string periodically to a topic in kafka. I am able to
consume this topic using Spark Streaming and print it. The schema of the
source json is as follows:

{ “id”: 121156, “ht”: 42, “rotor_rpm”: 180, “temp”: 14.2, “time”:146593512}
{ “id”: 121157, “ht”: 18, “rotor_rpm”: 110, “temp”: 12.2, “time”: 146593512}
{ “id”: 121156, “ht”: 36, “rotor_rpm”: 160, “temp”: 14.4, “time”: 146593513}
{ “id”: 121157, “ht”: 19, “rotor_rpm”: 120, “temp”: 12.0, “time”: 146593513}
and so on.


In Spark streaming, I want to find the *average* of “ht” (height),
“rotor_rpm” and “temp” for each “id". I also want to find the max and min
of the same fields in the time window (300 seconds in this case).

Q1. Can this be done using plain RDD and streaming functions or does it
require Dataframes/SQL? There may be more fields added to the json at a
later stage. There will be a lot of “id”s at a later stage.

Q2. If it can be done using either, which one would render to be more
efficient and fast?

As of now, the entire set up is in a single laptop.

Thanks in advance.

Regards,

Siva