Posted to user@spark.apache.org by David Rosenstrauch <da...@gmail.com> on 2017/06/06 23:08:30 UTC

How to perform clean-up after stateful streaming processes an RDD?

We have some code we've written using stateful streaming (mapWithState)
which works well for the most part.  The stateful streaming runs, processes
the RDD of input data, calls the state spec function for each input record,
and does all proper adding and removing from the state cache.

However, I have a need to do some cleanup after stateful streaming
processes the input data RDD, and I can't seem to find any place where we
can put that code where it will run when it's supposed to.

Essentially our state spec function needs to a) call out to an external
service, b) hold some data from that service, and then c) inform the
service to clean up the remaining data once the RDD is complete.

I've gotten to the point where the code looks approximately like this:


val eventStream = incomingStream.transform(...)

val stateUpdater = new StateUpdater
val stateUpdateStream =
  eventStream.mapWithState(stateUpdater.stateUpdateFunction _)

stateUpdateStream.foreachRDD { rdd =>
  ...
}
stateUpdater.cleanupExternalService    // DOES NOT WORK!


class StateUpdater extends Serializable {

  def stateUpdateFunction(key, value, state) {
    if (!state.initialized) {
      state.initialize(externalService)
    }
    ...
  }

  def cleanupExternalService {
    externalService.cleanup  // writes some data back to the external service
  }

  // rebuilt lazily on each JVM that deserializes this object, since
  // @transient fields are not serialized
  @transient lazy val externalService = new ExternalService
}


Note that the ExternalService object is holding onto a small bit of state
that it needs to write back to the external service once we have completed
running the stateUpdateFunction on every record in the RDD.  However this
code doesn't actually work.  Because of the way Spark serializes objects on
the driver and then deserializes them onto the executor, there's no way for
me to get a hold of the ExternalService object that is being used on each
RDD partition and clean up its leftover data.  Those objects seem to be
held internally somewhere in the bowels of stateful streaming (where it
processes an RDD of incoming data and applies it to the state).  And back
in the main code where I'm trying to call the cleanup method, I'm actually
calling it on a totally different object than the one that ran in the RDD
partitions.  And stateful streaming doesn't provide me with any opportunity
to perform any cleanup processing - say by calling some "rddDone" method to
notify me that it just finished doing state processing on an RDD.  It just
calls the state spec function over and over, once for every record, never
notifying me that we've ended processing an RDD or started a new one.
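
To spell out what I think is happening (just a sketch, reusing the
hypothetical names from the snippet above):

// driver side
val stateUpdater = new StateUpdater    // lives in the driver JVM

val stateUpdateStream =
  eventStream.mapWithState(stateUpdater.stateUpdateFunction _)
// The method reference captures stateUpdater, so Spark serializes it into the
// task closures; each executor deserializes its own copy, and each copy
// lazily builds its own ExternalService via the @transient lazy val.

stateUpdater.cleanupExternalService
// Runs in the driver JVM, against the driver's copy of StateUpdater; the
// executor-side ExternalService instances (the ones actually holding the
// leftover data) are never touched.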


Is there any way out of this conundrum?  I've tried to avoid the problem by
moving my interactions with the external service outside of the state spec
function.  But that didn't work:  the interaction with the external service
is really needed inside of the state spec function, and I caused a bug in
our code when I tried to move it.

Any suggestions that I might not have thought of on how to fix this issue?

Thanks,

DR

Re: How to perform clean-up after stateful streaming processes an RDD?

Posted by David Rosenstrauch <da...@darose.net>.
Thanks much for the suggestion.  Reading over your mail though, I
realize that I may not have made something clear:  I don't just have a
single external service object; the idea is that I have one per
executor, so that the code running on each executor accesses the
external service independently.  (And performs its cleanup independently
as well.)  So I actually don't want stateUpdater.cleanupExternalService
to run on the driver, but rather to run on each executor.

So to be a bit more explicit, using words rather than code, what I'm
trying to do is:

In the driver: 
* Start reading the incoming stream of data (event strings, in my case) 
* Run stateful streaming to ingest the stream of incoming data and
update state objects with it (user summaries, in my case) 
* Output all the completed user summaries (to Kafka, in my case) for
further downstream processing 

In each executor (i.e., RDD partition), using stateful streaming / the
state spec function: 
* Process a batch of incoming event strings 
* Fold each event string into the corresponding user summary 
* In the process of building the user summary, retrieve some data from
the external service (lazily initialized) 
* Emit each completed user summary as output from the stateful streaming
state spec function 
* After all event strings in the batch have been processed, perform
cleanup on this executor's interaction with the external service by
writing some data back to the service

I'm able to get all of this working except for that last bullet point.
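
In code terms, the shape of what I'm after is roughly the following (just a
sketch; ExternalServiceHolder and emitToKafka are hypothetical names for
things I'd have to add, and I'm not sure about the ordering guarantees):

// Hypothetical per-executor holder: a Scala object is instantiated once per
// executor JVM, so the state spec function and any cleanup code running on
// that same executor would share one ExternalService instance.
object ExternalServiceHolder {
  lazy val externalService = new ExternalService
}

stateUpdateStream.foreachRDD { rdd =>
  rdd.foreachPartition { userSummaries =>
    // executor side: output the completed summaries for this partition
    userSummaries.foreach(emitToKafka)
    // then, still on this executor, write back what the service accumulated
    // while the state spec function ran for this partition (assuming all of
    // those calls have happened by the time the iterator is exhausted)
    ExternalServiceHolder.externalService.cleanup()
  }
}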

There doesn't seem to be any way to access the external service
connection object being used in my state spec function and then, after
the RDD batch is done, tell it to write back the data it's holding. 
Stateful streaming doesn't seem to give one much to work with in this
regard.  It doesn't notify you in any way that a batch is complete; it
just repeatedly calls your state spec function for each incoming record,
without any indication as to when one batch ended and another one
started.  And the instances of the objects that each executor
deserializes and calls your state spec function on don't seem to be
accessible from ... well, anywhere else other than in the state spec
function itself.
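
(The only batch-level hook I'm aware of is the driver-side StreamingListener,
which fires when a micro-batch completes but runs on the driver, not on the
executors, so it doesn't get me to the executor-local service objects.
Sketch, with ssc being the StreamingContext:)

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // runs in the driver JVM after each micro-batch finishes; there is no
    // access to the per-executor ExternalService instances from here
  }
})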

Any ideas how what I'm trying to do might be achievable using stateful
streaming?

Thanks,

DR 

On Tue, Jun 6, 2017 at 7:31 PM, Gerard Maas <ge...@gmail.com>
wrote:

> It looks like the clean up should go into the foreachRDD function: 
> 
> stateUpdateStream.foreachRdd(...) { rdd =>  
> // do stuff with the rdd 
> 
> stateUpdater.cleanupExternalService    // should work in this position
> } 
> 
> Code within the foreachRDD(*) executes on the driver, so you can keep the state of the object there. 
> 
> What will not work is to update the stateUpdater state from a side effect of the stateUpdateFunction used in the mapWithState transformation and expect those changes to be visible at the call site sketched above. 
> 
> kr, Gerard. 
> 
> (*) a typical construct found in the wild is: 
> dstream.foreachRDD{rdd => 
> // do some preparation 
> rdd.operation{elem => ... } 
> ... 
> // close/clean/report 
> } 
> So the code within the foreachRDD closure executes on the driver, *but* the code within the rdd.operation{...} closure is a spark operation and executes distributed on the executors. 
> One must be careful of not incorrectly mixing the scopes, in particular when holding on to local state. 
> 
> On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch <da...@gmail.com> wrote:
> 
>> We have some code we've written using stateful streaming (mapWithState) which works well for the most part.  The stateful streaming runs, processes the RDD of input data, calls the state spec function for each input record, and does all proper adding and removing from the state cache.
>> 
>> However, I have a need to do some cleanup after stateful streaming processes the input data RDD, and I can't seem to find any place where we can put that code where it will run when it's supposed to.
>> 
>> Essentially our state spec function needs to a) call out to an external service, b) hold some data from that service, and then c) inform the service to clean up the remaining data once the RDD is complete.
>> 
>> I've gotten to the point where the code looks approximately like this:
>> 
>> val eventStream = incomingStream.transform(...)
>> 
>> val stateUpdater = new StateUpdater 
>> val stateUpdateStream = eventStream.mapWithState(stateUpdater.stateUpdateFunction _)
>> 
>> stateUpdateStream.foreachRdd(...) {
>> ...
>> } 
>> stateUpdater.cleanupExternalService    // DOES NOT WORK! 
>> 
>> class StateUpdater extends Serializable {
>> 
>> def stateUpdateFunction(key, value, state) { 
>> if (!state.initalized) { 
>> state.initialize(externalService) 
>> }
>> ... 
>> }
>> 
>> def cleanupExternalService { 
>> externalService.cleanup  // writes some data back to the external service 
>> } 
>> 
>> @transient lazy val externalService = new ExternalService 
>> }
>> 
>> Note that the ExternalService object is holding onto a small bit of state that it needs to write back to the external service once we have completed running the stateUpdateFunction on every record in the RDD.  However this code doesn't actually work.  Because of the way Spark serializes objects on the driver and then deserializes them onto the executor, there's no way for me to get a hold of the ExternalService object that is being used on each RDD partition and clean up its leftover data.  Those objects seem to be held internally somewhere in the bowels of stateful streaming (where it processes an RDD of incoming data and applies it to the state).  And back in the main code where I'm trying to call the cleanup method, I'm actually calling it on a totally different object than the one that ran in the RDD partitions.  And stateful streaming doesn't provide me with any opportunity to perform any cleanup processing - say by calling some "rddDone" method to notify me that it just finished doing state processing on an RDD.  It just calls only the statespec function over and over, once for every record, and never notifying me that we've ended processing an RDD or started a new one.
>> 
>> Is there any way out of this conundrum?  I've tried to avoid the problem by moving my interactions with the external service outside of the state spec function.  But that didn't work:  the interaction with the external service is really needed inside of the state spec function, and I caused a bug in our code when I tried to move it.
>> 
>> Any suggestions that I might not have thought of on how to fix this issue?
>> 
>> Thanks,
>> 
>> DR


Re: How to perform clean-up after stateful streaming processes an RDD?

Posted by Gerard Maas <ge...@gmail.com>.
It looks like the cleanup should go into the foreachRDD function:

stateUpdateStream.foreachRDD { rdd =>
  // do stuff with the rdd

  stateUpdater.cleanupExternalService    // should work in this position
}

Code within the foreachRDD(*) executes on the driver, so you can keep the
state of the object there.

What will not work is to update the stateUpdater state from a side effect
of the stateUpdateFunction used in the mapWithState transformation and
expect those changes to be visible at the call site sketched above.

kr, Gerard.

(*) a typical construct found in the wild is:

dstream.foreachRDD { rdd =>
  // do some preparation
  rdd.operation { elem => ... }
  ...
  // close/clean/report
}
So the code within the foreachRDD closure executes on the driver, *but* the
code within the rdd.operation{...} closure is a Spark operation and executes
distributed across the executors.
One must be careful not to mix the scopes incorrectly, in particular when
holding on to local state.
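
Spelled out with explicit comments (sketch only, the names are placeholders):

dstream.foreachRDD { rdd =>
  // driver JVM: runs once per batch, before the job for this RDD is submitted
  val batchTag = System.currentTimeMillis()

  rdd.foreachPartition { elems =>
    // executor JVM: runs once per partition of this RDD
    elems.foreach { elem => /* process elem */ }
  }

  // driver JVM again: runs after the action above has completed, so any
  // driver-held state can safely be updated or reported here
  println(s"batch $batchTag done")
}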



On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch <da...@gmail.com>
wrote:

> We have some code we've written using stateful streaming (mapWithState)
> which works well for the most part.  The stateful streaming runs, processes
> the RDD of input data, calls the state spec function for each input record,
> and does all proper adding and removing from the state cache.
>
> However, I have a need to do some cleanup after stateful streaming
> processes the input data RDD, and I can't seem to find any place where we
> can put that code where it will run when it's supposed to.
>
> Essentially our state spec function needs to a) call out to an external
> service, b) hold some data from that service, and then c) inform the
> service to clean up the remaining data once the RDD is complete.
>
> I've gotten to the point where the code looks approximately like this:
>
>
> val eventStream = incomingStream.transform(...)
>
> val stateUpdater = new StateUpdater
> val stateUpdateStream = eventStream.mapWithState(stateUpdater.stateUpdateFunction
> _)
>
> stateUpdateStream.foreachRdd(...) {
> ...
> }
> stateUpdater.cleanupExternalService    // DOES NOT WORK!
>
>
> class StateUpdater extends Serializable {
>
> def stateUpdateFunction(key, value, state) {
> if (!state.initalized) {
> state.initialize(externalService)
> }
> ...
> }
>
> def cleanupExternalService {
> externalService.cleanup  // writes some data back to the external service
> }
>
> @transient lazy val externalService = new ExternalService
> }
>
>
> Note that the ExternalService object is holding onto a small bit of state
> that it needs to write back to the external service once we have completed
> running the stateUpdateFunction on every record in the RDD.  However this
> code doesn't actually work.  Because of the way Spark serializes objects on
> the driver and then deserializes them onto the executor, there's no way for
> me to get a hold of the ExternalService object that is being used on each
> RDD partition and clean up its leftover data.  Those objects seem to be
> held internally somewhere in the bowels of stateful streaming (where it
> processes an RDD of incoming data and applies it to the state).  And back
> in the main code where I'm trying to call the cleanup method, I'm actually
> calling it on a totally different object than the one that ran in the RDD
> partitions.  And stateful streaming doesn't provide me with any opportunity
> to perform any cleanup processing - say by calling some "rddDone" method to
> notify me that it just finished doing state processing on an RDD.  It just
> calls only the statespec function over and over, once for every record, and
> never notifying me that we've ended processing an RDD or started a new one.
>
>
> Is there any way out of this conundrum?  I've tried to avoid the problem
> by moving my interactions with the external service outside of the state
> spec function.  But that didn't work:  the interaction with the external
> service is really needed inside of the state spec function, and I caused a
> bug in our code when I tried to move it.
>
> Any suggestions that I might not have thought of on how to fix this issue?
>
> Thanks,
>
> DR
>