You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Juho Autio <ju...@rovio.com> on 2018/08/22 08:53:45 UTC

State TTL in Flink 1.6.0

First, I couldn't find anything about State TTL in Flink docs, is there
anything like that? I can manage based on Javadocs & source code, but just
wondering.

Then to main main question, why doesn't the TTL support event time, and is
there any sensible use case for the TTL if the streaming charateristic of
my job is event time?

I have a job that is cleaning up old entries from a keyed MapState by
calling registerEventTimeTimer & implementing the onTimer method. This way
I can keep the state for a certain time in _event time_.

That's more complicated code than it would have to be, so I wanted to
convert by function to use Flink's own state TTL. I started writing this:

        MapStateDescriptor<String, String> stateDesc = new
MapStateDescriptor<>(
                "deviceState", String.class, String.class);
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.milliseconds(stateRetentionMillis))
                // TODO EventTime is not supported?

.setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
                .build();
        stateDesc.enableTimeToLive(ttlConfig);

So, I realized that ProcessingTime is the only existing TimeCharacteristic
in StateTtlConfig.

Based on some comments in Flink tickets it seems that it was a conscious
choice, because supporting EventTime TTL would be much heavier:

https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013

So I can't exactly match the current behaviour that guarantees to keep the
state available for 24 hours (or whatever is passed as
--stateRetentionMillis).

However, if we accept the restriction and switch to processing time in
state cleanup, what does it mean?

- As long as stream keeps up with the input rate (from kafka), there's no
big difference, because 24 hours in processing time ~= 24 hours in even
time.
- If the stream is lagging behind a lot, then it would be possible that the
state is cleaned "too early". However we aim at not having a lot of lag, so
this is not a real issue – job would be scaled up to catch up before it
starts lagging too much to get misses because of cleared state. Still, if
we fail to scale up quickly enough, the state might be cleared too early
and cause real trouble.
- One problem is that if the stream is quickly processing a long backlog
(say, start streaming 7 days back in event time), then the state size can
temporarily grow bigger than usual – maybe this wouldn't be a big problem,
but it could at least require extraneous upscaling of resources.
- After restoring from a savepoint, the processing time on the state is as
much older than what was the time of downtime due to job restart. Even this
is not a huge issue as long as the deployment downtime is short compared to
the 24 hour TTL.

Any way, all these issues combined, I'm a bit confused on the whole TTL
feature. Can it be used in event time based streaming in any sensible way?
It seems like it would be more like a cache then, and can't be relied on
well enough.

Thanks.

Juho

Re: State TTL in Flink 1.6.0

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi Juho,

As Aljoscha mentioned the current TTL implementation was mostly targeted to data privacy applications
where only processing time matters.

I think the event time can be also useful for TTL and should address your concerns. 
The event time extension is on the road map for the future Flink releases.

Cheers,
Andrey

> On 22 Aug 2018, at 11:57, Aljoscha Krettek <al...@apache.org> wrote:
> 
> Hi Juho,
> 
> The main motivation for the initial implementation of TTL was compliance with new GDPR rules. I.e. data cannot be accessible and must be dropped according to time in the real world, i.e. processing time. The behaviour you describe, with data being dropped if you keep a savepoint for too long, is actually what is required for this use case.
> 
> I do see that also having this for event time can also be useful and it might get implemented in the future. Maybe Stefan can chime in here.
> 
> Best,
> Aljoscha
> 
>> On 22. Aug 2018, at 11:01, Chesnay Schepler <ch...@apache.org> wrote:
>> 
>> Just a quick note for the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>> 
>> On 22.08.2018 10:53, Juho Autio wrote:
>>> First, I couldn't find anything about State TTL in Flink docs, is there anything like that? I can manage based on Javadocs & source code, but just wondering.
>>> 
>>> Then to main main question, why doesn't the TTL support event time, and is there any sensible use case for the TTL if the streaming charateristic of my job is event time?
>>> 
>>> I have a job that is cleaning up old entries from a keyed MapState by calling registerEventTimeTimer & implementing the onTimer method. This way I can keep the state for a certain time in _event time_.
>>> 
>>> That's more complicated code than it would have to be, so I wanted to convert by function to use Flink's own state TTL. I started writing this:
>>> 
>>>       MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>>               "deviceState", String.class, String.class);
>>>       StateTtlConfig ttlConfig = StateTtlConfig
>>> .newBuilder(Time.milliseconds(stateRetentionMillis))
>>>               // TODO EventTime is not supported?
>>> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>>               .build();
>>>       stateDesc.enableTimeToLive(ttlConfig);
>>> 
>>> So, I realized that ProcessingTime is the only existing TimeCharacteristic in StateTtlConfig.
>>> 
>>> Based on some comments in Flink tickets it seems that it was a conscious choice, because supporting EventTime TTL would be much heavier:
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>>> 
>>> So I can't exactly match the current behaviour that guarantees to keep the state available for 24 hours (or whatever is passed as --stateRetentionMillis).
>>> 
>>> However, if we accept the restriction and switch to processing time in state cleanup, what does it mean?
>>> 
>>> - As long as stream keeps up with the input rate (from kafka), there's no big difference, because 24 hours in processing time ~= 24 hours in even time.
>>> - If the stream is lagging behind a lot, then it would be possible that the state is cleaned "too early". However we aim at not having a lot of lag, so this is not a real issue – job would be scaled up to catch up before it starts lagging too much to get misses because of cleared state. Still, if we fail to scale up quickly enough, the state might be cleared too early and cause real trouble.
>>> - One problem is that if the stream is quickly processing a long backlog (say, start streaming 7 days back in event time), then the state size can temporarily grow bigger than usual – maybe this wouldn't be a big problem, but it could at least require extraneous upscaling of resources.
>>> - After restoring from a savepoint, the processing time on the state is as much older than what was the time of downtime due to job restart. Even this is not a huge issue as long as the deployment downtime is short compared to the 24 hour TTL.
>>> 
>>> Any way, all these issues combined, I'm a bit confused on the whole TTL feature. Can it be used in event time based streaming in any sensible way? It seems like it would be more like a cache then, and can't be relied on well enough.
>>> 
>>> Thanks.
>>> 
>>> Juho
>> 
>> 
> 


Re: State TTL in Flink 1.6.0

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Juho,

The main motivation for the initial implementation of TTL was compliance with new GDPR rules. I.e. data cannot be accessible and must be dropped according to time in the real world, i.e. processing time. The behaviour you describe, with data being dropped if you keep a savepoint for too long, is actually what is required for this use case.

I do see that also having this for event time can also be useful and it might get implemented in the future. Maybe Stefan can chime in here.

Best,
Aljoscha

> On 22. Aug 2018, at 11:01, Chesnay Schepler <ch...@apache.org> wrote:
> 
> Just a quick note for the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
> 
> On 22.08.2018 10:53, Juho Autio wrote:
>> First, I couldn't find anything about State TTL in Flink docs, is there anything like that? I can manage based on Javadocs & source code, but just wondering.
>> 
>> Then to main main question, why doesn't the TTL support event time, and is there any sensible use case for the TTL if the streaming charateristic of my job is event time?
>> 
>> I have a job that is cleaning up old entries from a keyed MapState by calling registerEventTimeTimer & implementing the onTimer method. This way I can keep the state for a certain time in _event time_.
>> 
>> That's more complicated code than it would have to be, so I wanted to convert by function to use Flink's own state TTL. I started writing this:
>> 
>>        MapStateDescriptor<String, String> stateDesc = new MapStateDescriptor<>(
>>                "deviceState", String.class, String.class);
>>        StateTtlConfig ttlConfig = StateTtlConfig
>> .newBuilder(Time.milliseconds(stateRetentionMillis))
>>                // TODO EventTime is not supported?
>> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>>                .build();
>>        stateDesc.enableTimeToLive(ttlConfig);
>> 
>> So, I realized that ProcessingTime is the only existing TimeCharacteristic in StateTtlConfig.
>> 
>> Based on some comments in Flink tickets it seems that it was a conscious choice, because supporting EventTime TTL would be much heavier:
>> 
>> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>> 
>> So I can't exactly match the current behaviour that guarantees to keep the state available for 24 hours (or whatever is passed as --stateRetentionMillis).
>> 
>> However, if we accept the restriction and switch to processing time in state cleanup, what does it mean?
>> 
>> - As long as stream keeps up with the input rate (from kafka), there's no big difference, because 24 hours in processing time ~= 24 hours in even time.
>> - If the stream is lagging behind a lot, then it would be possible that the state is cleaned "too early". However we aim at not having a lot of lag, so this is not a real issue – job would be scaled up to catch up before it starts lagging too much to get misses because of cleared state. Still, if we fail to scale up quickly enough, the state might be cleared too early and cause real trouble.
>> - One problem is that if the stream is quickly processing a long backlog (say, start streaming 7 days back in event time), then the state size can temporarily grow bigger than usual – maybe this wouldn't be a big problem, but it could at least require extraneous upscaling of resources.
>> - After restoring from a savepoint, the processing time on the state is as much older than what was the time of downtime due to job restart. Even this is not a huge issue as long as the deployment downtime is short compared to the 24 hour TTL.
>> 
>> Any way, all these issues combined, I'm a bit confused on the whole TTL feature. Can it be used in event time based streaming in any sensible way? It seems like it would be more like a cache then, and can't be relied on well enough.
>> 
>> Thanks.
>> 
>> Juho
> 
> 


Re: State TTL in Flink 1.6.0

Posted by Chesnay Schepler <ch...@apache.org>.
Just a quick note for the docs: 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl

On 22.08.2018 10:53, Juho Autio wrote:
> First, I couldn't find anything about State TTL in Flink docs, is 
> there anything like that? I can manage based on Javadocs & source 
> code, but just wondering.
>
> Then to main main question, why doesn't the TTL support event time, 
> and is there any sensible use case for the TTL if the streaming 
> charateristic of my job is event time?
>
> I have a job that is cleaning up old entries from a keyed MapState by 
> calling registerEventTimeTimer & implementing the onTimer method. This 
> way I can keep the state for a certain time in _event time_.
>
> That's more complicated code than it would have to be, so I wanted to 
> convert by function to use Flink's own state TTL. I started writing this:
>
>         MapStateDescriptor<String, String> stateDesc = new 
> MapStateDescriptor<>(
>                 "deviceState", String.class, String.class);
>         StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.milliseconds(stateRetentionMillis))
>                 // TODO EventTime is not supported?
> .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime)
>                 .build();
>         stateDesc.enableTimeToLive(ttlConfig);
>
> So, I realized that ProcessingTime is the only existing 
> TimeCharacteristic in StateTtlConfig.
>
> Based on some comments in Flink tickets it seems that it was a 
> conscious choice, because supporting EventTime TTL would be much heavier:
>
> https://issues.apache.org/jira/browse/FLINK-3089?focusedCommentId=16318013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16318013
>
> So I can't exactly match the current behaviour that guarantees to keep 
> the state available for 24 hours (or whatever is passed as 
> --stateRetentionMillis).
>
> However, if we accept the restriction and switch to processing time in 
> state cleanup, what does it mean?
>
> - As long as stream keeps up with the input rate (from kafka), there's 
> no big difference, because 24 hours in processing time ~= 24 hours in 
> even time.
> - If the stream is lagging behind a lot, then it would be possible 
> that the state is cleaned "too early". However we aim at not having a 
> lot of lag, so this is not a real issue – job would be scaled up to 
> catch up before it starts lagging too much to get misses because of 
> cleared state. Still, if we fail to scale up quickly enough, the state 
> might be cleared too early and cause real trouble.
> - One problem is that if the stream is quickly processing a long 
> backlog (say, start streaming 7 days back in event time), then the 
> state size can temporarily grow bigger than usual – maybe this 
> wouldn't be a big problem, but it could at least require extraneous 
> upscaling of resources.
> - After restoring from a savepoint, the processing time on the state 
> is as much older than what was the time of downtime due to job 
> restart. Even this is not a huge issue as long as the deployment 
> downtime is short compared to the 24 hour TTL.
>
> Any way, all these issues combined, I'm a bit confused on the whole 
> TTL feature. Can it be used in event time based streaming in any 
> sensible way? It seems like it would be more like a cache then, and 
> can't be relied on well enough.
>
> Thanks.
>
> Juho