Posted to user@spark.apache.org by buremba <em...@gmail.com> on 2014/04/27 14:40:49 UTC

Any advice for using big spark.cleaner.delay value in Spark Streaming?

It seems the default value for spark.cleaner.delay is 3600 seconds, but I need
to be able to count things on a daily, weekly or even monthly basis.

I suppose the aim of DStream batches and spark.cleaner.delay is to avoid
space issues (running out of memory etc.). I usually use HyperLogLog for
counting unique things to save space, and AFAIK, the other metrics are
simply long values which don't require much space.

When I started learning Spark Streaming it really confused me, because in my
first "Hello World" example all I wanted was to count all events processed by
Spark Streaming. DStream batches are nice, but when I need simple counting
operations it becomes complex. Since Spark Streaming creates new DStreams
for each interval, I needed to merge them into a single DStream, so I used
updateStateByKey() to generate a StateDStream. It seems to work now, but I'm
not sure whether it's efficient, because all I need is a single global
counter, yet now Spark has counters for every 2-second interval plus a global
counter for the StateDStream.
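
Roughly, what I ended up with looks like the sketch below (names are
illustrative, and updateStateByKey also needs a checkpoint directory to be
set):

import org.apache.spark.streaming.StreamingContext._  // pair DStream operations

// events is an assumed DStream of incoming records;
// map every record to a single constant key so there is one global counter
val totalCount = events
  .map(_ => ("total", 1L))
  .updateStateByKey[Long] { (batchCounts: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + batchCounts.sum)
  }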

I don't have any specific purpose like "show me this type of unique things
for the last 10 minutes"; instead I need to be able to count things at a large
scale, whether the period is 10 minutes or 1 month. I create pre-aggregation
rules on the fly, and when all I need is a simple monthly counter, Spark seems
like overkill to me for now.

Do you have any advice on how to use Spark Streaming efficiently?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

Posted by Tathagata Das <ta...@gmail.com>.
Whatever is inside the mapPartitions function gets executed on the workers. If
that function refers to a global variable in the driver, then that
variable gets serialized and sent to the workers as well. So the hll
(defined on line 63) is an empty HyperLogLogMonoid that gets serialized
and sent to the workers to execute the mapPartitions in lines 67-69. In each
batch of data, each user id is converted to an HLL object (using the empty
hll object) and then the HLL objects from all the partitions are reduced
using the "+" in line 69 to a single HLL (i.e. an RDD containing one
element, which is the HLL of the batch).

The subsequent foreachRDD at line 73 gets that HLL from the workers to the
driver and merges it with the running globalHll.
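
In condensed form, the pattern in that example looks roughly like this
(simplified names; I'm assuming the Algebird HyperLogLog API the example uses):

import com.twitter.algebird._

val BIT_SIZE = 12
// created in the driver: an "empty" monoid that knows how to build and merge HLLs
val hll = new HyperLogLogMonoid(BIT_SIZE)
var globalHll = hll.zero

// userIds: DStream[String] -- the stream of user ids
val approxUsers = userIds.mapPartitions { ids =>
  // the hll monoid is serialized with this closure and shipped to the workers
  ids.map(id => hll(id.getBytes))
}.reduce(_ + _)  // one HLL per batch, i.e. an RDD with a single element

approxUsers.foreachRDD { rdd =>
  if (rdd.count() != 0) {
    val batchHll = rdd.first()
    globalHll += batchHll  // merge the batch HLL into the running HLL in the driver
    println("Approx distinct users this batch: " + batchHll.estimatedSize.toInt)
    println("Approx distinct users overall: " + globalHll.estimatedSize.toInt)
  }
}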

I agree, there is probably a better / more intuitive way to write this
example. :)

TD


On Wed, Apr 30, 2014 at 3:45 PM, buremba <em...@gmail.com> wrote:

> Thanks for your reply. Sorry for the late response; I wanted to do some
> tests before writing back.
>
> The counting part works similarly to your advice. I specify a minimum
> interval like 1 minute, and for each hour, day, etc. it sums all counters
> of the corresponding child intervals.
>
> However, when I want to "count unique visitors of the month" things get much
> more complex. I need to merge 30 sets which contain visitor ids, and each
> of them has more than a few hundred thousand elements. Merging sets may
> still be a better option than keeping another set for the last month,
> though I'm not sure, because when there are many intersections it may be
> inefficient.
>
> BTW, I have one more question. The HLL example in the repository seems
> confusing to me. How does Spark handle global variable usage in the
> mapPartitions method?
> (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68)
> I'm also a newbie, but I thought the map and mapPartitions methods are
> similar to Hadoop's map methods, so when we run the example on a cluster,
> how does an external node reach a global variable that lives on a single
> node? Does Spark replicate HyperLogLogMonoid instances across the cluster?
>
> Thanks,
> Burak Emre
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

Posted by buremba <em...@gmail.com>.
Thanks for your reply. Sorry for the late response; I wanted to do some tests
before writing back.

The counting part works similarly to your advice. I specify a minimum interval
like 1 minute, and for each hour, day, etc. it sums all counters of the
corresponding child intervals.

However, when I want to "count unique visitors of the month" things get much
more complex. I need to merge 30 sets which contain visitor ids, and each
of them has more than a few hundred thousand elements. Merging sets may
still be a better option than keeping another set for the last month, though
I'm not sure, because when there are many intersections it may be inefficient.
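
With Algebird, merging per-day HLL sketches instead of raw id sets would look
roughly like this (dailyHlls is an assumed collection of 30 per-day HLLs built
with the same monoid):

import com.twitter.algebird._

// dailyHlls: Seq[HLL] -- assumed: one sketch per day, all with the same bit size
val monoid = new HyperLogLogMonoid(12)
val monthlyHll: HLL = monoid.sum(dailyHlls)          // same as sketching the union
val approxMonthlyUniques = monthlyHll.estimatedSize  // approximate unique visitors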

BTW, I have one more question. The HLL example in the repository seems
confusing to me. How does Spark handle global variable usage in the
mapPartitions method?
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68)
I'm also a newbie, but I thought the map and mapPartitions methods are
similar to Hadoop's map methods, so when we run the example on a cluster, how
does an external node reach a global variable that lives on a single node?
Does Spark replicate HyperLogLogMonoid instances across the cluster?

Thanks,
Burak Emre



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

Posted by Tathagata Das <ta...@gmail.com>.
Hello,

If you want to do aggregations like counts that span across days, weeks or
months, AND do not want the result in real time, then Spark Streaming is
probably not the best thing to use. You should probably store all the data
in a data store (HDFS files or a database) and then use Spark jobs / SQL
queries to do the counting. Spark Streaming is most useful when you want
the processing results based on incoming data streams within seconds of
receiving the data. In case you want to do aggregations across a day's
data and do it in real time and continuously (e.g. every 5 seconds, count the
records received in the last 1 day), then you probably have to do something a
little bit smarter: keep per-10-minute / per-hour counts, which get
continuously combined with the latest partial-hour counts.
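
A rough sketch of that idea (purely illustrative; events and incrementBucket
are assumed names):

import org.apache.spark.streaming.StreamingContext._  // pair DStream operations

// events: DStream[(Long, String)] of (timestampMillis, payload)
val partialCounts = events
  .map { case (ts, _) => (ts / (10 * 60 * 1000), 1L) }  // key by 10-minute bucket
  .reduceByKey(_ + _)

partialCounts.foreachRDD { rdd =>
  rdd.collect().foreach { case (bucket, count) =>
    incrementBucket(bucket, count)  // hypothetical sink, e.g. a database upsert
  }
}

// an hourly / daily / monthly figure is then the sum of the relevant
// 10-minute buckets, including the latest partial one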

And regarding the cleaner setting, it should be set according to the
computation. If you are using window operations that use data from the last
30 minutes, then the cleaner TTL should be more than 30 minutes. The
default of one hour should work fine, unless you need to use data that is
more than an hour old. If that is indeed necessary, consider using the
(almost-to-be-released) Spark 1.0. That eliminates the requirement of
setting the cleaner TTL for Spark Streaming, because Spark core has been made
smart enough to do GC-based cleanup of unused RDDs and shuffle files.
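
For example, assuming the spark.cleaner.ttl property name (the value is in
seconds):

import org.apache.spark.SparkConf

// if a window computation needs up to 6 hours of past data
val conf = new SparkConf()
  .setAppName("streaming-counts")
  .set("spark.cleaner.ttl", (6 * 3600).toString)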

Regarding the second part, I am not sure what you meant by "Spark
Streaming creates new DStreams for each interval". Spark Streaming creates
RDDs in each interval. And if you want to count all records received by
Spark Streaming over time, you can do something like this:


// variable in the driver
var numRecordsReceivedTillNow: Long = 0

yourDStream.foreachRDD { rdd =>
  val numRecordsInBatch = rdd.count
  // increment the counter in the driver with the count of each batch / each RDD
  numRecordsReceivedTillNow += numRecordsInBatch
}

Hope this helps!

TD



On Sun, Apr 27, 2014 at 5:40 AM, buremba <em...@gmail.com> wrote:

> It seems the default value for spark.cleaner.delay is 3600 seconds, but I
> need to be able to count things on a daily, weekly or even monthly basis.
>
> I suppose the aim of DStream batches and spark.cleaner.delay is to avoid
> space issues (running out of memory etc.). I usually use HyperLogLog for
> counting unique things to save space, and AFAIK, the other metrics are
> simply long values which don't require much space.
>
> When I started learning Spark Streaming it really confused me, because in my
> first "Hello World" example all I wanted was to count all events processed
> by Spark Streaming. DStream batches are nice, but when I need simple counting
> operations it becomes complex. Since Spark Streaming creates new DStreams
> for each interval, I needed to merge them into a single DStream, so I used
> updateStateByKey() to generate a StateDStream. It seems to work now, but I'm
> not sure whether it's efficient, because all I need is a single global
> counter, yet now Spark has counters for every 2-second interval plus a
> global counter for the StateDStream.
>
> I don't have any specific purpose like "show me this type of unique things
> for the last 10 minutes"; instead I need to be able to count things at a
> large scale, whether the period is 10 minutes or 1 month. I create
> pre-aggregation rules on the fly, and when all I need is a simple monthly
> counter, Spark seems like overkill to me for now.
>
> Do you have any advice on how to use Spark Streaming efficiently?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>