You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by 林德强 <li...@gmail.com> on 2018/03/16 02:11:07 UTC

Slow flink checkpoint

Hi,

I'm run a job on Flink streaming. I found with the increase in the number
of  'InternalTimer' object the checkpoint more and more slowly. Is there
any way to solve this problem ? such as make the "timeServiceManager"
snapshot async.





Thanks

Re: Slow flink checkpoint

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

yes, that is correct, the timer service is currently only available in main-memory and only with synchronous snapshots. this topic is on our TODO list for after the Flink 1.5 release.

Best,
Stefan

> Am 16.03.2018 um 09:03 schrieb Fabian Hueske <fh...@gmail.com>:
> 
> Hi,
> 
> AFAIK, that's not possible. 
> The only "solution" is to reduce the number of timers. Whether that's possible or not, depends on the application.
> For example, if you use timers to clean up state, you can work with an upper and lower bound and only register one timer for each (upper - lower) interval.
> 
> Best, Fabian
> 
> 2018-03-16 3:11 GMT+01:00 林德强 <lindeqiang1988@gmail.com <ma...@gmail.com>>:
> Hi,
> I'm run a job on Flink streaming. I found with the increase in the number of  'InternalTimer' object the checkpoint more and more slowly. Is there any way to solve this problem ? such as make the "timeServiceManager" snapshot async.
> 
> 
> 
> 
> Thanks
> 


Re: Slow flink checkpoint

Posted by makeyang <ri...@hotmail.com>.
the test is very promising.
the time sync part takes from couple of seconds to couple of mill-seconds.
1000x time reduce(overall time not save since it is just move from sync to
async)
are u guys interested in this change?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: 答复: Slow flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks MaKeyang!

I've given you contributor permissions and assigned the issue to you.

Best, Fabian

2018-04-16 13:19 GMT+02:00 ma ky <ri...@hotmail.com>:

> Fabian:
>     thanks for u replay.
>     I have create a jira issue:
> https://issues.apache.org/jira/browse/FLINK-9182?jql=
> project%20%3D%20FLINK%20AND%20issuetype%20%3D%
> 20Improvement%20AND%20created%20%3E%3D%20-10m
>
>    I'll pull the code ASAP.
>
>
>
>
> MaKeyang
> TIG.JD.COM
> 京东基础架构 <http://tig.jd.com/>
> tig.jd.com
> TIG官网
>
>
> ------------------------------
> *发件人:* Fabian Hueske <fh...@gmail.com>
> *发送时间:* 2018年4月16日 16:21
> *收件人:* makeyang
> *抄送:* user; Aljoscha Krettek
> *主题:* Re: Slow flink checkpoint
>
> Hi everybody,
>
> Thanks so much for looking into this issue and posting the detailed
> description of your approach.
> As said before, improving the checkpointing performance for timers is a
> very important improvement for Flink.
>
> I'm not familiar with the internals of the timer service checkpointing,
> but adding an add and delete version field and perform async checkpoints
> based on these fields seems like a good approach to me.
> IIRC, Aljoscha (in CC) implemented the timer service and its
> checkpointing. He might have more comments.
>
> I'd suggest to create a JIRA (everybody can do that) and repost the
> description of your approach there.
> If you have the code ready, you can also open a PR and reference the JIRA.
>
> Best, Fabian
>
> 2018-04-16 9:03 GMT+02:00 makeyang <ri...@hotmail.com>:
>
> since flink forward SF has done.
> can you guys give some minutes to take a look at this issue and give some
> thoughts on it? help to review/comments on my desgin? or give us a design
> so
> that I can help to implement it.
>
> thanks a lot.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/
>
>
>

答复: Slow flink checkpoint

Posted by ma ky <ri...@hotmail.com>.
Fabian:
    thanks for u replay.
    I have create a jira issue:
https://issues.apache.org/jira/browse/FLINK-9182?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Improvement%20AND%20created%20%3E%3D%20-10m

   I'll pull the code ASAP.



MaKeyang
TIG.JD.COM<http://TIG.JD.COM>
京东基础架构<http://tig.jd.com/>
tig.jd.com
TIG官网



________________________________
发件人: Fabian Hueske <fh...@gmail.com>
发送时间: 2018年4月16日 16:21
收件人: makeyang
抄送: user; Aljoscha Krettek
主题: Re: Slow flink checkpoint

Hi everybody,

Thanks so much for looking into this issue and posting the detailed description of your approach.
As said before, improving the checkpointing performance for timers is a very important improvement for Flink.

I'm not familiar with the internals of the timer service checkpointing, but adding an add and delete version field and perform async checkpoints based on these fields seems like a good approach to me.
IIRC, Aljoscha (in CC) implemented the timer service and its checkpointing. He might have more comments.

I'd suggest to create a JIRA (everybody can do that) and repost the description of your approach there.
If you have the code ready, you can also open a PR and reference the JIRA.

Best, Fabian

2018-04-16 9:03 GMT+02:00 makeyang <ri...@hotmail.com>>:
since flink forward SF has done.
can you guys give some minutes to take a look at this issue and give some
thoughts on it? help to review/comments on my desgin? or give us a design so
that I can help to implement it.

thanks a lot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Slow flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi everybody,

Thanks so much for looking into this issue and posting the detailed
description of your approach.
As said before, improving the checkpointing performance for timers is a
very important improvement for Flink.

I'm not familiar with the internals of the timer service checkpointing, but
adding an add and delete version field and perform async checkpoints based
on these fields seems like a good approach to me.
IIRC, Aljoscha (in CC) implemented the timer service and its checkpointing.
He might have more comments.

I'd suggest to create a JIRA (everybody can do that) and repost the
description of your approach there.
If you have the code ready, you can also open a PR and reference the JIRA.

Best, Fabian

2018-04-16 9:03 GMT+02:00 makeyang <ri...@hotmail.com>:

> since flink forward SF has done.
> can you guys give some minutes to take a look at this issue and give some
> thoughts on it? help to review/comments on my desgin? or give us a design
> so
> that I can help to implement it.
>
> thanks a lot.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Slow flink checkpoint

Posted by makeyang <ri...@hotmail.com>.
since flink forward SF has done.
can you guys give some minutes to take a look at this issue and give some
thoughts on it? help to review/comments on my desgin? or give us a design so
that I can help to implement it.

thanks a lot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Slow flink checkpoint

Posted by 林德强 <li...@gmail.com>.
Hi Stefan , Fabian ,
		Keyang  is engineer in our team, he has do a lot of efforts on the timers' snapshot async. What do you think of his idea?


Best,
Deqiang
TIG.JD.COM <http://tig.jd.com/>


> 在 2018年4月1日,下午7:21,makeyang <ri...@hotmail.com> 写道:
> 
> I have put a lot of efforts on this issue and try to resolve it:
> 1. let me describe current timers' snapshot path first:
>    a) for each keygroup, invoke
> InternalTimeServiceManager.snapshotStateForKeyGroup
>    b) InternalTimeServiceManager create a
> InternalTimerServiceSerializationProxy to write snapshot
>    c) InternalTimerServiceSerializationProxy iterat <String,//which is
> service name,
>    HeapInternalTimerService> tuple and write service name and
> snapshotTimersForKeyGroup, then get InternalTimersSnapshotWriter to
> writeTimersSnapshot
>    d) in method writeTimersSnapshot of InternalTimersSnapshotWriter, first
> write keyserializer and namespaceserializer, then get eventTimers and
> processingTimers of InternalTimersSnapshot, which is Set of InternalTimer
> and serializer them.
> 
> 2. my first try is shallow copy the <Strin, HeapInternalTimerService> tuples
> and then shallow copy the eventTimers and processingTimers, then use another
> thread to snapshot them without blocking the event processing thread. but it
> turns out that shallow copy of the eventTimers and processingTimers are time
> consumed and this solution failed
> 
> 3. then I try to borrow the idea of data structure CopyOnWriteStateTable and
> try to manage timers with it. but after digging more, I found out that there
> is a more easy way to achieve asynchronous snapshot timers due to one fact:
> InternalTimer is immutable. we can achieve asynchronous with a more easy way
> based on this fact: 
>    a)maintain a stateTableVersion, which is exactly the same thing as
> CopyOnWriteStateTable and snapshotVersions which is exactly the same thing
> as CopyOnWriteStateTable in InternalTimeServiceManager. one more thing: a
> readwrite lock, which is used to protect snapshotVersions and
> stateTableVersion
>    b)for each InternalTimer, add 2 more properties: create version and
> delete version beside 3 existing properties: timestamp, key and namespace.
> each time a Timer is registered in timerservice, it is created with
> stateTableVersion as its create version while delete version is -1. each
> time when timer is deleted in timerservice, it is marked delete for giving
> it a delete verison equals to stateTableVersion without physically delete it
> from timerservice. 
>    c)each time when try to snapshot timers, InternalTimeServiceManager
> increase its stateTableVersion and add this stateTableVersion in
> snapshotVersions. these 2 operators are protected by write lock of
> InternalTimeServiceManager. that current stateTableVersion take as snapshot
> version of this snapshot
>    d)shallow copy <String,HeapInternalTimerService> tuples 
>    e)then use a another thread asynchronous snapshot whole things:
> keyserialized, namespaceserializer and timers. for timers which is not
> deleted(delete version is -1) and create version less than snapshot version,
> serialized it. for timers whose delete version is not -1 and is bigger than
> or equals snapshot version, serialized it. otherwise, it will not be
> serialized by this snapshot. 
>    f)when everything is serialized, remove snapshot version in
> snapshotVersions, which is still in another thread and this action is
> guarded by write lock.
>    g)last thing: timer physical deletion. 2 places to physically delete
> timers: each time when timer is deleted in timerservice, it is marked delete
> for giving it a delete verison equals to stateTableVersion without
> physically delete it from timerservice. after this, check if timer's delete
> version is less than min value of snapshotVersions with read lock
> guarded(which means there is no active timer snapshot running) and if that
> is true, physically delete it. the other place to delete is in snapshot
> timer's iterat: when timer's delete version is less than min value of
> snapshotVersions, which means the timer is deleted and no running snapshot
> should keep it.
>    h) some more additions: processingTimeTimers and eventTimeTimers for
> each group used to be hashset and now it is changed to concurrenthashmap
> with key+namesapce+timestamp as its hash key.
> 
> the code is done and test is still runnng. I post this comments not only try
> to hear u guys voice, but also try to figure out some more questios related
> to currently timer snapshot code path. my questions are below:
> 1. in method onProcessingTime of HeapInternalTimerService, it is invoked by
> another thread of ProcessingTimeService, and in this thread, it will remove
> timer in HeapInternalTimerService. while in current timer snapshot path, I
> haven't found there is any shallow copy of processingTimeTimers and
> eventTimeTimers. how could this won't cause concurrent modification
> exception?
> 2. since onProcessingTime is trigged in another thread, when timers are
> snapshot in working thread, what if then a timer is fired and triggerTarget
> is processed, which could cause state changed, then asynchronous
> keyedstatsnapshot is trigged. won't this cause state inconsistent? let's
> image this case: all kedyed state is only chaned by timer. so Add timer1,
> timer2, timer3, timer4 and timer5 and since no timer is processed, keyed
> state is nothing. then timer1 and timer2 is processed, keyed state is k2.
> and left timer3, timer4 and timer5 in timer servcie. then snapshot timer3,
> timer4 and timer5 in synchronous way. then try to snapshot keyed state
> asynchronous while timer3 is processed and keyed state is k3. the eventually
> snapshot is timer3, timer4, timer5 and k3. as far as I understand, it should
> be timer3, timer4, timer5 and k2. please help me out this. 
> 
> thanks very much
> by the way, if u guys won't mind, can anyone of u open a jira issue to track
> this and when time is ok, I'll make contribution on this issue.
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Slow flink checkpoint

Posted by makeyang <ri...@hotmail.com>.
I have put a lot of efforts on this issue and try to resolve it:
1. let me describe current timers' snapshot path first:
    a) for each keygroup, invoke
InternalTimeServiceManager.snapshotStateForKeyGroup
    b) InternalTimeServiceManager create a
InternalTimerServiceSerializationProxy to write snapshot
    c) InternalTimerServiceSerializationProxy iterat <String,//which is
service name,
    HeapInternalTimerService> tuple and write service name and
snapshotTimersForKeyGroup, then get InternalTimersSnapshotWriter to
writeTimersSnapshot
    d) in method writeTimersSnapshot of InternalTimersSnapshotWriter, first
write keyserializer and namespaceserializer, then get eventTimers and
processingTimers of InternalTimersSnapshot, which is Set of InternalTimer
and serializer them.

2. my first try is shallow copy the <Strin, HeapInternalTimerService> tuples
and then shallow copy the eventTimers and processingTimers, then use another
thread to snapshot them without blocking the event processing thread. but it
turns out that shallow copy of the eventTimers and processingTimers are time
consumed and this solution failed

3. then I try to borrow the idea of data structure CopyOnWriteStateTable and
try to manage timers with it. but after digging more, I found out that there
is a more easy way to achieve asynchronous snapshot timers due to one fact:
InternalTimer is immutable. we can achieve asynchronous with a more easy way
based on this fact: 
    a)maintain a stateTableVersion, which is exactly the same thing as
CopyOnWriteStateTable and snapshotVersions which is exactly the same thing
as CopyOnWriteStateTable in InternalTimeServiceManager. one more thing: a
readwrite lock, which is used to protect snapshotVersions and
stateTableVersion
    b)for each InternalTimer, add 2 more properties: create version and
delete version beside 3 existing properties: timestamp, key and namespace.
each time a Timer is registered in timerservice, it is created with
stateTableVersion as its create version while delete version is -1. each
time when timer is deleted in timerservice, it is marked delete for giving
it a delete verison equals to stateTableVersion without physically delete it
from timerservice. 
    c)each time when try to snapshot timers, InternalTimeServiceManager
increase its stateTableVersion and add this stateTableVersion in
snapshotVersions. these 2 operators are protected by write lock of
InternalTimeServiceManager. that current stateTableVersion take as snapshot
version of this snapshot
    d)shallow copy <String,HeapInternalTimerService> tuples 
    e)then use a another thread asynchronous snapshot whole things:
keyserialized, namespaceserializer and timers. for timers which is not
deleted(delete version is -1) and create version less than snapshot version,
serialized it. for timers whose delete version is not -1 and is bigger than
or equals snapshot version, serialized it. otherwise, it will not be
serialized by this snapshot. 
    f)when everything is serialized, remove snapshot version in
snapshotVersions, which is still in another thread and this action is
guarded by write lock.
    g)last thing: timer physical deletion. 2 places to physically delete
timers: each time when timer is deleted in timerservice, it is marked delete
for giving it a delete verison equals to stateTableVersion without
physically delete it from timerservice. after this, check if timer's delete
version is less than min value of snapshotVersions with read lock
guarded(which means there is no active timer snapshot running) and if that
is true, physically delete it. the other place to delete is in snapshot
timer's iterat: when timer's delete version is less than min value of
snapshotVersions, which means the timer is deleted and no running snapshot
should keep it.
    h) some more additions: processingTimeTimers and eventTimeTimers for
each group used to be hashset and now it is changed to concurrenthashmap
with key+namesapce+timestamp as its hash key.

the code is done and test is still runnng. I post this comments not only try
to hear u guys voice, but also try to figure out some more questios related
to currently timer snapshot code path. my questions are below:
1. in method onProcessingTime of HeapInternalTimerService, it is invoked by
another thread of ProcessingTimeService, and in this thread, it will remove
timer in HeapInternalTimerService. while in current timer snapshot path, I
haven't found there is any shallow copy of processingTimeTimers and
eventTimeTimers. how could this won't cause concurrent modification
exception?
2. since onProcessingTime is trigged in another thread, when timers are
snapshot in working thread, what if then a timer is fired and triggerTarget
is processed, which could cause state changed, then asynchronous
keyedstatsnapshot is trigged. won't this cause state inconsistent? let's
image this case: all kedyed state is only chaned by timer. so Add timer1,
timer2, timer3, timer4 and timer5 and since no timer is processed, keyed
state is nothing. then timer1 and timer2 is processed, keyed state is k2.
and left timer3, timer4 and timer5 in timer servcie. then snapshot timer3,
timer4 and timer5 in synchronous way. then try to snapshot keyed state
asynchronous while timer3 is processed and keyed state is k3. the eventually
snapshot is timer3, timer4, timer5 and k3. as far as I understand, it should
be timer3, timer4, timer5 and k2. please help me out this. 

thanks very much
by the way, if u guys won't mind, can anyone of u open a jira issue to track
this and when time is ok, I'll make contribution on this issue.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Slow flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Yes, you cannot start a separate thread to cleanup the state.
State is managed by Flink and can only be accessed at certain points in
time when the user code is called.

If you are using event time, another trick you could play is to only
register all timers on (currentWatermark + 1).
That will cause the trigger to fire whenever the watermark advances. You
could store all relevant timestamps in a ListState and act on all timers
that are less than the currentWatermark.
Also, since there is only a single timer per timestamp (currentWM + 1)
there will be only one watermark per key.

Best, Fabian

2018-03-16 13:56 GMT+01:00 林德强 <li...@gmail.com>:

> Hi Fabian ,
>    Reduce the number of timers is a good idea.
>    But in my application the timer is different from the key  registered
> follow the keyBy . May be it can't work with an upper and lower bound.
>
>    I try modify the flink resource and start a thread to clean the
> expired keyed sate, but it doesn't work well because of concurrency issues.
>
>
>
>
>
>
> Best,
> Deqiang
>
> 2018-03-16 16:03 GMT+08:00 Fabian Hueske <fh...@gmail.com>:
>
>> Hi,
>>
>> AFAIK, that's not possible.
>> The only "solution" is to reduce the number of timers. Whether that's
>> possible or not, depends on the application.
>> For example, if you use timers to clean up state, you can work with an
>> upper and lower bound and only register one timer for each (upper - lower)
>> interval.
>>
>> Best, Fabian
>>
>> 2018-03-16 3:11 GMT+01:00 林德强 <li...@gmail.com>:
>>
>>> Hi,
>>>
>>> I'm run a job on Flink streaming. I found with the increase in the
>>> number of  'InternalTimer' object the checkpoint more and more slowly. Is
>>> there any way to solve this problem ? such as make the
>>> "timeServiceManager" snapshot async.
>>>
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>
>>
>

Re: Slow flink checkpoint

Posted by 林德强 <li...@gmail.com>.
Hi Fabian ,
   Reduce the number of timers is a good idea.
   But in my application the timer is different from the key  registered
follow the keyBy . May be it can't work with an upper and lower bound.

   I try modify the flink resource and start a thread to clean the expired
keyed sate, but it doesn't work well because of concurrency issues.






Best,
Deqiang

2018-03-16 16:03 GMT+08:00 Fabian Hueske <fh...@gmail.com>:

> Hi,
>
> AFAIK, that's not possible.
> The only "solution" is to reduce the number of timers. Whether that's
> possible or not, depends on the application.
> For example, if you use timers to clean up state, you can work with an
> upper and lower bound and only register one timer for each (upper - lower)
> interval.
>
> Best, Fabian
>
> 2018-03-16 3:11 GMT+01:00 林德强 <li...@gmail.com>:
>
>> Hi,
>>
>> I'm run a job on Flink streaming. I found with the increase in the number
>> of  'InternalTimer' object the checkpoint more and more slowly. Is there
>> any way to solve this problem ? such as make the "timeServiceManager"
>> snapshot async.
>>
>>
>>
>>
>>
>> Thanks
>>
>
>

Re: Slow flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

AFAIK, that's not possible.
The only "solution" is to reduce the number of timers. Whether that's
possible or not, depends on the application.
For example, if you use timers to clean up state, you can work with an
upper and lower bound and only register one timer for each (upper - lower)
interval.

Best, Fabian

2018-03-16 3:11 GMT+01:00 林德强 <li...@gmail.com>:

> Hi,
>
> I'm run a job on Flink streaming. I found with the increase in the number
> of  'InternalTimer' object the checkpoint more and more slowly. Is there
> any way to solve this problem ? such as make the "timeServiceManager"
> snapshot async.
>
>
>
>
>
> Thanks
>