You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by xargsgrep <ah...@gmail.com> on 2014/04/17 22:24:52 UTC

Valid spark streaming use case?

Hi, I'm completely new to Spark streaming (and Spark) and have been reading
up on it and trying out various examples the past few days. I have a
particular use case which I think it would work well for, but I wanted to
put it out there and get some feedback on whether or not it actually would.
The use case is:

We have web tracking data continuously coming in from a pool of web servers.
For simplicity, let's just say the data is text lines with a known set of
fields, eg: "timestamp userId domain ...". What I want to do is:
1. group this continuous stream of data by "userId:domain", and
2. when the latest timestamp in each group is older than a certain
threshold, persist the results to a DB

#1 is straightforward and there are plenty of examples showing how to do it.
However, I'm not sure how I would go about doing #2, or if that's something
I can even do with spark because as far as I can tell it operates on sliding
windows. I really just want to continue to accumulate these groups of
"userId:domain" for all time (without specifying a window) and then roll
them up and flush them once no new data has come in for a group after a
certain amount of time. Would the updateStateByKey function allow me to do
this somehow?

Any help would be appreciated.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Valid spark streaming use case?

Posted by xargsgrep <ah...@gmail.com>.
Great, this should give me enough to go on. Appreciate the help!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410p4507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Valid spark streaming use case?

Posted by Tathagata Das <ta...@gmail.com>.
Regarding memory usage, you can configure Spark's memory fraction such that
persisted state RDDs fall out to disk and does crash the JVM. Also, the
state RDDs are periodically checkpointed HDFS for better recoverability.

But this seems like a pretty involved usecase that needs keeping around all
the records seen in the last 30 minutes (until flushed) within the state.
The updateFunction is run against all the key, even if there are no new
values for that key. So you can flush out values for keys that have not
been updated for 30 minutes. Also there are alternative versions of
updateStateByKey, that allows you do this more efficiently (e.g., process
whole partition at a time, and flush all old values together). Take a look
at those.

TD



On Fri, Apr 18, 2014 at 8:49 AM, xargsgrep <ah...@gmail.com> wrote:

> Thanks, I played around with that example and had some followup questions.
>
> 1. The only way I was able to accumulate data per-key was to actually store
> all the data in the state, not just the timestamp (see example below).
> Otherwise I don't have access to data older than the batchDuration of the
> StreamingContext. This creates a concern about memory usage and what would
> happen if a node crashes, and how partitioning and replication work across
> spark nodes. Does it store the state on disk ever? Again, what I want to do
> is aggregate these sets of text lines by key and when a key has been
> "inactive" (ie, no new data has been received for that key) for a certain
> amount of time (eg 30 minutes), then finally save them somewhere and remove
> them from the state.
>
> 2. Is the update function called only for keys that are in the current
> batchDuration's stream or for all keys that exist? If it's the former, how
> can I check the timestamp of keys from an older batch that never appear in
> the stream again?
>
> Example:
> batchDuration = 10 minutes
> timestampThreshold = 30 minutes
>
> data:
> 09:00:00 user-one foo.com
> 09:09:00 user-one foo.com
> 09:15:00 user-two bar.com
> 09:18:00 user-one foo.com
> 09:25:00 user-two bar.com
>
> So given this set of data there are 3 batches and the state would look
> something like:
> batch1: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com") }
> batch2: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
> ("09:15:00 user-two bar.com") }
> batch3: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
> user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
> ("09:15:00 user-two bar.com", "09:25:00 user-two bar.com") }
>
> Now let's assume no more data comes in for either of the two keys above.
> Since the latest timestamp threshold is 30 minutes, "user-one:foo.com"
> should be flushed after 9:48 and "user-two:bar.com" should be flushed
> after
> 9:55.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410p4455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Re: Valid spark streaming use case?

Posted by xargsgrep <ah...@gmail.com>.
Thanks, I played around with that example and had some followup questions.

1. The only way I was able to accumulate data per-key was to actually store
all the data in the state, not just the timestamp (see example below).
Otherwise I don't have access to data older than the batchDuration of the
StreamingContext. This creates a concern about memory usage and what would
happen if a node crashes, and how partitioning and replication work across
spark nodes. Does it store the state on disk ever? Again, what I want to do
is aggregate these sets of text lines by key and when a key has been
"inactive" (ie, no new data has been received for that key) for a certain
amount of time (eg 30 minutes), then finally save them somewhere and remove
them from the state.

2. Is the update function called only for keys that are in the current
batchDuration's stream or for all keys that exist? If it's the former, how
can I check the timestamp of keys from an older batch that never appear in
the stream again?

Example:
batchDuration = 10 minutes
timestampThreshold = 30 minutes

data:
09:00:00 user-one foo.com
09:09:00 user-one foo.com
09:15:00 user-two bar.com
09:18:00 user-one foo.com
09:25:00 user-two bar.com

So given this set of data there are 3 batches and the state would look
something like:
batch1: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
user-one foo.com") }
batch2: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
("09:15:00 user-two bar.com") }
batch3: { "user-one:foo.com" : ("09:00:00 user-one foo.com", "09:09:00
user-one foo.com", "09:18:00 user-one foo.com"), "user-two:bar.com" :
("09:15:00 user-two bar.com", "09:25:00 user-two bar.com") }

Now let's assume no more data comes in for either of the two keys above.
Since the latest timestamp threshold is 30 minutes, "user-one:foo.com"
should be flushed after 9:48 and "user-two:bar.com" should be flushed after
9:55.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410p4455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Valid spark streaming use case?

Posted by Tathagata Das <ta...@gmail.com>.
This is a good usecase for using DStream.updateStateByKey! This allows you
to maintain arbitrary per-key state. Checkout this example.
https://github.com/tdas/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Also take a look at the documentation for more information:

This example just keeps the an integer (the running count) as the state,
but in practice it can be anything. In your case, you can keep that
per-group (i.e. per-key) threshold as the state. In the update function,
for each record of a group, you can check the timestamp against the
threshold and push stuff to DB and maybe update the threshold.

TD



On Thu, Apr 17, 2014 at 1:24 PM, xargsgrep <ah...@gmail.com> wrote:

> Hi, I'm completely new to Spark streaming (and Spark) and have been reading
> up on it and trying out various examples the past few days. I have a
> particular use case which I think it would work well for, but I wanted to
> put it out there and get some feedback on whether or not it actually would.
> The use case is:
>
> We have web tracking data continuously coming in from a pool of web
> servers.
> For simplicity, let's just say the data is text lines with a known set of
> fields, eg: "timestamp userId domain ...". What I want to do is:
> 1. group this continuous stream of data by "userId:domain", and
> 2. when the latest timestamp in each group is older than a certain
> threshold, persist the results to a DB
>
> #1 is straightforward and there are plenty of examples showing how to do
> it.
> However, I'm not sure how I would go about doing #2, or if that's something
> I can even do with spark because as far as I can tell it operates on
> sliding
> windows. I really just want to continue to accumulate these groups of
> "userId:domain" for all time (without specifying a window) and then roll
> them up and flush them once no new data has come in for a group after a
> certain amount of time. Would the updateStateByKey function allow me to do
> this somehow?
>
> Any help would be appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Valid-spark-streaming-use-case-tp4410.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>