You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "makeyang (JIRA)" <ji...@apache.org> on 2018/04/16 11:15:00 UTC
[jira] [Created] (FLINK-9182) async checkpoints for timer service
makeyang created FLINK-9182:
-------------------------------
Summary: async checkpoints for timer service
Key: FLINK-9182
URL: https://issues.apache.org/jira/browse/FLINK-9182
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Affects Versions: 1.4.2, 1.5.0
Reporter: makeyang
Fix For: 1.4.3, 1.5.1
# problem description:
## with the increase in the number of 'InternalTimer' object the checkpoint more and more slowly
# improvement desgin
## maintain a stateTableVersion, which is exactly the same thing as CopyOnWriteStateTable and snapshotVersions which is exactly the same thing as CopyOnWriteStateTable in InternalTimeServiceManager. one more thing: a readwrite lock, which is used to protect snapshotVersions and stateTableVersion
## for each InternalTimer, add 2 more properties: create version and delete version beside 3 existing properties: timestamp, key and namespace. each time a Timer is registered in timerservice, it is created with stateTableVersion as its create version while delete version is -1. each time when timer is deleted in timerservice, it is marked delete for giving it a delete verison equals to stateTableVersion without physically delete it from timerservice.
## each time when try to snapshot timers, InternalTimeServiceManager increase its stateTableVersion and add this stateTableVersion in snapshotVersions. these 2 operators are protected by write lock of InternalTimeServiceManager. that current stateTableVersion take as snapshot version of this snapshot
## shallow copy <String,HeapInternalTimerService> tuples
## then use a another thread asynchronous snapshot whole things: keyserialized, namespaceserializer and timers. for timers which is not deleted(delete version is -1) and create version less than snapshot version, serialized it. for timers whose delete version is not -1 and is bigger than or equals snapshot version, serialized it. otherwise, it will not be serialized by this snapshot.
## when everything is serialized, remove snapshot version in snapshotVersions, which is still in another thread and this action is guarded by write lock.
## last thing: timer physical deletion. 2 places to physically delete timers: each time when timer is deleted in timerservice, it is marked delete for giving it a delete verison equals to stateTableVersion without physically delete it from timerservice. after this, check if snapshotVersions size is 0 (which means there is no running snapshot) and if true, delete timer .the other place to delete is in snapshot timer's iterat: when timer's delete version is less than min value of snapshotVersions, which means the timer is deleted and no running snapshot should keep it.
## some more additions: processingTimeTimers and eventTimeTimers for each group used to be hashset and now it is changed to concurrenthashmap with key+namesapce+timestamp as its hash key.
# related mail list thread
## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
# github pull request
## //coming soon
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)