Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/05/07 15:09:43 UTC

Finding max value in spark streaming sliding window

Hi,

What is the easiest way of finding max(price) in the code below?

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CEP_AVG {
  def main(args: Array[String]) {
    // Create a local StreamingContext with a batch interval of 2 seconds.
    val sparkConf = new SparkConf().
      setAppName("CEP_AVG").
      setMaster("local[12]").
      set("spark.cores.max", "2").
      set("spark.streaming.concurrentJobs", "2").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "CEP_AVG")
    val topics = Set("newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String,
      StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    dstream.cache()
    val lines = dstream.map(_._2)
    // Take the third comma-separated field as the price
    val price = lines.map(_.split(',')(2))

    val windowLength = 4
    val slidingInterval = 2

    //
    // Here I can pick up all prices > 99.8, but I also want max(price)
    // within the sliding window as well.
    //

    // Note: price is still a String here, so this comparison is lexicographic
    val countByValueAndWindow = price.filter(_ > "99.8").
      countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
  }
}
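
One direction I have been looking at is reduceByWindow over the same
window. This is only a minimal sketch, assuming the third CSV field
always parses cleanly as a Double (the priceAsDouble and maxPrice names
are just placeholders of mine):

    // Parse the price field as Double so the max is compared numerically
    val priceAsDouble = lines.map(_.split(',')(2).toDouble)
    // Reduce each window down to its maximum value
    val maxPrice = priceAsDouble.reduceByWindow(
      (a, b) => math.max(a, b),   // pairwise max as the reduce function
      Seconds(windowLength),      // window length (multiple of the 2s batch interval)
      Seconds(slidingInterval))   // slide interval
    maxPrice.print()

If that is the right direction, it would print one max per slide, but I
am not sure it is the easiest way.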

Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com