Posted to user@spark.apache.org by Sergey Oboguev <ob...@gmail.com> on 2021/02/20 23:47:53 UTC

Controlling Spark StateStore retention

I am trying to write a Spark Structured Streaming application consisting
of a GroupState construct followed by an aggregation.

Events arriving from event sources are bucketed by deviceId and quantized
timestamp, which are combined into the group state key idTime.
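As a rough illustration of the keying described above, a key of this kind could be built as follows. This is only a sketch: the class name, the 5-minute bucket width and the "@" separator are hypothetical, since the post does not say how idTime is actually encoded.

```java
import java.time.Duration;
import java.time.Instant;

class IdTimeKey {
    // Hypothetical bucket width; the post does not state the quantum used.
    static final Duration BUCKET = Duration.ofMinutes(5);

    // Round the event timestamp down to the start of its bucket.
    static Instant quantize(Instant eventTime) {
        long width = BUCKET.toMillis();
        long bucketStart = Math.floorDiv(eventTime.toEpochMilli(), width) * width;
        return Instant.ofEpochMilli(bucketStart);
    }

    // Combine deviceId and the quantized timestamp into one grouping key.
    // The "@" separator is an arbitrary choice for this sketch.
    static String idTime(String deviceId, Instant eventTime) {
        return deviceId + "@" + quantize(eventTime);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2021-02-20T23:47:53Z");
        System.out.println(idTime("device-1", t)); // device-1@2021-02-20T23:45:00Z
    }
}
```

All events of one device that fall into the same bucket map to the same key, so they land in the same group.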

The logical plan consists of these stages (in the order of data flow):

FlatMapGroupsWithState
Aggregate


and translates to this physical plan (in the same order):

FlatMapGroupsWithState
SerializeFromObject
Project (idTime, timestamp)
HashAggregate(keys=[idTime] ... partial aggregation)
Exchange hashpartitioning(idTime)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreRestore [idTime], state info)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreSave [idTime], state info)
HashAggregate(keys=[idTime], functions= ...)


This all works, but it appears that the partial aggregate state is never
released.

If I send 10 events for some value of idTime, the stream produces an output
batch with count = 10.

If some significant time later (after the group state expires) I send 10
more events for the same value of idTime, the stream produces another output
batch with count = 20. The other aggregates in this later output batch also
show that both the old and the new events were included.

Thus, it appears state information is not cleared from the state store.
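The 10-then-20 behaviour is what one would expect if the aggregation keeps its own per-key state (the StateStoreRestore and StateStoreSave steps in the plan) separately from the group state, so that a group state timeout does not touch it. The toy model below only illustrates that reasoning. It is not Spark's StateStore, and the class and its evict method are hypothetical names used to show what releasing a key would change.

```java
import java.util.HashMap;
import java.util.Map;

class AggregateStateModel {
    // Stand-in for the aggregation state: a running count per idTime.
    private final Map<String, Long> counts = new HashMap<>();

    // Merge n new events into the retained count and return the new total.
    long addEvents(String idTime, long n) {
        return counts.merge(idTime, n, Long::sum);
    }

    // Hypothetical eviction hook: drop the retained state for one key.
    void evict(String idTime) {
        counts.remove(idTime);
    }

    int retainedKeys() {
        return counts.size();
    }

    public static void main(String[] args) {
        AggregateStateModel agg = new AggregateStateModel();
        String key = "device-1@bucket-0";
        System.out.println(agg.addEvents(key, 10)); // 10
        System.out.println(agg.addEvents(key, 10)); // 20, old state still retained
        agg.evict(key);
        System.out.println(agg.addEvents(key, 10)); // 10, counting starts over
    }
}
```

Without something playing the role of evict, the map grows by one entry per distinct idTime and never shrinks, which is the space concern raised in this message.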

This is nice from the standpoint of handling latecomer events, but it also
poses a problem: if the partial aggregate information for every idTime value
is never cleared from the state store, the state store is eventually going
to run out of space.

Is there a way to control this retention and trigger the release of state
store data for old values of idTime that are no longer needed?

Thanks for advice.

Re: Controlling Spark StateStore retention

Posted by Jungtaek Lim <ka...@gmail.com>.
It looks like you are chaining two stateful operations. This runs into the
limitation of the global watermark and may produce incorrect output.
We have documented the limitation in the Structured Streaming guide starting
from Spark 3.0, so please take the time to read it to learn what issues are
possible and what workaround is available.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#limitation-of-global-watermark

Starting from Spark 3.0, Spark logs a warning message when this pattern is
detected. In the upcoming Spark 3.1, a query with such a pattern is
disallowed unless the end user explicitly sets a config to force it to run.
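The reply does not name the config. To my knowledge the relevant setting in Spark 3.1 is spark.sql.streaming.statefulOperator.checkCorrectness.enabled, so treat the key below as an assumption and verify it against the configuration documentation for your Spark version. The small helper is hypothetical and only renders settings as spark-submit --conf arguments.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ForceRunConf {
    // Assumed key: the reply only says "set the config explicitly".
    static final String CHECK_KEY =
        "spark.sql.streaming.statefulOperator.checkCorrectness.enabled";

    // Render each setting as a "--conf key=value" pair for spark-submit.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(CHECK_KEY, "false"); // "false" disables the check and forces the query to run
        System.out.println(String.join(" ", toSubmitArgs(conf)));
    }
}
```

Disabling the check only removes the guard. The correctness issue described in the guide still applies to the query.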

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

