You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Todd <bi...@163.com> on 2016/05/17 08:36:18 UTC

Does Structured Streaming support count(distinct) over all the streaming data?

Hi,
We have a requirement to do count(distinct) in a processing batch against all the streaming data(eg, last 24 hours' data),that is,when we do count(distinct),we actually want to compute distinct against last 24 hours' data.
Does structured streaming support this scenario?Thanks!

Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Sean Owen <so...@cloudera.com>.
Late to the thread, but, why is counting distinct elements over a
24-hour window not possible? you can certainly do it now, and I'd
presume it's possible with structured streaming with a window.

countByValueAndWindow should do it right? the keys (with non-zero
counts, I suppose) in a window are the distinct values from the stream
in that window. Your example looks right.

On Wed, May 18, 2016 at 12:17 AM, Mich Talebzadeh
<mi...@gmail.com> wrote:
>
> Ok What can be used here below
>
> //val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) -> t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> //countDistinctByValueAndWindow.print()
>
>>> On 17 May 2016 at 20:02, Michael Armbrust <mi...@databricks.com> wrote:
>>>> In 2.0 you won't be able to do this.  The long term vision would be to make this possible, but a window will be required (like the 24 hours you suggest).
>>>>
>>>> On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:
>>>>>
>>>>> Hi,
>>>>> We have a requirement to do count(distinct) in a processing batch against all the streaming data(eg, last 24 hours' data),that is,when we do count(distinct),we actually want to compute distinct against last 24 hours' data.
>>>>> Does structured streaming support this scenario?Thanks!

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re:Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Todd <bi...@163.com>.
Thanks you guys for the help.I will try






At 2016-05-18 07:17:08, "Mich Talebzadeh" <mi...@gmail.com> wrote:

Thanks Chris,


In a nutshell I don't think one can do that.


So let us see.  Here is my program that is looking for share prices > 95.9. It does work. It is pretty simple


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
// Create a local StreamingContext with two working thread and batch interval of n seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_AVG").
             setMaster("local[2]").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(sparkConf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_AVG" )
val topics = Set("newtopic")


val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
DStream.cache()
val lines = DStream.map(_._2)
val price = lines.map(_.split(',').view(2)).map(_.toDouble)

val windowLength = 4
val slidingInterval = 2
val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()

//
//Now how I can get the distinct price values here?
//
//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) -> t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}


Ok What can be used here below


//val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, t2) -> t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()


Let me know your thoughts?


Thanks







Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

you can use HyperLogLog with Spark Streaming to accomplish this.


here is an example from my fluxcapacitor GitHub repo:


https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx


here's an accompanying SlideShare presentation from one of my recent meetups (slides 70-83):


http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037



and a YouTube video for those that prefer video (starting at 32 mins into the video for your convenience):


https://youtu.be/wM9Z0PLx3cw?t=1922




On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mi...@gmail.com> wrote:

Ok but how about something similar to


val countByValueAndWindow = price.filter(_ > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))





Using a new count => countDistinctByValueAndWindow ?


val countDistinctByValueAndWindow = price.filter(_ > 95.0).countDistinctByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))




HTH



Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 



On 17 May 2016 at 20:02, Michael Armbrust <mi...@databricks.com> wrote:

In 2.0 you won't be able to do this.  The long term vision would be to make this possible, but a window will be required (like the 24 hours you suggest).


On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:

Hi,
We have a requirement to do count(distinct) in a processing batch against all the streaming data(eg, last 24 hours' data),that is,when we do count(distinct),we actually want to compute distinct against last 24 hours' data.
Does structured streaming support this scenario?Thanks!









--

Chris Fregly
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com


Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Chris,

In a nutshell I don't think one can do that.

So let us see.  Here is my program that is looking for share prices > 95.9.
It does work. It is pretty simple

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.mutable.ArrayBuffer
//
object CEP_AVG {
  def main(args: Array[String]) {
// Create a local StreamingContext with two working thread and batch
interval of n seconds.
val sparkConf = new SparkConf().
             setAppName("CEP_AVG").
             setMaster("local[2]").
             set("spark.cores.max", "2").
             set("spark.streaming.concurrentJobs", "2").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
  val sc = new SparkContext(sparkConf)
  // Create sqlContext based on HiveContext
  val sqlContext = new HiveContext(sc)

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_AVG" )
val topics = Set("newtopic")

val DStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topics)
DStream.cache()
val lines = DStream.map(_._2)
val price = lines.map(_.split(',').view(2)).map(_.toDouble)

val windowLength = 4
val slidingInterval = 2


*val countByValueAndWindow = price.filter(_ >
95.0).countByValueAndWindow(Seconds(windowLength),
Seconds(slidingInterval))countByValueAndWindow.print()*
//
//Now how I can get the distinct price values here?
//
//val countDistinctByValueAndWindow = price.filter(_ >
0.0).reduceByKey((t1, t2) ->
t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

ssc.start()
ssc.awaitTermination()
//ssc.stop()
  }
}
Ok What can be used here below

//val countDistinctByValueAndWindow = price.filter(_ >
0.0).reduceByKey((t1, t2) ->
t1).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
//countDistinctByValueAndWindow.print()

Let me know your thoughts?

Thanks



Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 17 May 2016 at 23:47, Chris Fregly <ch...@fregly.com> wrote:

> you can use HyperLogLog with Spark Streaming to accomplish this.
>
> here is an example from my fluxcapacitor GitHub repo:
>
>
> https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx
>
> here's an accompanying SlideShare presentation from one of my recent
> meetups (slides 70-83):
>
>
> http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037
>
>
> <http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037>
> and a YouTube video for those that prefer video (starting at 32 mins into
> the video for your convenience):
>
> https://youtu.be/wM9Z0PLx3cw?t=1922
>
>
> On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> Ok but how about something similar to
>>
>> val countByValueAndWindow = price.filter(_ >
>> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>>
>>
>> Using a new count => c*ountDistinctByValueAndWindow ?*
>>
>> val countDistinctByValueAndWindow = price.filter(_ >
>> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
>> Seconds(slidingInterval))
>>
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 May 2016 at 20:02, Michael Armbrust <mi...@databricks.com> wrote:
>>
>>> In 2.0 you won't be able to do this.  The long term vision would be to
>>> make this possible, but a window will be required (like the 24 hours you
>>> suggest).
>>>
>>> On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:
>>>
>>>> Hi,
>>>> We have a requirement to do count(distinct) in a processing batch
>>>> against all the streaming data(eg, last 24 hours' data),that is,when we do
>>>> count(distinct),we actually want to compute distinct against last 24 hours'
>>>> data.
>>>> Does structured streaming support this scenario?Thanks!
>>>>
>>>
>>>
>>
>
> --
> *Chris Fregly*
> Research Scientist @ Flux Capacitor AI
> "Bringing AI Back to the Future!"
> San Francisco, CA
> http://fluxcapacitor.com
>

Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Chris Fregly <ch...@fregly.com>.
you can use HyperLogLog with Spark Streaming to accomplish this.

here is an example from my fluxcapacitor GitHub repo:

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

here's an accompanying SlideShare presentation from one of my recent
meetups (slides 70-83):

http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037

<http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037>
and a YouTube video for those that prefer video (starting at 32 mins into
the video for your convenience):

https://youtu.be/wM9Z0PLx3cw?t=1922


On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com
> wrote:

> Ok but how about something similar to
>
> val countByValueAndWindow = price.filter(_ >
> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>
>
> Using a new count => c*ountDistinctByValueAndWindow ?*
>
> val countDistinctByValueAndWindow = price.filter(_ >
> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
> Seconds(slidingInterval))
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 May 2016 at 20:02, Michael Armbrust <mi...@databricks.com> wrote:
>
>> In 2.0 you won't be able to do this.  The long term vision would be to
>> make this possible, but a window will be required (like the 24 hours you
>> suggest).
>>
>> On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:
>>
>>> Hi,
>>> We have a requirement to do count(distinct) in a processing batch
>>> against all the streaming data(eg, last 24 hours' data),that is,when we do
>>> count(distinct),we actually want to compute distinct against last 24 hours'
>>> data.
>>> Does structured streaming support this scenario?Thanks!
>>>
>>
>>
>

-- 
*Chris Fregly*
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com

Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Ok but how about something similar to

val countByValueAndWindow = price.filter(_ >
95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))


Using a new count => c*ountDistinctByValueAndWindow ?*

val countDistinctByValueAndWindow = price.filter(_ >
95.0).countDistinctByValueAndWindow(Seconds(windowLength),
Seconds(slidingInterval))


HTH

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 17 May 2016 at 20:02, Michael Armbrust <mi...@databricks.com> wrote:

> In 2.0 you won't be able to do this.  The long term vision would be to
> make this possible, but a window will be required (like the 24 hours you
> suggest).
>
> On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:
>
>> Hi,
>> We have a requirement to do count(distinct) in a processing batch against
>> all the streaming data(eg, last 24 hours' data),that is,when we do
>> count(distinct),we actually want to compute distinct against last 24 hours'
>> data.
>> Does structured streaming support this scenario?Thanks!
>>
>
>

Re: Does Structured Streaming support count(distinct) over all the streaming data?

Posted by Michael Armbrust <mi...@databricks.com>.
In 2.0 you won't be able to do this.  The long term vision would be to make
this possible, but a window will be required (like the 24 hours you
suggest).

On Tue, May 17, 2016 at 1:36 AM, Todd <bi...@163.com> wrote:

> Hi,
> We have a requirement to do count(distinct) in a processing batch against
> all the streaming data(eg, last 24 hours' data),that is,when we do
> count(distinct),we actually want to compute distinct against last 24 hours'
> data.
> Does structured streaming support this scenario?Thanks!
>