Posted to user@spark.apache.org by Pierce Lamb <ri...@gmail.com> on 2014/12/17 23:07:10 UTC

Help with updateStateByKey

I am trying to run stateful Spark Streaming computations over (fake)
apache web server logs read from Kafka. The goal is to "sessionize"
the web traffic similar to this blog post:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/

The only difference is that I want to "sessionize" each page the IP
hits, instead of the entire session. I was able to do this reading
from a file of fake web traffic using Spark in batch mode, but now I
want to do it in a streaming context.

Log files are read from Kafka and parsed into K/V pairs of

(String, (String, Long, Long)) or

(IP, (requestPage, time, time))
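
(A minimal sketch of how that parsing and keying might look; the
parseLine helper, topic name, and ZooKeeper settings below are
illustrative assumptions rather than the actual code:)

    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // ssc is an existing StreamingContext; raw log lines arrive as the
    // value of each Kafka message
    val lines = KafkaUtils.createStream(ssc, "localhost:2181", "log-consumer",
      Map("weblogs" -> 1)).map(_._2)

    // parseLine is a hypothetical helper returning (ip, page, epochMillis)
    val ipTimeStamp: DStream[(String, (String, Long, Long))] =
      lines.map { line =>
        val (ip, page, time) = parseLine(line)
        (ip, (page, time, time))
      }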

I then call "groupByKey()" on this K/V pair. In batch mode, this would
produce a:

(String, CollectionBuffer((String, Long, Long), ...) or

(IP, CollectionBuffer((requestPage, time, time), ...)

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...) like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is
discarded. Ultimately what I want is for that ArrayBuffer to fill up
over time as a given IP continues to interact and to run some
computations on its data to "sessionize" the page time. I believe the
operator to make that happen is "updateStateByKey." I'm having some
trouble with this operator (I'm new to both Spark & Scala); any help
is appreciated.

Thus far:

    val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

    def updateGroupByKey(
        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
      ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

    }

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Help with updateStateByKey

Posted by Silvio Fiorito <si...@granturing.com>.
Great, glad it worked out! Just keep an eye on memory usage as you roll it
out. Like I said before, if you’ll be running this 24/7, consider cleaning
up sessions by returning None after some sort of timeout.
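
(For illustration, a minimal sketch of that cleanup idea, using the
Seq-of-visits state from this thread; the names and the 30-minute
timeout are assumptions, not the actual code:)

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout

    def updateWithTimeout(newValues: Seq[(String, Long, Long)],
                          currentValue: Option[Seq[(String, Long, Long)]]
                         ): Option[Seq[(String, Long, Long)]] = {
      val merged = currentValue.getOrElse(Seq.empty) ++ newValues
      // If this key saw no new events and its newest timestamp is older than
      // the timeout, return None so Spark drops the state for that key.
      if (newValues.isEmpty && merged.nonEmpty &&
          System.currentTimeMillis() - merged.map(_._3).max > sessionTimeoutMs)
        None
      else
        Some(merged)
    }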




Re: Help with updateStateByKey

Posted by Pierce Lamb <ri...@gmail.com>.
This produces the expected output, thank you!


Re: Help with updateStateByKey

Posted by Silvio Fiorito <si...@granturing.com>.
Ok, I have a better idea of what you’re trying to do now.

I think the prob might be the map. The first time the function runs, currentValue will be None. Using map on None returns None.

Instead, try:

Some(currentValue.getOrElse(Seq.empty) ++ newValues)

I think that should give you the expected result.
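
(Put together, the whole function would look roughly like this -- a
sketch assembled from the snippets in this thread:)

    def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                         currentValue: Option[Seq[(String, Long, Long)]]
                        ): Option[Seq[(String, Long, Long)]] = {
      // getOrElse covers the first batch, where currentValue is None;
      // map alone would short-circuit to None and the state would never start.
      Some(currentValue.getOrElse(Seq.empty) ++ newValues)
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)

Note that updateStateByKey also requires a checkpoint directory to be set
on the StreamingContext (ssc.checkpoint(...)), or the job will fail at
startup.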



Re: Help with updateStateByKey

Posted by Pierce Lamb <ri...@gmail.com>.
Hi Silvio,

This is a great suggestion (I wanted to get rid of groupByKey); I have been
trying to implement it this morning, but I'm having some trouble. I would
love to see your code for the function that goes inside updateStateByKey.

Here is my current code:

    def updateGroupByKey( newValues: Seq[(String, Long, Long)],
                          currentValue: Option[Seq[(String, Long, Long)]]
                        ): Option[Seq[(String, Long, Long)]] = {
      currentValue.map { case (v) => v ++ newValues }
    }

    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


However, when I run it, the grouped DStream doesn't get populated with
anything. The issue is probably that currentValue is not actually an
Option[Seq[triple]] but rather an Option[triple]. However, if I change it to
an Option[triple] then I also have to return an Option[triple] for
updateStateByKey to compile, but I want that return value to be an
Option[Seq[triple]], because ultimately I want the data to look like
(IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
startTime, endTime)...]) and have that Seq build over time.

Am I thinking about this wrong?

Thank you


Re: Help with updateStateByKey

Posted by Silvio Fiorito <si...@granturing.com>.
Hi Pierce,

You shouldn’t have to use groupByKey because updateStateByKey will get a 
Seq of all the values for that key already.

I used that for realtime sessionization as well. What I did was key my 
incoming events, then send them to updateStateByKey. The updateStateByKey 
function then received a Seq of the events and the Option of the previous 
state for that key. The sessionization code then did its thing to check if 
the incoming events were part of the same session, based on a configured 
timeout. If a session already was active (from the previous state) and it 
hadn’t exceeded the timeout, it used that value. Otherwise it generated a 
new session id. Then the return value for the updateStateByKey function 
was a Tuple of session id and last timestamp.

Then I joined the DStream with the session ids, which were both keyed off 
the same id and continued my processing. Your requirements may be 
different, but that’s what worked well for me.

Another thing to consider is cleaning up old sessions by returning None in 
the updateStateByKey function. This will help with long running apps and 
minimize memory usage (and checkpoint size).

I was using something similar to the method above on a live production 
stream with very little CPU and memory footprint, running for weeks at a 
time, processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio
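
(A rough sketch of the state function described above; the session-id
scheme, the timeout value, and the join at the end are illustrative
assumptions, not the original code:)

    import java.util.UUID

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout

    // State kept per key: (sessionId, lastSeenTimestamp)
    def updateSession(events: Seq[Long],   // event times for this key in this batch
                      state: Option[(String, Long)]
                     ): Option[(String, Long)] = {
      val latest = if (events.nonEmpty) events.max else state.map(_._2).getOrElse(0L)
      state match {
        // No new events and the session has gone quiet: drop the state
        case Some((_, lastSeen)) if events.isEmpty &&
            System.currentTimeMillis() - lastSeen > sessionTimeoutMs => None
        // Session still live: keep its id and bump the last-seen timestamp
        case Some((id, lastSeen)) if latest - lastSeen <= sessionTimeoutMs =>
          Some((id, latest))
        // Timed out or brand new key: start a fresh session id
        case _ => Some((UUID.randomUUID().toString, latest))
      }
    }

    // keyedEvents: DStream[(String, Long)] of (key, eventTime)
    // val sessions = keyedEvents.updateStateByKey(updateSession)
    // val joined   = keyedEvents.join(sessions)  // re-attach session ids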




Re: Help with updateStateByKey

Posted by Tathagata Das <ta...@gmail.com>.
Another place to start playing with updateStateByKey is the
StatefulNetworkWordCount example. See the streaming examples directory in
the Spark repository.

TD
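
(The gist of that example is a running word count kept with
updateStateByKey; roughly along these lines, paraphrased rather than
copied from the repository:)

    // Merge this batch's counts into the running total for each word
    val updateFunc = (newCounts: Seq[Int], runningCount: Option[Int]) =>
      Some(newCounts.sum + runningCount.getOrElse(0))

    ssc.checkpoint(".")  // updateStateByKey needs checkpointing enabled

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1))
    val stateDstream = wordCounts.updateStateByKey[Int](updateFunc)
    stateDstream.print()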




---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org