Posted to user@spark.apache.org by Adrian Mocanu <am...@verticalscope.com> on 2014/04/29 20:50:26 UTC

What is Seq[V] in updateStateByKey?

What is Seq[V] in updateStateByKey?
Does this store the collected tuples of the RDD in a collection?

Method signature:
def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]

In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn't compile.
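
A minimal sketch of how the type parameters line up, assuming a hypothetical pair stream declared as DStream[(String, Double)] and named stream (so V is Double and the update function receives Seq[Double], not Seq[(String, Double)]):

      // assuming: val stream: org.apache.spark.streaming.dstream.DStream[(String, Double)]
      // keys are Strings, values are Doubles, so V = Double and (here) S = Double
      val sums: DStream[(String, Double)] = stream.updateStateByKey[Double] {
        (newValues: Seq[Double], runningSum: Option[Double]) =>
          Some(runningSum.getOrElse(0.0) + newValues.sum)  // running sum per key
      }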

-Adrian


Re: What is Seq[V] in updateStateByKey?

Posted by Tathagata Das <ta...@gmail.com>.
Depends on your code. Referring to the earlier example, if you do

words.map(x => (x,1)).updateStateByKey(....)

then, for a particular word, if a batch contains 6 occurrences of that word,
the Seq[V] will be [1, 1, 1, 1, 1, 1].

Instead if you do

words.map(x => (x,1)).reduceByKey(_ + _).updateStateByKey(...)

then Seq[V] will be [6], that is, all the 1s will be summed up already
due to the reduceByKey.
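
A minimal sketch of an update function that handles both cases the same way, since it simply sums whatever counts arrive for a key in that batch (assuming Int counts; the names are illustrative):

      // works whether Seq[V] is [1, 1, 1, 1, 1, 1] or [6]
      val updateFunc = (newCounts: Seq[Int], runningCount: Option[Int]) =>
        Some(runningCount.getOrElse(0) + newCounts.sum)

      words.map(x => (x, 1)).updateStateByKey[Int](updateFunc)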

TD



On Thu, May 1, 2014 at 7:29 AM, Adrian Mocanu <am...@verticalscope.com> wrote:

> So Seq[V] contains only "new" tuples. I initially thought that whenever a
> new tuple was found, it would add it to Seq and call the update function
> immediately so there wouldn't be more than 1 update to Seq per function
> call.
>
> Say I want to sum tuples with the same key in an RDD using
> updateStateByKey. Then (1) Seq[V] would contain the numbers for a
> particular key and my S state could be the sum?
> Or would (2) Seq contain partial sums (say sum per partition?) which I
> then need to sum into the final sum?
>
> After writing this out and thinking a little more about it I think #2 is
> correct. Can you confirm?
>
> Thanks again!
> -A
>
> -----Original Message-----
> From: Sean Owen [mailto:sowen@cloudera.com]
> Sent: April-30-14 4:30 PM
> To: user@spark.apache.org
> Subject: Re: What is Seq[V] in updateStateByKey?
>
> S is the previous count, if any. Seq[V] are potentially many new counts.
> All of them have to be added together to keep an accurate total.  It's as
> if the count were 3, and I tell you I've just observed 2, 5, and 1
> additional occurrences -- the new count is 3 + (2+5+1) not
> 1 + 1.
>
>
> I butted in since I'd like to ask a different question about the same line
> of code. Why:
>
>       val currentCount = values.foldLeft(0)(_ + _)
>
> instead of
>
>       val currentCount = values.sum
>
> This happens a few places in the code. sum seems equivalent and likely
> quicker. Same with things like "filter(_ == 200).size" instead of "count(_
> == 200)"... pretty trivial but hey.
>
>
> On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu <am...@verticalscope.com>
> wrote:
> > Hi TD,
> >
> > Why does the example keep recalculating the count via fold?
> >
> > Wouldn’t it make more sense to get the last count in values Seq and
> > add 1 to it and save that as current count?
> >
> >
> >
> > From what Sean explained I understand that all values in Seq have the
> > same key. Then when a new value for that key is found it is added to
> > this Seq collection and the update function is called.
> >
> >
> >
> > Is my understanding correct?
>

RE: What is Seq[V] in updateStateByKey?

Posted by Adrian Mocanu <am...@verticalscope.com>.
So Seq[V] contains only "new" tuples. I initially thought that whenever a new tuple was found, it would add it to Seq and call the update function immediately so there wouldn't be more than 1 update to Seq per function call.

Say I want to sum tuples with the same key in an RDD using updateStateByKey. Then (1) Seq[V] would contain the numbers for a particular key and my S state could be the sum?
Or would (2) Seq contain partial sums (say sum per partition?) which I then need to sum into the final sum?

After writing this out and thinking a little more about it I think #2 is correct. Can you confirm?

Thanks again!
-A

-----Original Message-----
From: Sean Owen [mailto:sowen@cloudera.com] 
Sent: April-30-14 4:30 PM
To: user@spark.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

S is the previous count, if any. Seq[V] are potentially many new counts. All of them have to be added together to keep an accurate total.  It's as if the count were 3, and I tell you I've just observed 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
1 + 1.


I butted in since I'd like to ask a different question about the same line of code. Why:

      val currentCount = values.foldLeft(0)(_ + _)

instead of

      val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely quicker. Same with things like "filter(_ == 200).size" instead of "count(_ == 200)"... pretty trivial but hey.


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu <am...@verticalscope.com> wrote:
> Hi TD,
>
> Why does the example keep recalculating the count via fold?
>
> Wouldn’t it make more sense to get the last count in values Seq and 
> add 1 to it and save that as current count?
>
>
>
> From what Sean explained I understand that all values in Seq have the 
> same key. Then when a new value for that key is found it is added to 
> this Seq collection and the update function is called.
>
>
>
> Is my understanding correct?

Re: What is Seq[V] in updateStateByKey?

Posted by Tathagata Das <ta...@gmail.com>.
Yeah, I remember changing fold to sum in a few places, probably in
test suites, but missed this example I guess.



On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen <so...@cloudera.com> wrote:

> S is the previous count, if any. Seq[V] are potentially many new
> counts. All of them have to be added together to keep an accurate
> total.  It's as if the count were 3, and I tell you I've just observed
> 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
> 1 + 1.
>
>
> I butted in since I'd like to ask a different question about the same
> line of code. Why:
>
>       val currentCount = values.foldLeft(0)(_ + _)
>
> instead of
>
>       val currentCount = values.sum
>
> This happens a few places in the code. sum seems equivalent and likely
> quicker. Same with things like "filter(_ == 200).size" instead of
> "count(_ == 200)"... pretty trivial but hey.
>
>
> On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
> <am...@verticalscope.com> wrote:
> > Hi TD,
> >
> > Why does the example keep recalculating the count via fold?
> >
> > Wouldn’t it make more sense to get the last count in values Seq and add
> 1 to
> > it and save that as current count?
> >
> >
> >
> > From what Sean explained I understand that all values in Seq have the
> same
> > key. Then when a new value for that key is found it is added to this Seq
> > collection and the update function is called.
> >
> >
> >
> > Is my understanding correct?
>

Re: What is Seq[V] in updateStateByKey?

Posted by Sean Owen <so...@cloudera.com>.
S is the previous count, if any. Seq[V] are potentially many new
counts. All of them have to be added together to keep an accurate
total.  It's as if the count were 3, and I tell you I've just observed
2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
1 + 1.


I butted in since I'd like to ask a different question about the same
line of code. Why:

      val currentCount = values.foldLeft(0)(_ + _)

instead of

      val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely
quicker. Same with things like "filter(_ == 200).size" instead of
"count(_ == 200)"... pretty trivial but hey.


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
<am...@verticalscope.com> wrote:
> Hi TD,
>
> Why does the example keep recalculating the count via fold?
>
> Wouldn’t it make more sense to get the last count in values Seq and add 1 to
> it and save that as current count?
>
>
>
> From what Sean explained I understand that all values in Seq have the same
> key. Then when a new value for that key is found it is added to this Seq
> collection and the update function is called.
>
>
>
> Is my understanding correct?

RE: What is Seq[V] in updateStateByKey?

Posted by Adrian Mocanu <am...@verticalscope.com>.
Hi TD,
Why does the example keep recalculating the count via fold?
Wouldn’t it make more sense to get the last count in values Seq and add 1 to it and save that as current count?

From what Sean explained I understand that all values in Seq have the same key. Then when a new value for that key is found it is added to this Seq collection and the update function is called.

Is my understanding correct?

-Adrian

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: April-29-14 4:57 PM
To: user@spark.apache.org
Cc: user@spark.incubator.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

You may have already seen it, but I will mention it anyways. This example may help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the value type (i.e., V) is Int (count of a word in each batch) and the state type (i.e., S) is also an Int (running count). The updateFunction essentially sums up the running count with the new count to generate a new running count.

TD


On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen <so...@cloudera.com> wrote:
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].

If your RDD has value type V = Double then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
<am...@verticalscope.com> wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
>
>
> Method signature:
>
> def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> Option[S] ): DStream[(K, S)]
>
>
>
> In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
> moment I switched to a different type like Seq[(String, Double)] the code
> didn’t compile.
>
>
>
> -Adrian
>
>


Re: What is Seq[V] in updateStateByKey?

Posted by Tathagata Das <ta...@gmail.com>.
You may have already seen it, but I will mention it anyways. This example
may help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the
value type (i.e., V) is Int (count of a word in each batch) and the state
type (i.e., S) is also an Int (running count). The updateFunction essentially
sums up the running count with the new count to generate a new running
count.
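
The update function in that example is along these lines (a sketch reconstructed here; it matches the values.foldLeft(0)(_ + _) line discussed elsewhere in this thread):

      val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount  = values.foldLeft(0)(_ + _)  // new occurrences in this batch
        val previousCount = state.getOrElse(0)         // running count so far, if any
        Some(currentCount + previousCount)             // new running count
      }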

TD



On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen <so...@cloudera.com> wrote:

> The original DStream is of (K,V). This function creates a DStream of
> (K,S). Each time slice brings one or more new V for each K. The old
> state S (can be different from V!) for each K -- possibly non-existent
> -- is updated in some way by a bunch of new V, to produce a new state
> S -- which also might not exist anymore after update. That's why the
> function is from a Seq[V], and an Option[S], to an Option[S].
>
> If your RDD has value type V = Double then your function needs to
> update state based on a new Seq[Double] at each time slice, since
> Doubles are the new thing arriving for each key at each time slice.
>
>
> On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
> <am...@verticalscope.com> wrote:
> > What is Seq[V] in updateStateByKey?
> >
> > Does this store the collected tuples of the RDD in a collection?
> >
> >
> >
> > Method signature:
> >
> > def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> > Option[S] ): DStream[(K, S)]
> >
> >
> >
> > In my case I used Seq[Double] assuming a sequence of Doubles in the RDD;
> the
> > moment I switched to a different type like Seq[(String, Double)] the code
> > didn’t compile.
> >
> >
> >
> > -Adrian
> >
> >
>

Re: What is Seq[V] in updateStateByKey?

Posted by Sean Owen <so...@cloudera.com>.
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].

If your RDD has value type V = Double then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.
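
A minimal sketch of that shape for the Double case (hypothetical names; the state S here is just a running sum per key):

      // update function for a DStream[(K, Double)]: V = Double, S = Double
      def updateSum(newValues: Seq[Double], oldState: Option[Double]): Option[Double] =
        if (newValues.isEmpty && oldState.isEmpty) None            // no state to create or keep
        else Some(oldState.getOrElse(0.0) + newValues.sum)         // fold the new Vs into S

      // pairs.updateStateByKey(updateSum _) then yields a DStream[(K, Double)]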


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
<am...@verticalscope.com> wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
>
>
> Method signature:
>
> def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) =>
> Option[S] ): DStream[(K, S)]
>
>
>
> In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
> moment I switched to a different type like Seq[(String, Double)] the code
> didn’t compile.
>
>
>
> -Adrian
>
>