Posted to dev@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2018/05/02 12:36:44 UTC

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up. The
main problem that I see right now is that we have to distinguish between
user and TTL timers. AFAIK, the timer service does not support timer tags
(or another method) to distinguish timers.
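
To make point 3 concrete, here is a minimal sketch of what "distinguishing user and TTL timers" could look like if the two kinds were simply kept in separate queues, with user timers always drained first. Everything in it (the class `TwoQueueTimerSketch`, its methods, the `user@`/`ttl@` labels) is hypothetical and only illustrates the ordering; it is not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch only: not Flink's InternalTimerService. */
final class TwoQueueTimerSketch {
    // Timers are reduced to bare timestamps to keep the sketch small.
    private final PriorityQueue<Long> userTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> ttlTimers = new PriorityQueue<>();

    void registerUserTimer(long time) {
        userTimers.add(time);
    }

    void registerTtlTimer(long time) {
        ttlTimers.add(time);
    }

    /** Fires every timer with timestamp <= time: all due user timers first, then TTL timers. */
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        drain(userTimers, time, "user", fired); // user code runs first
        drain(ttlTimers, time, "ttl", fired);   // state cleanup runs afterwards
        return fired;
    }

    private static void drain(PriorityQueue<Long> queue, long time, String tag, List<String> out) {
        while (!queue.isEmpty() && queue.peek() <= time) {
            out.add(tag + "@" + queue.poll());
        }
    }

    public static void main(String[] args) {
        TwoQueueTimerSketch timers = new TwoQueueTimerSketch();
        timers.registerTtlTimer(10L);
        timers.registerUserTimer(10L);
        System.out.println(timers.advanceTo(10L));
    }
}
```

With this layout the user callback for a timestamp would still see the state before the TTL timer for the same timestamp clears it.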

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:

> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
>
> I believe TTL code should not reside in state backend, because a critical
> design is that TTL is independent of and transparent to state backends.
>
> According to my current knowledge, I think it probably should live with
> operators in flink-streaming-java.
>
>
> 2. How to get notified about state accesses? I guess this depends on 1.
>
> You previously suggested using callbacks. I believe that's the right way
> to do decoupling.
>
>
> 3. How to avoid conflicts of TTL timers and user timers?
>
> User timers might always be invoked first? This is not urgent, shall we
> bake it for more time and discuss it along the way?
>
>
>
> Besides, I don't have access to create a FLIP page under
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
> Can you grant me the proper access?
>
> Thanks,
>
> Bowen
>
>
>
>
> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Bowen,
>>
>> Thanks for updating the proposal. This looks pretty good (as I said
>> before).
>> There are a few areas, that are not yet fully fleshed out:
>>
>> 1. Where should the TTL code reside? In the state backend or in the
>> operator?
>> 2. How to get notified about state accesses? I guess this depends on 1.
>> 3. How to avoid conflicts of TTL timers and user timers?
>>
>> @Stefan (in CC) might have some ideas on these issues as well.
>>
>> Cheers, Fabian
>>
>> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
>>
>>> Hello community,
>>>
>>> We've come up with a completely new design for Flink state TTL, documented
>>> here
>>> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
>>> and have run it by a few Flink PMC/committers.
>>>
>>> What do you think? We'd love to hear your feedback.
>>>
>>> Thanks,
>>> Bowen
>>>
>>>
>>> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
>>> wrote:
>>>
>>>> Hi Bowen,
>>>>
>>>> Thanks for the proposal! I think state TTL would be a great feature!
>>>> Actually, we have implemented this for SQL / Table API [1].
>>>> I've added a couple of comments to the design doc.
>>>>
>>>> In principle, I'm not sure if this functionality should be added to the
>>>> state backends.
>>>> We could also use the existing timer service which would have a few nice
>>>> benefits (see my comments in the docs).
>>>>
>>>> Best, Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
>>>>
>>>> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
>>>>
>>>> > Hi guys,
>>>> >
>>>> > I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
>>>> > in Flink. This has been one of the handiest and most frequently requested
>>>> > features in the Flink community. The Jira ticket is FLINK-3089
>>>> > <https://issues.apache.org/jira/browse/FLINK-3089>.
>>>> >
>>>> > I've written a rough design doc
>>>> > <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
>>>> > and developed prototypes for both the heap and RocksDB state backends.
>>>> >
>>>> > My question is: shall we create a FLIP page for this? Can I be granted the
>>>> > privileges to create pages in
>>>> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>> > ?
>>>> >
>>>> > Thanks,
>>>> > Bowen
>>>> >
>>>>
>>>
>>>
>>
>

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Bowen Li <bo...@gmail.com>.
Hi,

I like that this design combines the previously discussed approaches
to address a much broader scope of TTL use cases. Each of those
approaches alone focuses on a narrower scenario than this one and has
its own limitations.

I'm glad to see the community is willing to spend additional effort and go
a step further to give users more flexibility and options by integrating
those solutions. The combinations in `TtlConfig`, especially
Exact/Relaxed cleanup + Exact/Relaxed visibility, seem to cover most, if
not all, frequent use cases. Taking data compliance requirements into
consideration is a big plus.
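
As a rough illustration of how the two axes combine, here is a minimal sketch. It assumes that "exact visibility" means an expired value is never returned, while "relaxed visibility" means an expired value may still be returned until cleanup has physically removed it. The class `TtlConfigSketch`, its enums and the `mayReturn` method are hypothetical names, not the `TtlConfig` from the design doc.

```java
/** Hypothetical names throughout; not the design doc's TtlConfig or any Flink API. */
final class TtlConfigSketch {
    /** When storage is reclaimed: right at expiry (EXACT) or some time later (RELAXED). */
    enum Cleanup { EXACT, RELAXED }

    /** What a read may observe after expiry: nothing (EXACT) or a not-yet-cleaned value (RELAXED). */
    enum Visibility { EXACT, RELAXED }

    final long ttlMillis;
    final Cleanup cleanup;
    final Visibility visibility;

    TtlConfigSketch(long ttlMillis, Cleanup cleanup, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.cleanup = cleanup;
        this.visibility = visibility;
    }

    /**
     * Decides whether a read at time {@code now} may return a value that expires at
     * {@code expiresAt}. {@code stillStored} is true while cleanup has not removed the value.
     * The cleanup setting itself is not modeled here; it only influences {@code stillStored}.
     */
    boolean mayReturn(long expiresAt, long now, boolean stillStored) {
        if (!stillStored) {
            return false; // nothing left to return
        }
        if (now < expiresAt) {
            return true; // not expired yet
        }
        return visibility == Visibility.RELAXED; // expired, but not yet cleaned up
    }

    public static void main(String[] args) {
        TtlConfigSketch strict = new TtlConfigSketch(1000L, Cleanup.RELAXED, Visibility.EXACT);
        TtlConfigSketch lenient = new TtlConfigSketch(1000L, Cleanup.RELAXED, Visibility.RELAXED);
        System.out.println(strict.mayReturn(1000L, 1500L, true));
        System.out.println(lenient.mayReturn(1000L, 1500L, true));
    }
}
```

The compliance case corresponds to the strict variant: even with lazy cleanup, an expired value would not be handed back to user code.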

Have left some comments in the doc.

Thanks, Bowen


On Mon, Jun 4, 2018 at 1:34 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> I like the separation of visibility and clean-up.
> So far the design only addressed the clean-up aspect and aimed to have
> state accessible as long as possible, i.e., until it was cleared.
> We did not consider the use case of compliance. Support for strict
> visibility is a good idea, IMO.
>
> Moreover, the proposal addresses other aspects that have been discussed
> before, such as support for event & processing time and different timer
> reset strategies.
>
> Best, Fabian
>
>
>
>
>
> 2018-06-04 9:52 GMT+02:00 sihua zhou <su...@163.com>:
>
>> Hi andrey,
>>
>> Thanks for this doc! TBH, I personally prefer the approach you outlined
>> in the doc over the previous one that was purely based on timers. I think this
>> approach looks very similar to the approach I outlined in this thread
>> before, so it still faces the challenges that @Bowen outlined, but I think
>> maybe we can try to overcome them. I will have a closer look at the doc you
>> posted and leave some comments if I can.
>>
>> Best, Sihua
>>
>>
>>
>> On 06/4/2018 15:27, Andrey Zagrebin <an...@data-artisans.com> wrote:
>>
>> Hi everybody,
>>
>> We have been recently brainstorming ideas around state TTL in Flink
>> and compiled our thoughts in the following design doc:
>> https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
>>
>> Thanks to the community, many things in the doc are based on the previous
>> discussions.
>>
>> As there are a lot of TTL requirements related to data privacy
>> regulations (quite a hot topic in the EU),
>> and better cleanup strategies sometimes need more research and maybe
>> POCs,
>> we suggest starting with the implementation of the TTL API itself,
>> without major changes to current state performance.
>>
>> In a nutshell, the approach requires only appending expiration timestamp
>> bytes to each state value/entry.
>> First, we just block access to expired state and clean it up when it is
>> explicitly touched,
>> then gradually adopt cleanup strategies with different guarantees to
>> address space concerns better,
>> including:
>> - filtering out expired state during the checkpointing process
>> - exact cleanup with the timer service (though this still requires storing
>> keys twice, in both backends)
>> - piggy-backing on RocksDB compaction using a custom filter by TTL (similar to
>> the Cassandra custom filter)
>> - cleanup of heap regions around a randomly accessed bucket
>>
>> Please, feel free to give any feedback and comments.
>>
>> Thanks,
>> Andrey
>>
>> On 27 May 2018, at 09:46, sihua zhou <su...@163.com> wrote:
>>
>> Hi Bowen,
>>
>>
>> Thanks for your clarification. I agree that we should wait for the timer
>> on RocksDB to be finished; after that we could even do some micro-benchmarks
>> before starting the implementation.
>>
>>
>> Best, Sihua
>>
>>
>>
>>
>>
>>
>> On 05/27/2018 15:07,Bowen Li<bo...@gmail.com> wrote:
>> Thank you Fabian and Sihua. I explained more in the doc itself and its
>> comments. I believe the bottom line for v1 is: 1) it shouldn't be worse
>> than how users implement TTL themselves today, 2) we should develop a generic
>> architecture for TTL for all (currently two) state backends (impl can
>> vary), 3) optimizations and improvements can come in v2 or a later version.
>>
>> Sihua's proposal is similar to the old plan we came up with, so I share similar
>> concerns as before and wonder if you have answers:
>>
>> - It requires building custom compaction for both state backends, and a
>> few things are unclear:
>>   - When, who, and how? The 'when' might be the hardest one, because
>>   it really depends on the user's use cases. E.g. if it's once a day, at what
>>   time of day?
>>   - How well will it integrate with Flink's checkpoint/savepoint
>>   mechanism?
>>   - Are there any indications of performance regression in either state backend?
>>   - How much is the ROI if it requires a very complicated implementation?
>> - I'm afraid that, eventually, the optimization will easily go in a tricky
>> direction we may want to avoid. Shall we come up with an extra design to
>> amortize the cost?
>> - I'm afraid the custom compaction logic will have to make quite
>> different assumptions for different state backends. E.g. it's then harder to
>> estimate the total memory required for a user's app with the heap state backend,
>> because it depends on when you trigger the compaction and how strictly you
>> stick to the schedule every day. Any nondeterministic behavior may lead
>> to users allocating less memory than needed, and eventually cause users'
>> apps to be unstable.
>> - I want to highlight that lots of users actually want the exact TTL
>> feature. How users implement TTL with timers today implies that
>> their logic depends on exact TTL for both shrinking their state size and
>> expiring a key at exactly the expected time. I chatted with a few different
>> Flink users recently and confirmed it. That's why I want to add exact TTL
>> as a potential goal and motivation if possible, along with relaxed TTL and
>> avoiding indefinitely growing state. If we don't provide that out of the box,
>> many users may still use the timer approach themselves.
>>
>> On the concern of doubling keys: in the heap state backend, the key is only a
>> reference so there's only one copy, and that's not a problem. In the RocksDB state
>> backend, yes, the state size will be bigger. First, I believe this is a
>> tradeoff for a clearer architecture. Frankly, unlike memory, disk space
>> (even SSD) is relatively cheap and accessible, and we don't normally take it as
>> a big constraint. Second, w.r.t. performance, I believe RocksDB timers
>> will sit in a different column family than other state, which may not cause
>> a noticeable perf issue. The RocksDB timer service is on its way, and I want
>> to see how it's implemented first before asserting whether there is truly any
>> potential perf burden. Finally, there are also improvements we can make
>> after v1, including relaxed TTL and smaller timer state size. E.g. Flink
>> can approximate timers within a user-configured time range (say within 5
>> sec) by a single timer. I don't have a concrete plan for that yet, but
>> it's doable.
>>
>> Stefan is adding the RocksDB timer and bringing the timer service closer to
>> the keyed backends, which aligns very well with what we want in this FLIP. I
>> suggest we wait and keep a close eye on those efforts, and as they mature,
>> we'll have a much better idea of the whole picture.
>>
>> Thanks, Bowen
>>
>>
>>
>> On Sat, May 26, 2018 at 7:52 AM, sihua zhou <su...@163.com> wrote:
>>
>>
>>
>> Hi,
>>
>>
>> Thanks for your reply, Fabian. About the overhead of storing the key bytes
>> twice: I think the situation may be even a bit worse. In general, it
>> means that the total amount of data to be stored doubles (for each key,
>> we need to store two records, one for the timer and one for the state). This may be a
>> bit uncomfortable when the state backend is based on RocksDB, because the
>> timers live together with the other state in the same RocksDB
>> instance, which means that with TTL, the number of records in
>> RocksDB doubles. I'm afraid this may hurt its performance.
>>
>>
>> Concerning the approach of adding a timestamp to each value: TBH, I haven't
>> thought about it deeply yet and am also not sure about it. In general, it
>> can be described as follows:
>>
>>
>> - We attach a TS to every state record.
>> - When getting the record, we check the TS to see if it's outdated.
>> - For the records that we will never touch again, we use compaction to
>> remove them. Maybe one compaction per day is enough.
>>
>>
>> Best, Sihua
>>
>>
>> On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
>> Hi,
>>
>>
>> Yes. IMO it makes sense to put the logic into the abstract base classes to
>> share the implementation across different state backends and state
>> primitives.
>>
>> The overhead of storing the key twice is a valid concern, but I'm not sure
>> about the approach to add a timestamp to each value.
>> How would we discard state then? Iterating always over all (or a range of)
>> keys to check if their state should be expired?
>> That would only work efficiently if we relax the clean-up logic which
>> could be a valid design decision.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>> 2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:
>>
>> Hi Fabian,
>> Thank you very much for the reply. Just an alternative: can we implement
>> the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest
>> way is to append the `ts` to the state's value, and use the backend's
>> `current time` (it can be either event time or processing time) to judge
>> whether the data is outdated. The pros are:
>> - state is purely backed by the state backend.
>> - for each key-value, we only need to store one copy of the key (if we
>> implement TTL based on timers, we need to store two copies of the key, one for
>> the timer and one for the keyed state).
>>
>>
>> What do you think?
>>
>>
>> Best,
>> Sihua
>>
>>
>> On 05/14/2018 15:20,Fabian Hueske<fh...@gmail.com> wrote:
>> Hi Sihua,
>>
>>
>> I think it makes sense to couple state TTL to the timer service. We'll
>> need some kind of timers to expire state, so I think we should reuse
>> components that we have instead of implementing another timer service.
>>
>> Moreover, using the same timer service and using the public state APIs
>> helps to have a consistent TTL behavior across different state backends.
>>
>>
>> Best, Fabian
>>
>>
>>
>> 2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:
>>
>> Hi Bowen,
>> Thanks for your doc! I left some comments on the doc. My main concern
>> is that it feels like a coupling that the TTL needs to depend on the
>> `timer`. I think the TTL is a property of the state, so it should
>> be backed by the state backend. If we implement the TTL based on the timer,
>> then it looks like a coupling: it makes me feel that the backend for
>> state becomes `state backend` + `timer`. And in fact, IMO, even the
>> `timer` should depend on the `state backend` in theory; it's a type of HeapState that
>> is scoped to the `key group` (not scoped per key like the current keyed
>> state).
>>
>>
>> I also found that the doc is for exact TTL. I wonder if we can support a relaxed
>> TTL that could provide better performance. To me, the reason
>> that I need TTL is just to prevent the state size from growing infinitely (I
>> believe I'm not the only one like this), so a relaxed version is enough. If
>> there is a relaxed TTL that has better performance, I would prefer that.
>>
>>
>> What do you think?
>>
>>
>> Best,
>> Sihua
>>
>>
>>
>>
>>
>>
>> On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> wrote:
>> Thank you, Fabian! I've created the FLIP-25 page
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.
>>
>> To continue our discussion points:
>> 1. I see what you mean now. I totally agree. Since we don't completely
>> know it now, shall we experiment or prototype a little bit before deciding
>> this?
>> 2. -
>> 3. Adding tags to timers is an option.
>>
>> Another option I came up with recently is this: let
>> InternalTimerService maintain user timers and TTL timers separately.
>> Implementation classes of InternalTimerService should add two new
>> collections of timers, e.g. TtlProcessingTimeTimersQueue
>> and TtlEventTimeTimersQueue for HeapInternalTimerService. Upon
>> InternalTimerService#onProcessingTime() and advanceWatermark(), they will
>> first iterate through ProcessingTimeTimers and EventTimeTimers (user
>> timers) and then through TtlProcessingTimeTimers and TtlEventTimeTimers
>> (TTL timers).
>>
>> We'll also add the following new internal APIs to register Ttl timers:
>>
>> ```
>> @Internal
>> public void registerTtlProcessingTimeTimer(N namespace, long time);
>>
>> @Internal
>> public void registerTtlEventTimeTimer(N namespace, long time);
>> ```
>>
>> The biggest advantage, compared to option 1, is that it doesn't impact
>> existing timer-related checkpoint/savepoint, restore and migrations.
>>
>> What do you think?  And, any other Flink committers want to chime in for
>> ideas? I've also documented the above two discussion points to the FLIP
>> page.
>>
>> Thanks,
>> Bowen
>>
>>
>> On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>
>> Hi Bowen,
>>
>> 1. The motivation to keep the TTL logic outside of the state backend was
>> mainly to avoid state backend custom implementations. If we have a generic
>> approach that would work for all state backends, we could try to put the
>> logic into a base class like AbstractStateBackend. After all, state
>> cleanup
>> is tightly related to the responsibilities of state backends.
>> 2. -
>> 3. You're right. We should first call the user code before cleaning up.
>> The main problem that I see right now is that we have to distinguish
>> between user and TTL timers. AFAIK, the timer service does not support
>> timer tags (or another method) to distinguish timers.
>>
>> I've given you the permissions to create and edit wiki pages.
>>
>> Best, Fabian
>>
>> 2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:
>>
>>
>> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>>
>> 1. Where should the TTL code reside? In the state backend or in the
>> operator?
>>
>> I believe TTL code should not reside in state backend, because a critical
>> design is that TTL is independent of and transparent to state backends.
>>
>> According to my current knowledge, I think it probably should live with
>> operators in flink-streaming-java.
>>
>>
>> 2. How to get notified about state accesses? I guess this depends on 1.
>>
>> You previously suggested using callbacks. I believe that's the right way
>> to do decoupling.
>>
>>
>> 3. How to avoid conflicts of TTL timers and user timers?
>>
>> User timers might always be invoked first? This is not urgent, shall we
>> bake it for more time and discuss it along the way?
>>
>>
>>
>> Besides, I don't have access to create a FLIP page under
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
>> Can you grant me the proper access?
>>
>> Thanks,
>>
>> Bowen
>>
>>
>>
>>
>> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>
>> Hi Bowen,
>>
>> Thanks for updating the proposal. This looks pretty good (as I said
>> before).
>> There are a few areas, that are not yet fully fleshed out:
>>
>> 1. Where should the TTL code reside? In the state backend or in the
>> operator?
>> 2. How to get notified about state accesses? I guess this depends on 1.
>> 3. How to avoid conflicts of TTL timers and user timers?
>>
>> @Stefan (in CC) might have some ideas on these issues as well.
>>
>> Cheers, Fabian
>>
>> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
>>
>>
>> Hello community,
>>
>> We've come up with a completely new design for Flink state TTL, documented
>> here
>> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
>> and have run it by a few Flink PMC/committers.
>>
>> What do you think? We'd love to hear your feedback.
>>
>> Thanks,
>> Bowen
>>
>>
>> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
>> wrote:
>>
>> Hi Bowen,
>>
>> Thanks for the proposal! I think state TTL would be a great feature!
>> Actually, we have implemented this for SQL / Table API [1].
>> I've added a couple of comments to the design doc.
>>
>> In principle, I'm not sure if this functionality should be added to the
>> state backends.
>> We could also use the existing timer service which would have a few nice
>> benefits (see my comments in the docs).
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
>>
>> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
>>
>> Hi guys,
>>
>> I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
>> in Flink. This has been one of the handiest and most frequently requested
>> features in the Flink community. The Jira ticket is FLINK-3089
>> <https://issues.apache.org/jira/browse/FLINK-3089>.
>>
>> I've written a rough design doc
>> <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
>> and developed prototypes for both the heap and RocksDB state backends.
>>
>> My question is: shall we create a FLIP page for this? Can I be granted the
>> privileges to create pages in
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> ?
>>
>> Thanks,
>> Bowen
>>
>>
>

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I like the separation of visibility and clean-up.
So far the design only addressed the clean-up aspect and aimed to have
state accessible as long as possible, i.e., until it was cleared.
We did not consider the use case of compliance. Support for strict
visibility is a good idea, IMO.

Moreover, the proposal addresses other aspects that have been discussed
before, such as support for event & processing time and different timer
reset strategies.
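
To illustrate the separation, here is a minimal sketch of the timestamp-per-value idea from the design doc as I read it: each value carries its expiration timestamp, a read never returns an expired value (visibility), and the entry is only physically removed when it is touched (clean-up). The class `TtlMapSketch` and the `refreshOnRead` flag, which stands in for one possible timer reset strategy, are hypothetical and not a Flink API. Time is passed in explicitly so the same code works for event time or processing time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of timestamp-per-value TTL; not a Flink API. Values must be non-null. */
final class TtlMapSketch<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAt;

        Entry(V value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final long ttlMillis;
    private final boolean refreshOnRead;

    TtlMapSketch(long ttlMillis, boolean refreshOnRead) {
        this.ttlMillis = ttlMillis;
        this.refreshOnRead = refreshOnRead;
    }

    /** Writes always (re)start the TTL. */
    void put(K key, V value, long now) {
        store.put(key, new Entry<>(value, now + ttlMillis));
    }

    /** Strict visibility: an expired value is never returned, and it is removed on this access. */
    Optional<V> get(K key, long now) {
        Entry<V> entry = store.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        if (entry.expiresAt <= now) {
            store.remove(key); // lazy clean-up on explicit touch
            return Optional.empty();
        }
        if (refreshOnRead) {
            store.put(key, new Entry<>(entry.value, now + ttlMillis));
        }
        return Optional.of(entry.value);
    }

    /** Number of entries physically stored, including expired ones not yet touched. */
    int storedEntries() {
        return store.size();
    }

    public static void main(String[] args) {
        TtlMapSketch<String, Integer> state = new TtlMapSketch<>(100L, false);
        state.put("a", 1, 0L);
        System.out.println(state.get("a", 50L));
        System.out.println(state.storedEntries());
        System.out.println(state.get("a", 150L));
        System.out.println(state.storedEntries());
    }
}
```

An entry that is never read again stays in the map forever in this sketch, which is exactly the space concern that the additional clean-up strategies are meant to address.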

Best, Fabian





2018-06-04 9:52 GMT+02:00 sihua zhou <su...@163.com>:

> Hi andrey,
>
> Thanks for this doc! TBH, I personally prefer the approach you outlined in
> the doc over the previous one that was purely based on timers. I think this
> approach looks very similar to the approach I outlined in this thread
> before, so it still faces the challenges that @Bowen outlined, but I think
> maybe we can try to overcome them. I will have a closer look at the doc you
> posted and leave some comments if I can.
>
> Best, Sihua
>
>
>
> On 06/4/2018 15:27, Andrey Zagrebin <an...@data-artisans.com> wrote:
>
> Hi everybody,
>
> We have been recently brainstorming ideas around state TTL in Flink
> and compiled our thoughts in the following design doc:
> https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
>
> Thanks to the community, many things in the doc are based on the previous
> discussions.
>
> As there are a lot of TTL requirements related to data privacy regulations
> (quite a hot topic in the EU),
> and better cleanup strategies sometimes need more research and maybe POCs,
> we suggest starting with the implementation of the TTL API itself,
> without major changes to current state performance.
>
> In a nutshell, the approach requires only appending expiration timestamp
> bytes to each state value/entry.
> First, we just block access to expired state and clean it up when it is
> explicitly touched,
> then gradually adopt cleanup strategies with different guarantees to
> address space concerns better,
> including:
> - filtering out expired state during the checkpointing process
> - exact cleanup with the timer service (though this still requires storing
> keys twice, in both backends)
> - piggy-backing on RocksDB compaction using a custom filter by TTL (similar to
> the Cassandra custom filter)
> - cleanup of heap regions around a randomly accessed bucket
>
> Please, feel free to give any feedback and comments.
>
> Thanks,
> Andrey
>
> On 27 May 2018, at 09:46, sihua zhou <su...@163.com> wrote:
>
> Hi Bowen,
>
>
> Thanks for your clarification. I agree that we should wait for the timer
> on RocksDB to be finished; after that we could even do some micro-benchmarks
> before starting the implementation.
>
>
> Best, Sihua
>
>
>
>
>
>
> On 05/27/2018 15:07,Bowen Li<bo...@gmail.com> wrote:
> Thank you Fabian and Sihua. I explained more in the doc itself and its
> comments. I believe the bottom line for v1 is: 1) it shouldn't be worse than
> how users implement TTL themselves today, 2) we should develop a generic
> architecture for TTL for all (currently two) state backends (impl can
> vary), 3) optimizations and improvements can come in v2 or a later version.
>
> Sihua's proposal is similar to the old plan we came up with, so I share similar
> concerns as before and wonder if you have answers:
>
> - It requires building custom compaction for both state backends, and a
> few things are unclear:
>   - When, who, and how? The 'when' might be the hardest one, because
>   it really depends on the user's use cases. E.g. if it's once a day, at what
>   time of day?
>   - How well will it integrate with Flink's checkpoint/savepoint
>   mechanism?
>   - Are there any indications of performance regression in either state backend?
>   - How much is the ROI if it requires a very complicated implementation?
> - I'm afraid that, eventually, the optimization will easily go in a tricky
> direction we may want to avoid. Shall we come up with an extra design to
> amortize the cost?
> - I'm afraid the custom compaction logic will have to make quite
> different assumptions for different state backends. E.g. it's then harder to
> estimate the total memory required for a user's app with the heap state backend,
> because it depends on when you trigger the compaction and how strictly you
> stick to the schedule every day. Any nondeterministic behavior may lead
> to users allocating less memory than needed, and eventually cause users'
> apps to be unstable.
> - I want to highlight that lots of users actually want the exact TTL
> feature. How users implement TTL with timers today implies that
> their logic depends on exact TTL for both shrinking their state size and
> expiring a key at exactly the expected time. I chatted with a few different
> Flink users recently and confirmed it. That's why I want to add exact TTL
> as a potential goal and motivation if possible, along with relaxed TTL and
> avoiding indefinitely growing state. If we don't provide that out of the box,
> many users may still use the timer approach themselves.
>
> On the concern of doubling keys: in the heap state backend, the key is only a
> reference so there's only one copy, and that's not a problem. In the RocksDB state
> backend, yes, the state size will be bigger. First, I believe this is a
> tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
> SSD) is relatively cheap and accessible, and we don't normally take it as a
> big constraint. Second, w.r.t. performance, I believe RocksDB timers
> will sit in a different column family than other state, which may not cause
> a noticeable perf issue. The RocksDB timer service is on its way, and I want
> to see how it's implemented first before asserting whether there is truly any
> potential perf burden. Finally, there are also improvements we can make
> after v1, including relaxed TTL and smaller timer state size. E.g. Flink
> can approximate timers within a user-configured time range (say within 5
> sec) by a single timer. I don't have a concrete plan for that yet, but
> it's doable.
>
> Stefan is adding the RocksDB timer and bringing the timer service closer to
> the keyed backends, which aligns very well with what we want in this FLIP. I
> suggest we wait and keep a close eye on those efforts, and as they mature,
> we'll have a much better idea of the whole picture.
>
> Thanks, Bowen
>
>
>
> On Sat, May 26, 2018 at 7:52 AM, sihua zhou <su...@163.com> wrote:
>
>
>
> Hi,
>
>
> Thanks for your reply, Fabian. About the overhead of storing the key bytes
> twice: I think the situation may be even a bit worse. In general, it
> means that the total amount of data to be stored doubles (for each key,
> we need to store two records, one for the timer and one for the state). This may be a
> bit uncomfortable when the state backend is based on RocksDB, because the
> timers live together with the other state in the same RocksDB
> instance, which means that with TTL, the number of records in
> RocksDB doubles. I'm afraid this may hurt its performance.
>
>
> Concerning the approach of adding a timestamp to each value: TBH, I haven't
> thought about it deeply yet and am also not sure about it. In general, it
> can be described as follows:
>
>
> - We attach a TS to every state record.
> - When getting the record, we check the TS to see if it's outdated.
> - For the records that we will never touch again, we use compaction to
> remove them. Maybe one compaction per day is enough.
>
>
> Best, Sihua
>
>
> On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
> Hi,
>
>
> Yes. IMO it makes sense to put the logic into the abstract base classes to
> share the implementation across different state backends and state
> primitives.
>
> The overhead of storing the key twice is a valid concern, but I'm not sure
> about the approach to add a timestamp to each value.
> How would we discard state then? Iterating always over all (or a range of)
> keys to check if their state should be expired?
> That would only work efficiently if we relax the clean-up logic which
> could be a valid design decision.
>
>
>
> Best, Fabian
>
>
>
> 2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:
>
> Hi Fabian,
> Thank you very much for the reply. Just an alternative: can we implement
> the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest
> way is to append the `ts` to the state's value, and use the backend's
> `current time` (it can be either event time or processing time) to judge
> whether the data is outdated. The pros are:
> - state is purely backed by the state backend.
> - for each key-value, we only need to store one copy of the key (if we
> implement TTL based on timers, we need to store two copies of the key, one for the
> timer and one for the keyed state).
>
>
> What do you think?
>
>
> Best,
> Sihua
>
>
> On 05/14/2018 15:20,Fabian Hueske<fh...@gmail.com> wrote:
> Hi Sihua,
>
>
> I think it makes sense to couple state TTL to the timer service. We'll
> need some kind of timers to expire state, so I think we should reuse
> components that we have instead of implementing another timer service.
>
> Moreover, using the same timer service and using the public state APIs
> helps to have a consistent TTL behavior across different state backends.
>
>
> Best, Fabian
>
>
>
> 2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:
>
> Hi Bowen,
> Thanks for your doc! I left some comments on the doc. My main concern
> is that it feels like a coupling that the TTL needs to depend on the
> `timer`. I think the TTL is a property of the state, so it should
> be backed by the state backend. If we implement the TTL based on the timer,
> then it looks like a coupling: it makes me feel that the backend for
> state becomes `state backend` + `timer`. And in fact, IMO, even the `timer`
> should depend on the `state backend` in theory; it's a type of HeapState that
> is scoped to the `key group` (not scoped per key like the current keyed
> state).
>
>
> I also found that the doc is for exact TTL. I wonder if we can support a relaxed
> TTL that could provide better performance. To me, the reason
> that I need TTL is just to prevent the state size from growing infinitely (I
> believe I'm not the only one like this), so a relaxed version is enough. If
> there is a relaxed TTL that has better performance, I would prefer that.
>
>
> What do you think?
>
>
> Best,
> Sihua
>
>
>
>
>
>
> On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> wrote:
> Thank you, Fabian! I've created the FLIP-25 page
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.
>
> To continue our discussion points:
> 1. I see what you mean now. I totally agree. Since we don't completely know
> it now, shall we experiment or prototype a little bit before deciding this?
> 2. -
> 3. Adding tags to timers is an option.
>
> Another option I came up with recently is this: let
> *InternalTimerService* maintain user timers and TTL timers separately.
> Implementation classes of InternalTimerService should add two new
> collections of timers, e.g. *Ttl*ProcessingTimeTimersQueue and
> *Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
> InternalTimerService#onProcessingTime() and advanceWatermark(), they will
> first iterate through ProcessingTimeTimers and EventTimeTimers (user
> timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers
> (TTL timers).
>
> We'll also add the following new internal APIs to register TTL timers:
>
> ```
> @Internal
> public void registerTtlProcessingTimeTimer(N namespace, long time);
>
> @Internal
> public void registerTtlEventTimeTimer(N namespace, long time);
> ```
>
> The biggest advantage, compared to option 1, is that it doesn't impact
> existing timer-related checkpoint/savepoint, restore and migrations.
>
> What do you think? Do any other Flink committers want to chime in with
> ideas? I've also documented the above two discussion points on the FLIP
> page.
>
> Thanks,
> Bowen
>
>
> On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>
> Hi Bowen,
>
> 1. The motivation to keep the TTL logic outside of the state backend was
> mainly to avoid state backend custom implementations. If we have a generic
> approach that would work for all state backends, we could try to put the
> logic into a base class like AbstractStateBackend. After all, state cleanup
> is tightly related to the responsibilities of state backends.
> 2. -
> 3. You're right. We should first call the user code before cleaning up.
> The main problem that I see right now is that we have to distinguish
> between user and TTL timers. AFAIK, the timer service does not support
> timer tags (or another method) to distinguish timers.
>
> I've given you the permissions to create and edit wiki pages.
>
> Best, Fabian
>
> 2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:
>
>
> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
>
> I believe TTL code should not reside in state backend, because a critical
> design is that TTL is independent of and transparent to state backends.
>
> According to my current knowledge, I think it probably should live with
> operators in flink-streaming-java.
>
>
> 2. How to get notified about state accesses? I guess this depends on 1.
>
> You previously suggested using callbacks. I believe that's the right way
> to do decoupling.
>
>
> 3. How to avoid conflicts of TTL timers and user timers?
>
> User timers might always be invoked first? This is not urgent, shall we
> bake it for more time and discuss it along the way?
>
>
>
> Besides, I don't have access to create a FLIP page under
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
> Can you grant me the proper access?
>
> Thanks,
>
> Bowen
>
>
>
>
> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>
> Hi Bowen,
>
> Thanks for updating the proposal. This looks pretty good (as I said
> before).
> There are a few areas, that are not yet fully fleshed out:
>
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
> 2. How to get notified about state accesses? I guess this depends on 1.
> 3. How to avoid conflicts of TTL timers and user timers?
>
> @Stefan (in CC) might have some ideas on these issues as well.
>
> Cheers, Fabian
>
> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
>
>
> Hello community,
>
> We've come up with a completely new design for Flink state TTL, documented
> here
> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
> and have run it by a few Flink PMC/committers.
>
> What do you think? We'd love to hear feedback from you.
>
> Thanks,
> Bowen
>
>
> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
> wrote:
>
> Hi Bowen,
>
> Thanks for the proposal! I think state TTL would be a great feature!
> Actually, we have implemented this for SQL / Table API [1].
> I've added a couple of comments to the design doc.
>
> In principle, I'm not sure if this functionality should be added to the
> state backends.
> We could also use the existing timer service, which would have a few nice
> benefits (see my comments in the docs).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
>
> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
>
> Hi guys,
>
> I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
> in Flink. This has been one of the handiest and most frequently requested
> features in the Flink community. The jira ticket is FLINK-3089
> <https://issues.apache.org/jira/browse/FLINK-3089>.
>
> I've written a rough design doc
> <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
> and developed prototypes for both heap and rocksdb state backends.
>
> My question is: shall we create a FLIP page for this? Can I be granted the
> privileges of creating pages in
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> ?
>
> Thanks,
> Bowen
>
>

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by sihua zhou <su...@163.com>.
Hi Andrey,


Thanks for this doc! TBH, I personally prefer the approach you outlined in the doc over the previous one that was purely based on timers. It looks very similar to the approach I outlined earlier in this thread, so it still faces the challenges that @Bowen raised, but I think we can try to overcome them. I will have a closer look at the doc you posted and leave some comments if I can.


Best, Sihua






On 06/4/2018 15:27,Andrey Zagrebin<an...@data-artisans.com> wrote:
Hi everybody,

We have recently been brainstorming ideas around state TTL in Flink
and compiled our thoughts in the following design doc:
https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
Thanks to the community, many things in the doc are based on the previous discussions.

As there are a lot of TTL requirements related to data privacy regulations (quite a hot topic in the EU),
and better cleanup strategies sometimes need more research and maybe POCs,
we suggest starting with the implementation of the TTL API itself,
without major changes to current state performance.

In a nutshell, the approach only requires appending expiration timestamp bytes to each state value/entry.
At first, we just block access to expired state and clean it up when it is explicitly touched,
then gradually adopt cleanup strategies with different guarantees to better address space concerns,
including:
- filtering out expired state during the checkpointing process
- exact cleanup with the timer service (though this still requires storing keys twice in both backends)
- piggy-backing on RocksDB compaction using a custom filter by TTL (similar to Cassandra's custom filter)
- cleanup of heap regions around a randomly accessed bucket

Please feel free to give any feedback and comments.

Thanks,
Andrey

On 27 May 2018, at 09:46, sihua zhou <su...@163.com> wrote:

Hi Bowen,


Thanks for your clarification. I agree that we should wait for the timer on RocksDB to be finished; after that we could even run some micro-benchmarks before we start implementing.


Best, Sihua






On 05/27/2018 15:07,Bowen Li<bo...@gmail.com> wrote:
Thank you Fabian and Sihua. I explained more in the doc itself and its
comments. I believe the bottom lines for v1 are: 1) it shouldn't be worse than
how users implement TTL themselves today, 2) we should develop a generic
architecture for TTL for all (currently two) state backends (impl can
vary), 3) optimizations and improvements can come in v2 or later versions.

For Sihua's proposal, which is similar to the old plan we came up with, I
have the same concerns as before and wonder if you have answers:

- it requires building custom compaction for both state backends, and it's a
  bit unclear in:
  - when and who and how? The 'when' might be the hardest one, because
    it really depends on the user's use cases. E.g. if it's once a day, at what
    time of day?
  - how well will it integrate with Flink's checkpoint/savepoint
    mechanism?
  - any indications of performance regression in either state backend?
  - how much is the ROI if it requires a very complicated implementation?
  - I'm afraid the optimization will eventually go in a tricky
    direction we may want to avoid - shall we come up with an extra design to
    amortize the cost?
- I'm afraid the custom compaction logic will have to make quite
  different assumptions for different state backends. E.g. it's harder to
  estimate the total memory required for a user's app with the heap state backend then,
  because it depends on when you trigger the compaction and how strictly you
  stick to the schedule every day. Any nondeterministic behavior may lead
  to users allocating less memory than needed, and eventually cause users'
  apps to be unstable.
- I want to highlight that lots of users actually want the exact TTL
  feature. How users implement TTL with timers today actually implies that
  their logic depends on exact TTL both for shrinking their state size and
  for expiring a key at exactly the expected time. I chatted with a few different
  Flink users recently and confirmed it. That's why I want to add exact TTL
  as a potential goal and motivation if possible, along with relaxed TTL and
  avoiding indefinitely growing state. If we don't provide that out of the box,
  many users may still use the timer approach themselves.

On the concern of doubling keys: in the heap state backend, the key is only a
reference so there's only one copy, and that's not a problem; in the RocksDB state
backend, yes, the state size will be bigger. First, I believe this is a
tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
SSD) is relatively cheap and accessible, and we don't normally take it as a
big constraint. Second, w.r.t. performance, I believe RocksDB timers
will sit in a different column family than other state, which may not cause a
noticeable perf issue. The RocksDB timer service is on its way, and I want
to see how it's implemented first before asserting whether there is truly any
potential perf burden. Finally, there are also improvements we can make
after v1, including relaxed TTL and smaller timer state size. E.g. Flink
can merge timers within a user-configured time range (say within 5
sec) into a single timer. I don't have a concrete plan for that yet, but
it's doable.
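
A minimal sketch of the timer-merging idea mentioned above, assuming timers are grouped into fixed-size time buckets and rounded up to the bucket boundary so that state never expires early. The class `CoalescedTtlTimers`, its methods, and the use of `String` keys are hypothetical and only illustrate the idea; none of this is Flink API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: one timer per time bucket instead of one timer per key. */
final class CoalescedTtlTimers {
    private final long bucketMillis; // user-configured range, e.g. 5000 ms
    // One entry per registered timer; each timer expires all keys in its bucket.
    private final TreeMap<Long, Set<String>> keysByTimer = new TreeMap<>();

    CoalescedTtlTimers(long bucketMillis) {
        if (bucketMillis <= 0) {
            throw new IllegalArgumentException("bucketMillis must be positive");
        }
        this.bucketMillis = bucketMillis;
    }

    /** Rounds an expiration time up to the next bucket boundary. */
    long coalesce(long expirationMillis) {
        return Math.floorDiv(expirationMillis + bucketMillis - 1, bucketMillis) * bucketMillis;
    }

    /** Registers a key for expiration, reusing an existing timer when possible. */
    void register(String key, long expirationMillis) {
        keysByTimer
                .computeIfAbsent(coalesce(expirationMillis), t -> new HashSet<>())
                .add(key);
    }

    /** Number of timers actually stored. */
    int timerCount() {
        return keysByTimer.size();
    }

    /** Removes every timer due at or before {@code now} and returns the keys to expire. */
    Set<String> fire(long now) {
        Set<String> due = new HashSet<>();
        Map<Long, Set<String>> head = keysByTimer.headMap(now, true);
        for (Set<String> keys : head.values()) {
            due.addAll(keys);
        }
        head.clear(); // the head view is backed by the map, so this removes the fired timers
        return due;
    }
}
```

With a 5-second bucket, keys expiring at 12001 ms and 14999 ms share one timer at 15000 ms. The cost is that a key may live up to one bucket length longer than its nominal TTL.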

Stefan is adding RocksDB timers and bringing the timer service closer to the
keyed backends, which aligns very well with what we want in this FLIP. I
suggest we wait and keep a close eye on those efforts, and as they mature,
we'll have a much better idea of the whole picture.

Thanks, Bowen



On Sat, May 26, 2018 at 7:52 AM, sihua zhou <su...@163.com> wrote:



Hi,


thanks for your reply, Fabian. About the overhead of storing the key bytes
twice, I think the situation may be even a bit worse: in general, it
means that the total amount of data to be stored is doubled (for each key,
we need to store two records, one for the timer and one for the state). This may be a
bit uncomfortable when the state backend is based on RocksDB, because the
timers live together with the other states in the same RocksDB
instance, which means that when using TTL, the number of records in
RocksDB is doubled. I'm afraid this may hurt its performance.


Concerning the approach of adding a timestamp to each value, TBH, I haven't
thought deeply about it yet and am not sure about it either. In general, it
can be described as follows:


- We attach a TS to every state record.
- When getting the record, we check the TS to see if it's outdated.
- For the records that we will never touch again, we use compaction to
remove them. Maybe one compaction per day is enough.
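
The three steps above can be sketched as follows. This is only an illustration under stated assumptions: an in-memory `HashMap` stands in for the state backend, and the class `RelaxedTtlStore`, its methods, and the injectable clock are hypothetical names, not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical in-memory stand-in for a keyed state backend with relaxed TTL. */
final class RelaxedTtlStore<K, V> {
    /** A state value with the timestamp of its last write attached. */
    private static final class Stamped<T> {
        final T value;
        final long timestamp;

        Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, Stamped<V>> records = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // event time or processing time, supplied by the caller

    RelaxedTtlStore(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Step 1: attach a timestamp to every record on write. */
    void put(K key, V value) {
        records.put(key, new Stamped<>(value, clock.getAsLong()));
    }

    /** Step 2: check the timestamp on read; outdated records are hidden but not yet removed. */
    V get(K key) {
        Stamped<V> record = records.get(key);
        return (record == null || isExpired(record)) ? null : record.value;
    }

    /** Step 3: a compaction pass drops all outdated records and returns how many were removed. */
    int compact() {
        int before = records.size();
        records.values().removeIf(this::isExpired);
        return before - records.size();
    }

    /** Number of records physically stored, including outdated ones not yet compacted. */
    int physicalSize() {
        return records.size();
    }

    private boolean isExpired(Stamped<V> record) {
        return clock.getAsLong() - record.timestamp >= ttlMillis;
    }
}
```

Between compactions, outdated records still occupy space, which is the "relaxed" part of the guarantee: reads never see expired data, but the state size shrinks only when `compact()` runs.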


Best, Sihua


On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
Hi,


Yes. IMO it makes sense to put the logic into the abstract base classes to
share the implementation across different state backends and state
primitives.

The overhead of storing the key twice is a valid concern, but I'm not sure
about the approach to add a timestamp to each value.
How would we discard state then? Iterating always over all (or a range of)
keys to check if their state should be expired?
That would only work efficiently if we relax the clean-up logic which
could be a valid design decision.



Best, Fabian



2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:

Hi Fabian,
thank you very much for the reply. Just an alternative: can we implement
the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest
way is to append the `ts` to the state's value, and use the backend's
`current time` (it can be either event time or processing time) to judge
whether the data is outdated. The pros are:
- state is purely backed by the state backend.
- for each key-value, we only need to store one copy of the key (if we
implement TTL based on timers, we need to store two copies of the key, one for the
timer and one for the keyed state).


What do you think?


Best,
Sihua


On 05/14/2018 15:20,Fabian Hueske<fh...@gmail.com> wrote:
Hi Sihua,


I think it makes sense to couple state TTL to the timer service. We'll
need some kind of timers to expire state, so I think we should reuse
components that we have instead of implementing another timer service.

Moreover, using the same timer service and the public state APIs
helps to have consistent TTL behavior across different state backends.


Best, Fabian



2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:

Hi Bowen,
thanks for your doc! I left some comments on the doc. My main concern
is that making TTL depend on the `timer` feels like a coupling.
I think TTL is a property of the state, so it should
be backed by the state backend. If we implement TTL based on the timer,
then it looks like a coupling... it makes me feel that the backend for
state becomes `state backend` + `timer`. And in fact, IMO, even the `timer`
should depend on the `state backend` in theory; it's a type of HeapState that is
scoped to the `key group` (not scoped per key like the current keyed
state).


Also, I found that the doc is for exact TTL. I wonder if we can support a relaxed
TTL that could provide better performance. To me, the reason
that I need TTL is just to prevent the state size from growing infinitely (I
believe I'm not the only one like this), so a relaxed version is enough. If
there is a relaxed TTL that has better performance, I would prefer that.


What do you think?


Best,
Sihua






On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> wrote:
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently is this: let
*InternalTimerService* maintain user timers and TTL timers separately.
Implementation classes of InternalTimerService should add two new
collections of timers, e.g. *Ttl*ProcessingTimeTimersQueue and
*Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
InternalTimerService#onProcessingTime() and advanceWatermark(), they will
first iterate through ProcessingTimeTimers and EventTimeTimers (user
timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers
(TTL timers).

We'll also add the following new internal APIs to register TTL timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to option 1, is that it doesn't impact
existing timer-related checkpoint/savepoint, restore and migrations.
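
To make the ordering in this proposal concrete, here is a minimal sketch assuming one priority queue per timer kind. `TwoQueueTimerService` and its methods are hypothetical and deliberately much simpler than Flink's InternalTimerService (no keys, namespaces, or checkpointing); the sketch only shows that user timers due at a given time are drained before any TTL timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of a timer service that keeps user and TTL timers apart. */
final class TwoQueueTimerService {
    private final PriorityQueue<Long> userTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> ttlTimers = new PriorityQueue<>();

    void registerUserTimer(long time) {
        userTimers.add(time);
    }

    void registerTtlTimer(long time) {
        ttlTimers.add(time);
    }

    /**
     * Fires all timers due at or before {@code time}: first every due user timer,
     * then every due TTL timer. Returns the firing order for inspection.
     */
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        drain(userTimers, time, "user", fired);
        drain(ttlTimers, time, "ttl", fired);
        return fired;
    }

    private static void drain(PriorityQueue<Long> queue, long time, String kind, List<String> fired) {
        while (!queue.isEmpty() && queue.peek() <= time) {
            fired.add(kind + "@" + queue.poll());
        }
    }
}
```

Note that within one call a TTL timer with an earlier timestamp still fires after a later user timer, because the user queue is drained completely first. That matches the "user code before cleanup" ordering discussed in point 3.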

What do you think? Do any other Flink committers want to chime in with
ideas? I've also documented the above two discussion points on the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:


Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up.
The main problem that I see right now is that we have to distinguish
between user and TTL timers. AFAIK, the timer service does not support
timer tags (or another method) to distinguish timers.

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:


Thanks Fabian! Here're my comments inline, and let me know your thoughts.

1. Where should the TTL code reside? In the state backend or in the
operator?

I believe TTL code should not reside in state backend, because a critical
design is that TTL is independent of and transparent to state backends.

According to my current knowledge, I think it probably should live with
operators in flink-streaming-java.


2. How to get notified about state accesses? I guess this depends on 1.

You previously suggested using callbacks. I believe that's the right way
to do decoupling.


3. How to avoid conflicts of TTL timers and user timers?

User timers might always be invoked first? This is not urgent, shall we
bake it for more time and discuss it along the way?



Besides, I don't have access to create a FLIP page under
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
Can you grant me the proper access?

Thanks,

Bowen




On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:


Hi Bowen,

Thanks for updating the proposal. This looks pretty good (as I said
before).
There are a few areas, that are not yet fully fleshed out:

1. Where should the TTL code reside? In the state backend or in the
operator?
2. How to get notified about state accesses? I guess this depends on 1.
3. How to avoid conflicts of TTL timers and user timers?

@Stefan (in CC) might have some ideas on these issues as well.

Cheers, Fabian

2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:


Hello community,

We've come up with a completely new design for Flink state TTL, documented
here
<https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
and have run it by a few Flink PMC/committers.

What do you think? We'd love to hear feedback from you.

Thanks,
Bowen


On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
wrote:

Hi Bowen,

Thanks for the proposal! I think state TTL would be a great feature!
Actually, we have implemented this for SQL / Table API [1].
I've added a couple of comments to the design doc.

In principle, I'm not sure if this functionality should be added to the
state backends.
We could also use the existing timer service, which would have a few nice
benefits (see my comments in the docs).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time

2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:

Hi guys,

I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
in Flink. This has been one of the handiest and most frequently requested
features in the Flink community. The jira ticket is FLINK-3089
<https://issues.apache.org/jira/browse/FLINK-3089>.

I've written a rough design doc
<https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
and developed prototypes for both heap and rocksdb state backends.

My question is: shall we create a FLIP page for this? Can I be granted the
privileges of creating pages in
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
?

Thanks,
Bowen

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Andrey Zagrebin <an...@data-artisans.com>.
Hi everybody,

We have recently been brainstorming ideas around state TTL in Flink
and compiled our thoughts in the following design doc:
https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
Thanks to the community, many things in the doc are based on the previous discussions.

As there are a lot of TTL requirements related to data privacy regulations (quite a hot topic in the EU),
and better cleanup strategies sometimes need more research and maybe POCs,
we suggest starting with the implementation of the TTL API itself,
without major changes to current state performance.

In a nutshell, the approach only requires appending expiration timestamp bytes to each state value/entry.
At first, we just block access to expired state and clean it up when it is explicitly touched,
then gradually adopt cleanup strategies with different guarantees to better address space concerns,
including:
- filtering out expired state during the checkpointing process
- exact cleanup with the timer service (though this still requires storing keys twice in both backends)
- piggy-backing on RocksDB compaction using a custom filter by TTL (similar to Cassandra's custom filter)
- cleanup of heap regions around a randomly accessed bucket
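
As a rough illustration of "appending expiration timestamp bytes to each state value", the sketch below assumes the user value is already serialized to a byte array and that the timestamp is stored as a trailing 8-byte long. `TtlValueCodec` and its methods are hypothetical names; the actual layout is defined in the design doc, not here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical codec: [serialized user value][8-byte expiration timestamp]. */
final class TtlValueCodec {
    private TtlValueCodec() {}

    /** Appends the expiration timestamp to the serialized user value. */
    static byte[] wrap(byte[] userValue, long expirationMillis) {
        return ByteBuffer.allocate(userValue.length + Long.BYTES)
                .put(userValue)
                .putLong(expirationMillis)
                .array();
    }

    /** Reads the trailing expiration timestamp without touching the user value. */
    static long expirationOf(byte[] stored) {
        return ByteBuffer.wrap(stored).getLong(stored.length - Long.BYTES);
    }

    /** Returns the user value, or null if the entry has expired (access is blocked). */
    static byte[] unwrap(byte[] stored, long nowMillis) {
        if (expirationOf(stored) <= nowMillis) {
            return null;
        }
        return Arrays.copyOf(stored, stored.length - Long.BYTES);
    }
}
```

Because the expiration can be read from the stored bytes alone, the same `expirationOf` check could in principle be reused by a checkpoint filter or a compaction filter without deserializing the user value.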

Please feel free to give any feedback and comments.

Thanks,
Andrey

> On 27 May 2018, at 09:46, sihua zhou <su...@163.com> wrote:
> 
> Hi Bowen,
> 
> 
> Thanks for your clarification. I agree that we should wait for the timer on RocksDB to be finished; after that we could even run some micro-benchmarks before we start implementing.
> 
> 
> Best, Sihua
> 
> 
> 
> 
> 
> 
> On 05/27/2018 15:07,Bowen Li<bo...@gmail.com> wrote:
> Thank you Fabian and Sihua. I explained more in the doc itself and its
> comments. I believe the bottom lines for v1 are: 1) it shouldn't be worse than
> how users implement TTL themselves today, 2) we should develop a generic
> architecture for TTL for all (currently two) state backends (impl can
> vary), 3) optimizations and improvements can come in v2 or later versions.
>
> For Sihua's proposal, which is similar to the old plan we came up with, I
> have the same concerns as before and wonder if you have answers:
> 
> - it requires building custom compaction for both state backends, and it's a
>   bit unclear in:
>   - when and who and how? The 'when' might be the hardest one, because
>     it really depends on the user's use cases. E.g. if it's once a day, at what
>     time of day?
>   - how well will it integrate with Flink's checkpoint/savepoint
>     mechanism?
>   - any indications of performance regression in either state backend?
>   - how much is the ROI if it requires a very complicated implementation?
>   - I'm afraid the optimization will eventually go in a tricky
>     direction we may want to avoid - shall we come up with an extra design to
>     amortize the cost?
> - I'm afraid the custom compaction logic will have to make quite
>   different assumptions for different state backends. E.g. it's harder to
>   estimate the total memory required for a user's app with the heap state backend then,
>   because it depends on when you trigger the compaction and how strictly you
>   stick to the schedule every day. Any nondeterministic behavior may lead
>   to users allocating less memory than needed, and eventually cause users'
>   apps to be unstable.
> - I want to highlight that lots of users actually want the exact TTL
>   feature. How users implement TTL with timers today actually implies that
>   their logic depends on exact TTL both for shrinking their state size and
>   for expiring a key at exactly the expected time. I chatted with a few different
>   Flink users recently and confirmed it. That's why I want to add exact TTL
>   as a potential goal and motivation if possible, along with relaxed TTL and
>   avoiding indefinitely growing state. If we don't provide that out of the box,
>   many users may still use the timer approach themselves.
> 
> On the concern of doubling keys: in the heap state backend, the key is only a
> reference so there's only one copy, and that's not a problem; in the RocksDB state
> backend, yes, the state size will be bigger. First, I believe this is a
> tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
> SSD) is relatively cheap and accessible, and we don't normally take it as a
> big constraint. Second, w.r.t. performance, I believe RocksDB timers
> will sit in a different column family than other state, which may not cause a
> noticeable perf issue. The RocksDB timer service is on its way, and I want
> to see how it's implemented first before asserting whether there is truly any
> potential perf burden. Finally, there are also improvements we can make
> after v1, including relaxed TTL and smaller timer state size. E.g. Flink
> can merge timers within a user-configured time range (say within 5
> sec) into a single timer. I don't have a concrete plan for that yet, but
> it's doable.
> 
> Stefan is adding RocksDB timers and bringing the timer service closer to the
> keyed backends, which aligns very well with what we want in this FLIP. I
> suggest we wait and keep a close eye on those efforts, and as they mature,
> we'll have a much better idea of the whole picture.
> 
> Thanks, Bowen
> 
> 
> 
> On Sat, May 26, 2018 at 7:52 AM, sihua zhou <su...@163.com> wrote:
> 
> 
> 
> Hi,
> 
> 
> thanks for your reply, Fabian. About the overhead of storing the key bytes
> twice, I think the situation may be even a bit worse: in general, it
> means that the total amount of data to be stored is doubled (for each key,
> we need to store two records, one for the timer and one for the state). This may be a
> bit uncomfortable when the state backend is based on RocksDB, because the
> timers live together with the other states in the same RocksDB
> instance, which means that when using TTL, the number of records in
> RocksDB is doubled. I'm afraid this may hurt its performance.
>
>
> Concerning the approach of adding a timestamp to each value, TBH, I haven't
> thought deeply about it yet and am not sure about it either. In general, it
> can be described as follows:
>
>
> - We attach a TS to every state record.
> - When getting the record, we check the TS to see if it's outdated.
> - For the records that we will never touch again, we use compaction to
> remove them. Maybe one compaction per day is enough.
> 
> 
> Best, Sihua
> 
> 
> On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
> Hi,
> 
> 
> Yes. IMO it makes sense to put the logic into the abstract base classes to
> share the implementation across different state backends and state
> primitives.
> 
> The overhead of storing the key twice is a valid concern, but I'm not sure
> about the approach to add a timestamp to each value.
> How would we discard state then? Iterating always over all (or a range of)
> keys to check if their state should be expired?
> That would only work efficiently if we relax the clean-up logic which
> could be a valid design decision.
> 
> 
> 
> Best, Fabian
> 
> 
> 
> 2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:
> 
> Hi Fabian,
> thank you very much for the reply. Just an alternative: can we implement
> the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest
> way is to append the `ts` to the state's value, and use the backend's
> `current time` (it can be either event time or processing time) to judge
> whether the data is outdated. The pros are:
> - state is purely backed by the state backend.
> - for each key-value, we only need to store one copy of the key (if we
> implement TTL based on timers, we need to store two copies of the key, one for the
> timer and one for the keyed state).
> 
> 
> What do you think?
> 
> 
> Best,
> Sihua
> 
> 
> On 05/14/2018 15:20,Fabian Hueske<fh...@gmail.com> wrote:
> Hi Sihua,
> 
> 
> I think it makes sense to couple state TTL to the timer service. We'll
> need some kind of timers to expire state, so I think we should reuse
> components that we have instead of implementing another timer service.
> 
> Moreover, using the same timer service and the public state APIs
> helps to have consistent TTL behavior across different state backends.
> 
> 
> Best, Fabian
> 
> 
> 
> 2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:
> 
> Hi Bowen,
> thanks for your doc! I left some comments on the doc. My main concern
> is that making TTL depend on the `timer` feels like a coupling.
> I think TTL is a property of the state, so it should
> be backed by the state backend. If we implement TTL based on the timer,
> then it looks like a coupling... it makes me feel that the backend for
> state becomes `state backend` + `timer`. And in fact, IMO, even the `timer`
> should depend on the `state backend` in theory; it's a type of HeapState that is
> scoped to the `key group` (not scoped per key like the current keyed
> state).
>
>
> Also, I found that the doc is for exact TTL. I wonder if we can support a relaxed
> TTL that could provide better performance. To me, the reason
> that I need TTL is just to prevent the state size from growing infinitely (I
> believe I'm not the only one like this), so a relaxed version is enough. If
> there is a relaxed TTL that has better performance, I would prefer that.
> 
> 
> What do you think?
> 
> 
> Best,
> Sihua
> 
> 
> 
> 
> 
> 
> On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> wrote:
> Thank you, Fabian! I've created the FLIP-25 page
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.
> 
> To continue our discussion points:
> 1. I see what you mean now. I totally agree. Since we don't completely know
> it now, shall we experiment or prototype a little bit before deciding this?
> 2. -
> 3. Adding tags to timers is an option.
> 
> Another option I came up with recently is this: let InternalTimerService
> maintain user timers and TTL timers separately. Implementation classes of
> InternalTimerService should add two new collections of timers, e.g.
> TtlProcessingTimeTimersQueue and TtlEventTimeTimersQueue for
> HeapInternalTimerService. Upon InternalTimerService#onProcessingTime() and
> advanceWatermark(), they will first iterate through ProcessingTimeTimers
> and EventTimeTimers (user timers) and then through TtlProcessingTimeTimers
> and TtlEventTimeTimers (TTL timers).
> 
> We'll also add the following new internal APIs to register Ttl timers:
> 
> ```
> @Internal
> public void registerTtlProcessingTimeTimer(N namespace, long time);
> 
> @Internal
> public void registerTtlEventTimeTimer(N namespace, long time);
> ```
> 
> The biggest advantage, compared to option 1, is that it doesn't impact
> existing timer-related checkpoint/savepoint, restore and migrations.
> 
> What do you think? Would any other Flink committers like to chime in with
> ideas? I've also documented the above two discussion points on the FLIP
> page.
> 
> Thanks,
> Bowen
> 
> 
> On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> 
> Hi Bowen,
> 
> 1. The motivation to keep the TTL logic outside of the state backend was
> mainly to avoid state backend custom implementations. If we have a generic
> approach that would work for all state backends, we could try to put the
> logic into a base class like AbstractStateBackend. After all, state cleanup
> is tightly related to the responsibilities of state backends.
> 2. -
> 3. You're right. We should first call the user code before cleaning up.
> The main problem that I see right now is that we have to distinguish
> between user and TTL timers. AFAIK, the timer service does not support
> timer tags (or another method) to distinguish timers.
> 
> I've given you the permissions to create and edit wiki pages.
> 
> Best, Fabian
> 
> 2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:
> 
> 
> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
> 
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
> 
> I believe TTL code should not reside in state backend, because a critical
> design is that TTL is independent of and transparent to state backends.
> 
> According to my current knowledge, I think it probably should live with
> operators in flink-streaming-java.
> 
> 
> 2. How to get notified about state accesses? I guess this depends on 1.
> 
> You previously suggested using callbacks. I believe that's the right way
> to do decoupling.
> 
> 
> 3. How to avoid conflicts of TTL timers and user timers?
> 
> User timers might always be invoked first? This is not urgent, shall we
> bake it for more time and discuss it along the way?
> 
> 
> 
> Besides, I don't have access to create a FLIP page under
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
> Can you grant me the proper access?
> 
> Thanks,
> 
> Bowen
> 
> 
> 
> 
> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> 
> Hi Bowen,
> 
> Thanks for updating the proposal. This looks pretty good (as I said
> before).
> There are a few areas that are not yet fully fleshed out:
> 
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
> 2. How to get notified about state accesses? I guess this depends on 1.
> 3. How to avoid conflicts of TTL timers and user timers?
> 
> @Stefan (in CC) might have some ideas on these issues as well.
> 
> Cheers, Fabian
> 
> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
> 
> 
> Hello community,
> 
> We've come up with a completely new design for Flink state TTL, documented
> here
> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
> and have run it by a few Flink PMC/committers.
> 
> What do you think? We'd love to hear your feedback.
> 
> Thanks,
> Bowen
> 
> 
> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
> wrote:
> 
> Hi Bowen,
> 
> Thanks for the proposal! I think state TTL would be a great feature!
> Actually, we have implemented this for SQL / Table API [1].
> I've added a couple of comments to the design doc.
> 
> In principle, I'm not sure if this functionality should be added to the
> state backends.
> We could also use the existing timer service, which would have a few nice
> benefits (see my comments in the docs).
> 
> Best, Fabian
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
> 
> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
> 
> Hi guys,
> 
> I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
> in Flink. This has been one of the most useful and most frequently
> requested features in the Flink community. The Jira ticket is FLINK-3089
> <https://issues.apache.org/jira/browse/FLINK-3089>.
> 
> I've written a rough design doc
> <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>
> and developed prototypes for both the heap and RocksDB state backends.
> 
> My question is: shall we create a FLIP page for this? Can I be granted the
> privileges to create pages in
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?
> 
> Thanks,
> Bowen
> 


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by sihua zhou <su...@163.com>.
Hi Bowen,


Thanks for your clarification. I agree that we should wait for the RocksDB timer to be finished; after that we could even run some micro-benchmarks before we start implementing.


Best, Sihua







Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Bowen Li <bo...@gmail.com>.
Thank you Fabian and Sihua. I explained more in the doc itself and its
comments. I believe the bottom lines for v1 are: 1) it shouldn't be worse than
how users implement TTL themselves today, 2) we should develop a generic
architecture for TTL for all (currently two) state backends (impl can
vary), 3) optimizations and improvements can come in v2 or a later version.

For Sihua's proposal, which is similar to the old plan we came up with, I have
the same concerns as before and wonder if you have answers:

   - it requires building custom compaction for both state backends, and a
   few things are unclear:
      - when, who, and how? The 'when' might be the hardest one, because
      it really depends on the user's use case. E.g. if it's once a day, at
      what time of day?
      - how well will it integrate with Flink's checkpoint/savepoint
      mechanism?
      - any indications of performance regression in either state backend?
      - how much is the ROI if it requires a very complicated implementation?
      - I'm afraid that, eventually, the optimization will easily go in a
      tricky direction we may want to avoid - shall we come up with extra
      design to amortize the cost?
   - I'm afraid the custom compaction logic will have to make quite
   different assumptions for different state backends. E.g. it becomes harder
   to estimate the total memory required for a user's app with the heap state
   backend, because it depends on when you trigger the compaction and how
   strictly you stick to the schedule every day. Any nondeterministic behavior
   may lead users to allocate too little memory, which eventually makes their
   apps unstable.
   - I want to highlight that lots of users actually want the exact TTL
   feature. How users implement TTL with timers today implies that
   their logic depends on exact TTL both for shrinking their state size and
   for expiring a key at exactly the expected time. I chatted with a few
   different Flink users recently and confirmed it. That's why I want to add
   exact TTL as a potential goal and motivation if possible, along with
   relaxed TTL and avoiding indefinitely growing state. If we don't provide
   that out of the box, many users may still implement TTL with timers
   themselves.
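
For reference, the timer-based pattern mentioned in the last point can be
sketched in plain Java. This is a simplified stand-in, not Flink's API:
`TimerBasedTtlStore`, `put`, `get`, and `advanceTime` are hypothetical names,
and a `TreeMap` plays the role of the timer service. Each key is held twice,
once in the state map and once in a timer.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, simplified model of exact TTL built on timers (not Flink's API). */
class TimerBasedTtlStore<K, V> {
    private final long ttlMillis;
    private final Map<K, V> state = new HashMap<>();
    // Latest expiration time per key; lets a stale timer be recognized and skipped.
    private final Map<K, Long> expiresAt = new HashMap<>();
    // Stand-in for the timer service: firing time -> keys registered for that time.
    private final TreeMap<Long, Set<K>> timers = new TreeMap<>();

    TimerBasedTtlStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writes a value and registers a timer at now + TTL. */
    void put(K key, V value, long now) {
        long expiry = now + ttlMillis;
        state.put(key, value);
        expiresAt.put(key, expiry);
        timers.computeIfAbsent(expiry, t -> new HashSet<>()).add(key);
    }

    V get(K key) {
        return state.get(key);
    }

    /** Fires every timer with a timestamp <= now, in timestamp order. */
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<K>> fired = timers.pollFirstEntry();
            for (K key : fired.getValue()) {
                Long expiry = expiresAt.get(key);
                // Only the timer matching the latest write may clear the state.
                if (expiry != null && expiry.equals(fired.getKey())) {
                    state.remove(key);
                    expiresAt.remove(key);
                }
            }
        }
    }

    int size() {
        return state.size();
    }
}
```

In this model a key disappears exactly when its latest timer fires, which is
the "exact TTL" behavior users rely on today.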

On the concern of doubling keys - in the heap state backend, the key is only a
reference, so there's only one copy and that's not a problem; in the RocksDB
state backend, yes, the state size will be bigger. First, I believe this is a
tradeoff for a clearer architecture. Frankly, unlike memory, disk space (even
SSD) is relatively cheap and accessible, and we don't normally treat it as a
big constraint. Second, w.r.t. performance, I believe RocksDB timers
will sit in a different column family than other state, which may not cause a
noticeable perf issue. The RocksDB timer service is on its way, and I want
to see how it's implemented first before asserting whether there is truly any
potential perf burden. Finally, there are also improvements we can make
after v1, including relaxed TTL and smaller timer state size. E.g. Flink
can coalesce timers that fall within a user-configured time range (say within
5 sec) into a single timer. I don't have a concrete plan for that yet, but
it's doable.
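
The coalescing idea can be sketched as rounding each expiration timestamp up
to the next multiple of a configured granularity, so that all expirations
falling into the same window share one timer. This is only an illustration of
the idea, not a planned design: `TimerCoalescer`, `coalesce`, `register`, and
the granularity parameter are assumptions.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper illustrating relaxed TTL via coalesced timers (not a Flink class). */
class TimerCoalescer {
    private final long granularityMillis;
    // Distinct timer timestamps that would actually be registered.
    private final Set<Long> registeredTimers = new TreeSet<>();

    TimerCoalescer(long granularityMillis) {
        if (granularityMillis <= 0) {
            throw new IllegalArgumentException("granularity must be positive");
        }
        this.granularityMillis = granularityMillis;
    }

    /** Rounds a non-negative timestamp up to the next multiple of the granularity. */
    long coalesce(long expirationMillis) {
        long remainder = expirationMillis % granularityMillis;
        return remainder == 0
                ? expirationMillis
                : expirationMillis - remainder + granularityMillis;
    }

    /** Registers the coalesced timer; returns true only if a new timer was created. */
    boolean register(long expirationMillis) {
        return registeredTimers.add(coalesce(expirationMillis));
    }

    int timerCount() {
        return registeredTimers.size();
    }
}
```

Rounding up rather than down means state is never expired earlier than its
TTL, only up to one granularity interval later.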

Stefan is adding RocksDB timers and bringing the timer service closer to
the keyed backends, which aligns very well with what we want in this FLIP. I
suggest we wait and keep a close eye on those efforts, and as they mature,
we'll have a much better idea of the whole picture.

Thanks, Bowen



On Sat, May 26, 2018 at 7:52 AM, sihua zhou <su...@163.com> wrote:

>
>
> Hi,
>
>
> thanks for your reply Fabian. About the overhead of storing the key bytes
> twice: I think the situation may be even a bit worse. In general, it
> means that the total amount of data to be stored is doubled (for each key,
> we need to store two records, one for the timer and one for the state). This
> may be a bit uncomfortable when the state backend is based on RocksDB,
> because the timers live together with the other state in the same RocksDB
> instance, which means that with TTL the number of records in RocksDB
> doubles. I'm afraid this may hurt its performance.
>
>
> Concerning the approach of adding a timestamp to each value, TBH, I haven't
> thought deeply about it yet and I'm not sure about it either. In general, it
> can be described as follows:
>
>
> - We attach a TS to every state record.
> - When getting a record, we check the TS to see if it's outdated.
> - For records that we will never touch again, we use compaction to
> remove them. Maybe one compaction per day is enough.
>
>
> Best, Sihua
>
>
> On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
> Hi,
>
>
> Yes. IMO it makes sense to put the logic into the abstract base classes to
> share the implementation across different state backends and state
> primitives.
>
> The overhead of storing the key twice is a valid concern, but I'm not sure
> about the approach to add a timestamp to each value.
> How would we discard state then? Iterating always over all (or a range of)
> keys to check if their state should be expired?
> That would only work efficiently if we relax the clean-up logic which
> could be a valid design decision.
>
>
>
> Best, Fabian
>
>
>
> 2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:
>
> Hi Fabian,
> thank you very much for the reply. Just an alternative: can we implement
> the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest
> way is to append the `ts` to the state's value, and to use the backend's
> `current time` (it can be either event time or processing time) to judge
> whether the data is outdated. The pros are:
> - state is purely backed by the state backend.
> - for each key-value pair, we only need to store one copy of the key (if we
> implement TTL based on timers, we need to store two copies of the key, one
> for the timer and one for the keyed state).
>
>
> What do you think?
>
>
> Best,
> Sihua
>
>

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by sihua zhou <su...@163.com>.

Hi,


Thanks for your reply, Fabian. About the overhead of storing the key bytes twice: I think the situation may be even a bit worse. In general, it means that the number of records to be stored doubles (for each key, we need to store two records, one for the timer and one for the state). This may be a problem when the state backend is based on RocksDB, because the timers live together with the other states in the same RocksDB instance. With TTL enabled, the number of records in RocksDB would double, and I'm afraid this may hurt its performance.

Concerning the approach of adding a timestamp to each value: TBH, I haven't thought deeply about it yet and I'm not sure about it either. In general, it can be described as follows:

- We attach a timestamp (TS) to every state record.
- When getting a record, we check the TS to see if it's outdated.
- For records that we will never touch again, we use compaction to remove them. Maybe one compaction per day is enough.
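
The three steps above could be sketched roughly as follows. This is only an illustration, not Flink code: `TtlValueSketch`, `TtlValue`, and `TtlMapState` are hypothetical names, a `HashMap` stands in for the state backend, and `compact()` stands in for a RocksDB compaction pass.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch of timestamp-based TTL; none of these types exist in Flink. */
public class TtlValueSketch {

    /** A state value together with the timestamp (TS) of its last update. */
    public static final class TtlValue<V> {
        final V value;
        final long lastUpdateTs;

        TtlValue(V value, long lastUpdateTs) {
            this.value = value;
            this.lastUpdateTs = lastUpdateTs;
        }
    }

    /** Keyed state with TTL; a HashMap plays the role of the state backend. */
    public static final class TtlMapState<K, V> {
        private final Map<K, TtlValue<V>> store = new HashMap<>();
        private final long ttlMillis;
        private final LongSupplier clock; // may supply processing time or event time

        public TtlMapState(long ttlMillis, LongSupplier clock) {
            this.ttlMillis = ttlMillis;
            this.clock = clock;
        }

        /** Step 1: attach the current timestamp to every record written. */
        public void put(K key, V value) {
            store.put(key, new TtlValue<>(value, clock.getAsLong()));
        }

        /** Step 2: check the timestamp on read; expired records are dropped lazily. */
        public V get(K key) {
            TtlValue<V> wrapped = store.get(key);
            if (wrapped == null) {
                return null;
            }
            if (isExpired(wrapped)) {
                store.remove(key);
                return null;
            }
            return wrapped.value;
        }

        /** Step 3: compaction stand-in that removes expired records nobody reads anymore. */
        public int compact() {
            int before = store.size();
            store.values().removeIf(this::isExpired);
            return before - store.size();
        }

        public int size() {
            return store.size();
        }

        private boolean isExpired(TtlValue<V> wrapped) {
            return clock.getAsLong() - wrapped.lastUpdateTs >= ttlMillis;
        }
    }
}
```

With this layout the key is stored only once and a read costs one extra timestamp comparison; the price is that expired records nobody reads stay around until the next compaction.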


Best, Sihua


On 05/16/2018 16:38,Fabian Hueske<fh...@gmail.com> wrote:
Hi,


Yes. IMO it makes sense to put the logic into the abstract base classes to share the implementation across different state backends and state primitives.

The overhead of storing the key twice is a valid concern, but I'm not sure about the approach to add a timestamp to each value.
How would we discard state then? By always iterating over all (or a range of) keys to check whether their state should be expired?
That would only work efficiently if we relax the clean-up logic, which could be a valid design decision.



Best, Fabian



2018-05-14 9:33 GMT+02:00 sihua zhou <su...@163.com>:

Hi Fabian,
thank you very much for the reply. Just an alternative: can we implement the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest way is to append the `ts` to the state's value and use the backend's `current time` (which can be either event time or processing time) to judge whether the data is outdated. The pros are:
- state is purely backed by the state backend.
- for each key-value pair, we only need to store one copy of the key (if we implement TTL based on timers, we need to store two copies of the key, one for the timer and one for the keyed state).


What do you think?


Best,
Sihua


On 05/14/2018 15:20,Fabian Hueske<fh...@gmail.com> wrote:
Hi Sihua,


I think it makes sense to couple state TTL to the timer service. We'll need some kind of timers to expire state, so I think we should reuse components that we have instead of implementing another timer service.

Moreover, using the same timer service and the public state APIs helps to achieve consistent TTL behavior across different state backends.


Best, Fabian



2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:

Hi Bowen,
thanks for your doc! I left some comments on the doc. My main concern is that making TTL depend on `timer` feels like unnecessary coupling. I think TTL is a property of the state, so it should be backed by the state backend. If we implement TTL based on timers, it makes me feel that the backend for state becomes `state backend` + `timer`. And in fact, IMO, even the `timer` should depend on the `state backend` in theory: it's a type of HeapState that is scoped to the `key group` (not scoped per key like the current keyed state).

I also found that the doc is for exact TTL. I wonder if we can support a relaxed TTL that could provide better performance. To me, the reason I need TTL is just to prevent the state size from growing infinitely (I believe I'm not the only one), so a relaxed version is enough. If there is a relaxed TTL with better performance, I would prefer that.


What do you think?


Best,
Sihua






On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> wrote:
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently is this: let InternalTimerService
maintain user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers, e.g.
TtlProcessingTimeTimersQueue and TtlEventTimeTimersQueue for
HeapInternalTimerService. Upon InternalTimerService#onProcessingTime() and
advanceWatermark(), they will first iterate through ProcessingTimeTimers and
EventTimeTimers (user timers) and then through TtlProcessingTimeTimers and
TtlEventTimeTimers (TTL timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to option 1, is that it doesn't impact
existing timer-related checkpoint/savepoint, restore and migrations.
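
A minimal sketch of the second option, assuming event time only and dropping keys and namespaces for brevity. `TwoQueueTimerSketch` is a hypothetical stand-in, not the real InternalTimerService; it only shows that all due user timers fire before any due TTL timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: user timers and TTL timers kept in separate queues. */
public class TwoQueueTimerSketch {

    private final PriorityQueue<Long> userTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> ttlTimers = new PriorityQueue<>();

    /** Registers a user timer (simplified: no key, no namespace). */
    public void registerEventTimeTimer(long time) {
        userTimers.add(time);
    }

    /** Registers a TTL timer in its own queue. */
    public void registerTtlEventTimeTimer(long time) {
        ttlTimers.add(time);
    }

    /**
     * Fires every due user timer first, then every due TTL timer,
     * and returns the firing order for inspection.
     */
    public List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        drain(userTimers, watermark, "user", fired);
        drain(ttlTimers, watermark, "ttl", fired);
        return fired;
    }

    private static void drain(
            PriorityQueue<Long> queue, long watermark, String tag, List<String> fired) {
        while (!queue.isEmpty() && queue.peek() <= watermark) {
            fired.add(tag + "@" + queue.poll());
        }
    }
}
```

Because the two queues are independent, the existing user-timer queue and its snapshot format would not need to change in such a design; only the TTL queue is new.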

What do you think?  And, any other Flink committers want to chime in for
ideas? I've also documented the above two discussion points to the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:


Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up.
The main problem that I see right now is that we have to distinguish
between user and TTL timers. AFAIK, the timer service does not support
timer tags (or another method) to distinguish timers.

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:


Thanks Fabian! Here're my comments inline, and let me know your thoughts.

1. Where should the TTL code reside? In the state backend or in the
operator?

I believe TTL code should not reside in state backend, because a critical
design is that TTL is independent of and transparent to state backends.

According to my current knowledge, I think it probably should live with
operators in flink-streaming-java.


2. How to get notified about state accesses? I guess this depends on 1.

You previously suggested using callbacks. I believe that's the right way
to do decoupling.


3. How to avoid conflicts of TTL timers and user timers?

User timers might always be invoked first? This is not urgent, shall we
bake it for more time and discuss it along the way?



Besides, I don't have access to create a FLIP page under
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
Can you grant me the proper access?

Thanks,

Bowen




On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:


Hi Bowen,

Thanks for updating the proposal. This looks pretty good (as I said
before).
There are a few areas that are not yet fully fleshed out:

1. Where should the TTL code reside? In the state backend or in the
operator?
2. How to get notified about state accesses? I guess this depends on 1.
3. How to avoid conflicts of TTL timers and user timers?

@Stefan (in CC) might have some ideas on these issues as well.

Cheers, Fabian

2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:


Hello community,

We've come up with a completely new design for Flink state TTL, documented
here

<https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,

and have run it by a few Flink PMC/committers.

What do you think? We'd love to hear your feedback.

Thanks,
Bowen


On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
wrote:

Hi Bowen,

Thanks for the proposal! I think state TTL would be a great feature!
Actually, we have implemented this for SQL / Table API [1].
I've added a couple of comments to the design doc.

In principle, I'm not sure if this functionality should be added to the
state backends.
We could also use the existing timer service, which would have a few nice
benefits (see my comments in the docs).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time

2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:

Hi guys,

I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
in Flink. This has been one of the most handy and most frequently requested
features in the Flink community. The jira ticket is FLINK-3089
<https://issues.apache.org/jira/browse/FLINK-3089>.

I've written a rough design doc
<https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
and developed prototypes for both heap and RocksDB state backends.

My question is: shall we create a FLIP page for this? Can I be granted the
privileges of creating pages in
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals?

Thanks,
Bowen

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Yes. IMO it makes sense to put the logic into the abstract base classes to
share the implementation across different state backends and state
primitives.
The overhead of storing the key twice is a valid concern, but I'm not sure
about the approach to add a timestamp to each value.
How would we discard state then? By always iterating over all (or a range of)
keys to check whether their state should be expired?
That would only work efficiently if we relax the clean-up logic, which could
be a valid design decision.
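
A relaxed clean-up could, for example, inspect only a bounded range of keys per invocation and resume where the previous pass stopped. The sketch below assumes exactly that; `IncrementalCleanupSketch`, `touch`, and `cleanupStep` are hypothetical names, and a sorted `TreeMap` of last-update timestamps stands in for the state backend.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of relaxed, incremental TTL clean-up over a key range. */
public class IncrementalCleanupSketch {

    // key -> timestamp of the last update
    private final TreeMap<String, Long> lastUpdate = new TreeMap<>();
    private final long ttlMillis;
    // Last key inspected by the previous pass; null means "start from the beginning".
    private String resumeAfter = null;

    public IncrementalCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Records a write to the given key at time 'now'. */
    public void touch(String key, long now) {
        lastUpdate.put(key, now);
    }

    public int size() {
        return lastUpdate.size();
    }

    /**
     * Inspects at most maxEntries entries, removes the expired ones,
     * and returns how many were removed. Expired entries outside the
     * inspected range survive until a later pass reaches them.
     */
    public int cleanupStep(long now, int maxEntries) {
        Map<String, Long> range =
                (resumeAfter == null) ? lastUpdate : lastUpdate.tailMap(resumeAfter, false);
        Iterator<Map.Entry<String, Long>> it = range.entrySet().iterator();
        int inspected = 0;
        int removed = 0;
        String last = null;
        while (it.hasNext() && inspected < maxEntries) {
            Map.Entry<String, Long> entry = it.next();
            last = entry.getKey();
            inspected++;
            if (now - entry.getValue() >= ttlMillis) {
                it.remove();
                removed++;
            }
        }
        // Wrap around to the first key once the end of the key space is reached.
        resumeAfter = it.hasNext() ? last : null;
        return removed;
    }
}
```

The work per call is bounded by `maxEntries`, so the cost can be spread over many invocations; in exchange, expiry is no longer exact.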

Best, Fabian


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by sihua zhou <su...@163.com>.
Hi Fabian,
thank you very much for the reply. Just an alternative: can we implement the TTL logic in `AbstractStateBackend` and `AbstractState`? The simplest way is to append the `ts` to the state's value and use the backend's `current time` (which can be either event time or processing time) to judge whether the data is outdated. The pros are:
- state is purely backed by the state backend.
- for each key-value pair, we only need to store one copy of the key (if we implement TTL based on timers, we need to store two copies of the key, one for the timer and one for the keyed state).


What do you think?


Best,
Sihua


Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Sihua,

I think it makes sense to couple state TTL to the timer service. We'll need
some kind of timers to expire state, so I think we should reuse components
that we have instead of implementing another timer service.
Moreover, using the same timer service and the public state APIs
helps to achieve consistent TTL behavior across different state backends.
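
As a rough illustration of timer-driven expiry (not the FLIP design itself): every write registers a timer at `now + ttl`, and a firing timer removes the state only if no later write superseded it. `TimerBasedTtlSketch` and its methods are hypothetical, and a `PriorityQueue` stands in for the timer service.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch of TTL driven by timers; not Flink's timer service. */
public class TimerBasedTtlSketch {

    /** A timer that fires for one key at one point in time. */
    private static final class Timer implements Comparable<Timer> {
        final long time;
        final String key;

        Timer(long time, String key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public int compareTo(Timer other) {
            return Long.compare(time, other.time);
        }
    }

    private final Map<String, String> state = new HashMap<>();
    private final Map<String, Long> expiryOf = new HashMap<>();
    private final PriorityQueue<Timer> timers = new PriorityQueue<>();
    private final long ttl;

    public TimerBasedTtlSketch(long ttl) {
        this.ttl = ttl;
    }

    /** Writes the value and registers a timer at now + ttl. */
    public void put(String key, String value, long now) {
        long expiry = now + ttl;
        state.put(key, value);
        expiryOf.put(key, expiry);
        timers.add(new Timer(expiry, key)); // the key is stored a second time here
    }

    public String get(String key) {
        return state.get(key);
    }

    /** Fires all timers up to 'now' and returns the number of keys expired. */
    public int advanceTo(long now) {
        int expired = 0;
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer timer = timers.poll();
            Long current = expiryOf.get(timer.key);
            // Skip stale timers: a later write registered a newer expiry for this key.
            if (current != null && current.longValue() == timer.time) {
                state.remove(timer.key);
                expiryOf.remove(timer.key);
                expired++;
            }
        }
        return expired;
    }
}
```

Expiry is exact and independent of how the state itself is stored, but each key appears both in the state and in the timer queue.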

Best, Fabian

2018-05-14 8:51 GMT+02:00 sihua zhou <su...@163.com>:

> Hi Bowen,
> thanks for your doc! I left some comments on the doc, the main concerning
> is that it makes me feel like a coupling that the TTL need to depend on
> `timer`. Because I think the TTL is a property of the state, so it should
> be backed by the state backend. If we implement the TTL base on the timer,
> than it looks like a coupling... it makes me feel that the backend for
> state becomes `state backend` + `timer`. And in fact, IMO, even the `timer`
> should depend on `state backend` in theroy, it's a type of HeapState that
> scoped to the `key group`(not scoped to per key like the current keyed
> state).
>
> And I found the doc is for exact TTL, I wonder if we can support a relax
> TTL that could provides a better performance. Because to me, the reason
> that I need TTL is just to prevent the state size growing infinitly(I
> believe I'm not the only one like this), so a relax version is enough, if
> there is a relax TTL which have a better performance, I would prefer that.
>
> What do you think?
>
> Best,
> Sihua
>
>
>
> On 05/14/2018 14:31,Bowen Li<bo...@gmail.com> <bo...@gmail.com>
> wrote:
>
> Thank you, Fabian! I've created the FLIP-25 page
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 25%3A+Support+User+State+TTL+Natively>
> .
>
> To continue our discussion points:
> 1. I see what you mean now. I totally agree. Since we don't completely know
> it now, shall we experiment or prototype a little bit before deciding this?
> 2. -
> 3. Adding tags to timers is an option.
>
> Another option I came up with recently, is like this: let
> *InternalTimerService
> *maintains user timers and TTL timers separately. Implementation classes of
> InternalTimerService should add two new collections of timers,  e.g.
> *Ttl*ProcessingTimeTimersQueue
> and *Ttl*EventTimeTimersQueue for HeapInternalTimerService. Upon
> InternalTimerService#onProcessingTime() and advanceWatermark(), they will
> first iterate through ProcessingTimeTimers and EventTimeTimers (user
> timers) and then through *Ttl*ProcessingTimeTimers and *Ttl*EventTimeTimers
>
> (Ttl timers).
>
> We'll also add the following new internal APIs to register Ttl timers:
>
> ```
> @Internal
> public void registerTtlProcessingTimeTimer(N namespace, long time);
>
> @Internal
> public void registerTtlEventTimeTimer(N namespace, long time);
> ```
>
> The biggest advantage, compared to option 1, is that it doesn't impact
> existing timer-related checkpoint/savepoint, restore and migrations.
>
> What do you think?  And, any other Flink committers want to chime in for
> ideas? I've also documented the above two discussion points to the FLIP
> page.
>
> Thanks,
> Bowen
>
>
> On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Bowen,
>
> 1. The motivation to keep the TTL logic outside of the state backend was
> mainly to avoid state backend custom implementations. If we have a generic
> approach that would work for all state backends, we could try to put the
> logic into a base class like AbstractStateBackend. After all, state cleanup
> is tightly related to the responsibilities of state backends.
> 2. -
> 3. You're right. We should first call the user code before cleaning up.
> The main problem that I see right now is that we have to distinguish
> between user and TTL timers. AFAIK, the timer service does not support
> timer tags (or another method) to distinguish timers.
>
> I've given you the permissions to create and edit wiki pages.
>
> Best, Fabian
>
> 2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:
>
> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
>
> I believe TTL code should not reside in state backend, because a critical
> design is that TTL is independent of and transparent to state backends.
>
> According to my current knowledge, I think it probably should live with
> operators in flink-streaming-java.
>
>
> 2. How to get notified about state accesses? I guess this depends on 1.
>
> You previously suggested using callbacks. I believe that's the right way
> to do decoupling.
>
>
> 3. How to avoid conflicts of TTL timers and user timers?
>
> User timers might always be invoked first? This is not urgent, shall we
> bake it for more time and discuss it along the way?
>
>
>
> Besides, I don't have access to create a FLIP page under
> https://cwiki.apache.org/confluence/display/FLINK/Flin
> k+Improvement+Proposals. Can you grant me the proper access?
>
> Thanks,
>
> Bowen
>
>
>
>
> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
> Hi Bowen,
>
> Thanks for updating the proposal. This looks pretty good (as I said
> before).
> There are a few areas, that are not yet fully fleshed out:
>
> 1. Where should the TTL code reside? In the state backend or in the
> operator?
> 2. How to get notified about state accesses? I guess this depends on 1.
> 3. How to avoid conflicts of TTL timers and user timers?
>
> @Stefan (in CC) might have some ideas on these issues as well.
>
> Cheers, Fabian
>
> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
>
> Hello community,
>
> We've come up with a completely new design for Flink state TTL, documented
> here
> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
>
> and have run it by a few Flink PMC/committers.
>
> What do you think? We'd love to hear feedback from you.
>
> Thanks,
> Bowen
>
>
> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
> wrote:
>
> Hi Bowen,
>
> Thanks for the proposal! I think state TTL would be a great feature!
> Actually, we have implemented this for SQL / Table API [1].
> I've added a couple of comments to the design doc.
>
> In principle, I'm not sure if this functionality should be added to the
> state backends.
> We could also use the existing timer service, which would have a few nice
> benefits (see my comments in the docs).
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
>
> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
>
> Hi guys,
>
> I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
> in Flink. This has been one of the most handy and most frequently
> asked-for features in the Flink community. The Jira ticket is FLINK-3089
> <https://issues.apache.org/jira/browse/FLINK-3089>.
>
> I've written a rough design doc
> <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
> and developed prototypes for both heap and rocksdb state backends.
>
> My question is: shall we create a FLIP page for this? Can I be granted the
> privileges of creating pages in
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> ?
>
> Thanks,
> Bowen
>
>
>
>
>
>
>
>

Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by sihua zhou <su...@163.com>.
Hi Bowen,
Thanks for your doc! I left some comments on it. My main concern is that making TTL depend on `timer` introduces a coupling. I think TTL is a property of the state, so it should be backed by the state backend. If we implement TTL on top of the timer, the backend for state effectively becomes `state backend` + `timer`. In fact, IMO, even the `timer` should depend on the `state backend` in theory: it is a type of HeapState that is scoped to the `key group` (not scoped per key like the current keyed state).


I also noticed that the doc is for exact TTL. I wonder if we could support a relaxed TTL that provides better performance. To me, the reason I need TTL is just to prevent the state size from growing infinitely (I believe I'm not the only one), so a relaxed version is enough. If a relaxed TTL has better performance, I would prefer that.
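
To make the idea concrete, here is a minimal sketch of one possible reading of a relaxed TTL, in plain Java and not against any Flink API. `RelaxedTtlMapSketch` and all of its methods are hypothetical names used only for illustration. The assumption is that "relaxed" means no timer is registered per entry: each value just carries its last-update timestamp, expired entries are dropped lazily when they are read, and an occasional sweep bounds the overall size.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical relaxed-TTL map for illustration; not part of Flink. */
class RelaxedTtlMapSketch<K, V> {

    /** A value together with the time it was last written. */
    private static final class Entry<V> {
        final V value;
        final long lastUpdate;

        Entry(V value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    RelaxedTtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the value and remembers the write time; no timer is registered. */
    void put(K key, V value, long now) {
        entries.put(key, new Entry<>(value, now));
    }

    /** Returns the value, or null if absent or expired; expired entries are dropped lazily. */
    V get(K key, long now) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (now - entry.lastUpdate >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return entry.value;
    }

    /** Occasional full sweep that removes every expired entry and returns how many were removed. */
    int sweep(long now) {
        int before = entries.size();
        entries.values().removeIf(entry -> now - entry.lastUpdate >= ttlMillis);
        return before - entries.size();
    }

    int size() {
        return entries.size();
    }
}
```

With this reading, an expired entry may stay in memory until it is next read or swept, which is the relaxation: the state size stays bounded, but cleanup is not exact in time.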


What do you think?


Best,
Sihua






On 05/14/2018 14:31, Bowen Li <bo...@gmail.com> wrote:
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently is this: let InternalTimerService
maintain user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers, e.g.
TtlProcessingTimeTimersQueue and TtlEventTimeTimersQueue for
HeapInternalTimerService. Upon InternalTimerService#onProcessingTime() and
advanceWatermark(), they will first iterate through ProcessingTimeTimers and
EventTimeTimers (user timers) and then through TtlProcessingTimeTimers and
TtlEventTimeTimers (TTL timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to adding tags to timers, is that it doesn't
impact existing timer-related checkpoints/savepoints, restore, and migrations.

What do you think? Would any other Flink committers like to chime in with
ideas? I've also documented the above two discussion points on the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:

Hi Bowen,

1. The motivation to keep the TTL logic outside of the state backend was
mainly to avoid state backend custom implementations. If we have a generic
approach that would work for all state backends, we could try to put the
logic into a base class like AbstractStateBackend. After all, state cleanup
is tightly related to the responsibilities of state backends.
2. -
3. You're right. We should first call the user code before cleaning up.
The main problem that I see right now is that we have to distinguish
between user and TTL timers. AFAIK, the timer service does not support
timer tags (or another method) to distinguish timers.

I've given you the permissions to create and edit wiki pages.

Best, Fabian

2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:

Thanks Fabian! Here're my comments inline, and let me know your thoughts.

1. Where should the TTL code reside? In the state backend or in the
operator?

I believe TTL code should not reside in state backend, because a critical
design is that TTL is independent of and transparent to state backends.

According to my current knowledge, I think it probably should live with
operators in flink-streaming-java.


2. How to get notified about state accesses? I guess this depends on 1.

You previously suggested using callbacks. I believe that's the right way
to do decoupling.


3. How to avoid conflicts of TTL timers and user timers?

User timers might always be invoked first? This is not urgent, shall we
bake it for more time and discuss it along the way?



Besides, I don't have access to create a FLIP page under
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
Can you grant me the proper access?

Thanks,

Bowen




On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:

Hi Bowen,

Thanks for updating the proposal. This looks pretty good (as I said
before).
There are a few areas, that are not yet fully fleshed out:

1. Where should the TTL code reside? In the state backend or in the
operator?
2. How to get notified about state accesses? I guess this depends on 1.
3. How to avoid conflicts of TTL timers and user timers?

@Stefan (in CC) might have some ideas on these issues as well.

Cheers, Fabian

2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:

Hello community,

We've come up with a completely new design for Flink state TTL, documented
here
<https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
and have run it by a few Flink PMC/committers.

What do you think? We'd love to hear feedback from you.

Thanks,
Bowen


On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
wrote:

Hi Bowen,

Thanks for the proposal! I think state TTL would be a great feature!
Actually, we have implemented this for SQL / Table API [1].
I've added a couple of comments to the design doc.

In principle, I'm not sure if this functionality should be added to the
state backends.
We could also use the existing timer service, which would have a few nice
benefits (see my comments in the docs).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time

2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:

Hi guys,

I want to propose a new FLIP -- FLIP-25 - Support User State TTL Natively
in Flink. This has been one of the most handy and most frequently
asked-for features in the Flink community. The Jira ticket is FLINK-3089
<https://issues.apache.org/jira/browse/FLINK-3089>.

I've written a rough design doc
<https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
and developed prototypes for both heap and rocksdb state backends.

My question is: shall we create a FLIP page for this? Can I be granted the
privileges of creating pages in
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
?

Thanks,
Bowen








Re: [Discuss] Proposing FLIP-25 - Support User State TTL Natively in Flink

Posted by Bowen Li <bo...@gmail.com>.
Thank you, Fabian! I've created the FLIP-25 page
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively>.

To continue our discussion points:
1. I see what you mean now. I totally agree. Since we don't completely know
it now, shall we experiment or prototype a little bit before deciding this?
2. -
3. Adding tags to timers is an option.

Another option I came up with recently is this: let InternalTimerService
maintain user timers and TTL timers separately. Implementation classes of
InternalTimerService should add two new collections of timers, e.g.
TtlProcessingTimeTimersQueue and TtlEventTimeTimersQueue for
HeapInternalTimerService. Upon InternalTimerService#onProcessingTime() and
advanceWatermark(), they will first iterate through ProcessingTimeTimers and
EventTimeTimers (user timers) and then through TtlProcessingTimeTimers and
TtlEventTimeTimers (TTL timers).

We'll also add the following new internal APIs to register Ttl timers:

```
@Internal
public void registerTtlProcessingTimeTimer(N namespace, long time);

@Internal
public void registerTtlEventTimeTimer(N namespace, long time);
```

The biggest advantage, compared to adding tags to timers, is that it doesn't
impact existing timer-related checkpoints/savepoints, restore, and migrations.
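
A minimal sketch of the separate-queue idea, written in plain Java rather than against Flink's real classes. `TwoQueueTimerSketch` and its methods are hypothetical names used only for illustration, and timers are reduced to bare timestamps with no keys, namespaces, or checkpointing. It shows only the ordering: on each time advance, all due user timers fire before any due TTL timer, so user code sees the state before cleanup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer service; not Flink's InternalTimerService. */
class TwoQueueTimerSketch {

    // User timers and TTL timers live in separate queues, each ordered by timestamp.
    private final PriorityQueue<Long> userTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> ttlTimers = new PriorityQueue<>();

    // Records the firing order so the behavior can be inspected.
    private final List<String> fired = new ArrayList<>();

    void registerUserTimer(long time) {
        userTimers.add(time);
    }

    void registerTtlTimer(long time) {
        ttlTimers.add(time);
    }

    /** Fires all due user timers first, then all due TTL timers. */
    void advanceTo(long time) {
        drain(userTimers, time, "user");
        drain(ttlTimers, time, "ttl");
    }

    /** Removes and records every timer in the queue whose timestamp is at or before the given time. */
    private void drain(PriorityQueue<Long> queue, long time, String kind) {
        while (!queue.isEmpty() && queue.peek() <= time) {
            fired.add(kind + "@" + queue.poll());
        }
    }

    List<String> firedTimers() {
        return fired;
    }

    public static void main(String[] args) {
        TwoQueueTimerSketch timers = new TwoQueueTimerSketch();
        timers.registerTtlTimer(5L);
        timers.registerUserTimer(10L);
        timers.registerUserTimer(20L);
        timers.advanceTo(10L);
        // The user timer at 10 fires before the TTL timer at 5; the user timer at 20 is not yet due.
        System.out.println(timers.firedTimers()); // prints [user@10, ttl@5]
    }
}
```

Because the TTL timers sit in their own collection in this sketch, the user timer queue keeps its existing shape, which is the property the paragraph above relies on.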

What do you think? Would any other Flink committers like to chime in with
ideas? I've also documented the above two discussion points on the FLIP
page.

Thanks,
Bowen


On Wed, May 2, 2018 at 5:36 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Bowen,
>
> 1. The motivation to keep the TTL logic outside of the state backend was
> mainly to avoid state backend custom implementations. If we have a generic
> approach that would work for all state backends, we could try to put the
> logic into a base class like AbstractStateBackend. After all, state cleanup
> is tightly related to the responsibilities of state backends.
> 2. -
> 3. You're right. We should first call the user code before cleaning up.
> The main problem that I see right now is that we have to distinguish
> between user and TTL timers. AFAIK, the timer service does not support
> timer tags (or another method) to distinguish timers.
>
> I've given you the permissions to create and edit wiki pages.
>
> Best, Fabian
>
> 2018-04-30 7:47 GMT+02:00 Bowen Li <bo...@gmail.com>:
>
>> Thanks Fabian! Here're my comments inline, and let me know your thoughts.
>>
>> 1. Where should the TTL code reside? In the state backend or in the
>> operator?
>>
>> I believe TTL code should not reside in state backend, because a critical
>> design is that TTL is independent of and transparent to state backends.
>>
>> According to my current knowledge, I think it probably should live with
>> operators in flink-streaming-java.
>>
>>
>> 2. How to get notified about state accesses? I guess this depends on 1.
>>
>> You previously suggested using callbacks. I believe that's the right way
>> to do decoupling.
>>
>>
>> 3. How to avoid conflicts of TTL timers and user timers?
>>
>> User timers might always be invoked first? This is not urgent, shall we
>> bake it for more time and discuss it along the way?
>>
>>
>>
>> Besides, I don't have access to create a FLIP page under
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
>> Can you grant me the proper access?
>>
>> Thanks,
>>
>> Bowen
>>
>>
>>
>>
>> On Tue, Apr 24, 2018 at 2:40 AM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi Bowen,
>>>
>>> Thanks for updating the proposal. This looks pretty good (as I said
>>> before).
>>> There are a few areas, that are not yet fully fleshed out:
>>>
>>> 1. Where should the TTL code reside? In the state backend or in the
>>> operator?
>>> 2. How to get notified about state accesses? I guess this depends on 1.
>>> 3. How to avoid conflicts of TTL timers and user timers?
>>>
>>> @Stefan (in CC) might have some ideas on these issues as well.
>>>
>>> Cheers, Fabian
>>>
>>> 2018-04-22 21:14 GMT+02:00 Bowen <bo...@gmail.com>:
>>>
>>>> Hello community,
>>>>
>>>> We've come up with a completely new design for Flink state TTL, documented
>>>> here
>>>> <https://docs.google.com/document/d/1PPwHMDHGWMOO8YGv08BNi8Fqe_h7SjeALRzmW-ZxSfY/edit?usp=sharing>,
>>>> and have run it by a few Flink PMC/committers.
>>>>
>>>> What do you think? We'd love to hear feedback from you.
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>>
>>>> On Wed, Feb 7, 2018 at 12:53 PM, Fabian Hueske <fh...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Bowen,
>>>>>
>>>>> Thanks for the proposal! I think state TTL would be a great feature!
>>>>> Actually, we have implemented this for SQL / Table API [1].
>>>>> I've added a couple of comments to the design doc.
>>>>>
>>>>> In principle, I'm not sure if this functionality should be added to the
>>>>> state backends.
>>>>> We could also use the existing timer service, which would have a few nice
>>>>> benefits (see my comments in the docs).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#idle-state-retention-time
>>>>>
>>>>> 2018-02-06 8:26 GMT+01:00 Bowen Li <bo...@gmail.com>:
>>>>>
>>>>> > Hi guys,
>>>>> >
>>>>> > I want to propose a new FLIP -- FLIP-25 - Support User State TTL
>>>>> > Natively in Flink. This has been one of the most handy and most
>>>>> > frequently asked-for features in the Flink community. The Jira ticket
>>>>> > is FLINK-3089 <https://issues.apache.org/jira/browse/FLINK-3089>.
>>>>> >
>>>>> > I've written a rough design doc
>>>>> > <https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit#>,
>>>>> > and developed prototypes for both heap and rocksdb state backends.
>>>>> >
>>>>> > My question is: shall we create a FLIP page for this? Can I be
>>>>> > granted the privileges of creating pages in
>>>>> > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>>>> > ?
>>>>> >
>>>>> > Thanks,
>>>>> > Bowen
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>