Posted to user@flink.apache.org by Juan Gentile <j....@criteo.com> on 2018/08/08 17:02:25 UTC
State in the Scala DataStream API
Hello,
I'm looking at the following page of the documentation
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
particularly at this piece of code:
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
How is the state cleared or purged in this case for keys that no longer appear?
Thank you,
Juan
Re: State in the Scala DataStream API
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Juan,
The state for a key is purged if the function returns None instead of a Some
as the second element of its result.
However, this only happens when the function is called for that specific key,
i.e., state is not removed automatically after some time.
If you need cleanup for keys that stop appearing, you have to implement a
ProcessFunction and use timers to clean up the state manually.
Best, Fabian
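
The two options described in the reply can be sketched as follows. This is a minimal illustration, not code from the thread: the names ExpiringCount, isIdle, StateCleanupSketch and idleMs are hypothetical, the "negative value means reset" rule is an assumption made only to have a reason to return None, and the sketch uses KeyedProcessFunction (the keyed variant of ProcessFunction in the Flink 1.x Scala API) with processing-time timers.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ExpiringCount {
  // A timer belongs to the most recent element of its key only if no newer
  // element has moved lastSeen forward since the timer was registered.
  def isIdle(timerTs: Long, lastSeen: Long, idleMs: Long): Boolean =
    timerTs >= lastSeen + idleMs
}

// Hypothetical function: emits the same values as the mapWithState example
// and clears a key's state once idleMs pass without a new element for it.
class ExpiringCount(idleMs: Long)
    extends KeyedProcessFunction[String, (String, Int), (String, Int)] {

  // (running sum, processing time of the last element) for the current key.
  @transient private var state: ValueState[(Int, Long)] = _

  override def open(parameters: Configuration): Unit = {
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[(Int, Long)](
        "sumAndLastSeen", createTypeInformation[(Int, Long)]))
  }

  override def processElement(
      in: (String, Int),
      ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    val now = ctx.timerService().currentProcessingTime()
    // state.value() is null for a key that has no state yet.
    val previous = Option(state.value()).map(_._1).getOrElse(0)
    out.collect((in._1, previous))
    state.update((previous + in._2, now))
    // One timer per element; stale timers are ignored in onTimer.
    ctx.timerService().registerProcessingTimeTimer(now + idleMs)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, (String, Int), (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    val current = state.value()
    if (current != null && ExpiringCount.isIdle(timestamp, current._2, idleMs)) {
      state.clear() // drop the state for this key
    }
  }
}

object StateCleanupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[(String, Int)] =
      env.fromElements(("a", 1), ("b", 2), ("a", 3))

    // Option 1: returning None drops the state of the key being processed.
    // Assumption for illustration: a negative value acts as a reset marker.
    val resettable: DataStream[(String, Int)] = stream
      .keyBy(_._1)
      .mapWithState[(String, Int), Int] { (in, count) =>
        val previous = count.getOrElse(0)
        if (in._2 < 0) ((in._1, previous), None)
        else ((in._1, previous), Some(previous + in._2))
      }

    // Option 2: timer-based cleanup after one hour without elements for a key.
    val expiring: DataStream[(String, Int)] = stream
      .keyBy(_._1)
      .process(new ExpiringCount(idleMs = 60 * 60 * 1000L))

    resettable.print()
    expiring.print()
    env.execute("state-cleanup-sketch")
  }
}
```

With the small bounded input used here the job will normally finish before any timer fires, so the cleanup path only matters on a long-running stream. Registering one timer per element keeps the logic simple; the check in onTimer ensures that only the timer belonging to the last element seen for a key clears the state.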