Posted to user@spark.apache.org by Joe Panciera <jo...@gmail.com> on 2016/06/08 20:27:35 UTC

Variable in UpdateStateByKey Not Updating After Restarting Application from Checkpoint

I've run into an issue where a global variable used within an
updateStateByKey function isn't being updated after the application
restarts from a checkpoint. Using foreachRDD, I have a global variable 'A'
that is populated from a file every time a batch runs, and A is then used
in an updateStateByKey. When I initially run the application, it functions
as expected and the value of A is referenced correctly within the scope of
the update function.

However, when I bring the application down and restart it, I see different
behavior. Variable A is assigned the correct value by its corresponding
foreachRDD function, but when the updateStateByKey function is executed the
new value for A isn't used. It just... disappears.

I could be going about the implementation of this wrong, but I'm hoping
that someone can point me in the correct direction.

Here's some pseudocode:

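def readfile(rdd):
    # Runs once per batch and refreshes the global from the file.
    global A
    A = readFromFile()

def update(new, old):
    # Update function passed to updateStateByKey; reads the global A.
    if old in A:
        pass  # do something

dstream.foreachRDD(readfile)
dstream.updateStateByKey(update)

ssc.checkpoint('checkpoint')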

A is correct the first time this is run, but when the application is killed
and restarted A doesn't seem to be reassigned correctly.
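
One possible explanation, offered purely as an assumption and not confirmed
anywhere in this thread: functions restored from a checkpoint are
deserialized copies, and such copies may each carry their own private
namespace instead of sharing one module-level A. The plain-Python sketch
below (no Spark involved) imitates that situation. The helper
rebuild_with_private_globals is hypothetical and only stands in for
deserialization; it is not a Spark API.

```python
import types

A = set()


def readfile():
    global A
    A = {"fresh"}  # stands in for reading the file


def update(old):
    return old in A


def rebuild_with_private_globals(func):
    """Hypothetical stand-in for deserialization: same code, copied namespace."""
    return types.FunctionType(func.__code__, dict(func.__globals__), func.__name__)


restored_readfile = rebuild_with_private_globals(readfile)
restored_update = rebuild_with_private_globals(update)

restored_readfile()  # assigns A inside its own private namespace only
seen_after_restore = restored_update("fresh")

readfile()  # assigns A in the namespace that update() also uses
seen_without_restore = update("fresh")
```

In this sketch the rebuilt pair no longer communicates through A, while the
original pair still does. Whether the same thing happens inside Spark on
recovery is the assumption being illustrated, not an established fact.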

Re: Variable in UpdateStateByKey Not Updating After Restarting Application from Checkpoint

Posted by Joe Panciera <jo...@gmail.com>.
Could this be a bug? I hate to throw that word around, but this is
definitely not expected behavior (as far as I can tell). If anyone has a
suggestion for a workaround or a better way to handle a global value in
updateStateByKey, that would be fantastic.

Thanks
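
A workaround that may be worth trying, sketched under assumptions: look the
value up inside the update function itself instead of relying on a global
assigned elsewhere. The names load_A, make_update and _cache are
hypothetical, the file format (one entry per line) is assumed, and the
"do something" branch is only a placeholder. The demonstration at the end
runs locally against a temporary file, without Spark.

```python
import os
import tempfile

# Hypothetical per-process cache: path and modification time of the last read.
_cache = {"path": None, "mtime": None, "values": frozenset()}


def load_A(path):
    """Return the non-empty lines of `path` as a frozenset, re-reading on change."""
    mtime = os.path.getmtime(path)
    if (_cache["path"], _cache["mtime"]) != (path, mtime):
        with open(path) as handle:
            values = frozenset(line.strip() for line in handle if line.strip())
        _cache.update(path=path, mtime=mtime, values=values)
    return _cache["values"]


def make_update(path):
    """Build an update function with the (new_values, old_state) shape."""
    def update(new_values, old_state):
        A = load_A(path)  # looked up here, not taken from a global
        if old_state in A:
            return old_state  # placeholder for "do something"
        return new_values[-1] if new_values else old_state
    return update


# Local demonstration with a temporary file; no Spark needed.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("x\n")
update = make_update(tmp.name)
kept = update(["y"], "x")      # "x" is listed in the file, so state is kept
replaced = update(["y"], "z")  # "z" is not listed, so the newest value wins
os.remove(tmp.name)
```

In a streaming job the result of make_update would be what gets passed to
updateStateByKey. The file would then have to be readable wherever the
update function runs, and how long the cache survives in each worker process
is an assumption to verify.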
