Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/03 09:43:00 UTC
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462200#comment-16462200 ]
ASF GitHub Bot commented on FLINK-9182:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5908#discussion_r185734334
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
*
* @param context context that provides information and means required for taking a snapshot
*/
- public void snapshotState(StateSnapshotContext context) throws Exception {
+ public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
-
+ OperatorStateCheckpointOutputStream metaOut;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
-
try {
- KeyGroupsList allKeyGroups = out.getKeyGroupList();
- for (int keyGroupIdx : allKeyGroups) {
- out.startNewKeyGroup(keyGroupIdx);
-
- timeServiceManager.snapshotStateForKeyGroup(
- new DataOutputViewStreamWrapper(out), keyGroupIdx);
- }
+ metaOut = context.getRawKeyedOperatorStateMetaOutput();
} catch (Exception exception) {
- throw new Exception("Could not write timer service of " + getOperatorName() +
- " to checkpoint state stream.", exception);
- } finally {
- try {
- out.close();
- } catch (Exception closeException) {
- LOG.warn("Could not close raw keyed operator state stream for {}. This " +
- "might have prevented deleting some state data.", getOperatorName(), closeException);
- }
+ throw new Exception("Could not open raw operator state stream for " +
+ getOperatorName() + '.', exception);
}
+ final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
+ final int currentSnapshotVersion = ret.f0;
+ final Map<String, HeapInternalTimerService> timerServices = ret.f1;
+ final Integer stateTableVersion = ret.f2;
+ final TreeSet<Integer> snapshotVersions = ret.f3;
+ LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
+ Callable<Boolean> snapshotTimerCallable = new Callable() {
+ @Override
+ public Boolean call() {
+ try {
+ KeyGroupsList allKeyGroups = out.getKeyGroupList();
+ metaOut.startNewPartition();
+ DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+ metaWrapper.writeInt(stateTableVersion);
+ if (snapshotVersions.size() > 0) {
+ metaWrapper.writeInt(snapshotVersions.size());
+ for (Integer i : snapshotVersions) {
+ metaWrapper.writeInt(i);
+ }
+ }
+ else {
+ metaWrapper.writeInt(0);
+ }
+ int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
+ metaWrapper.writeInt(keyGroupCount);
+ for (int keyGroupIdx : allKeyGroups) {
+ out.startNewKeyGroup(keyGroupIdx);
+ metaWrapper.writeInt(keyGroupIdx);
+ InternalTimerServiceSerializationProxy serializationProxy =
+ new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
+ currentSnapshotVersion, timeServiceManager, metaWrapper);
+
+ serializationProxy.write(new DataOutputViewStreamWrapper(out));
+
+ }
+ LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
+ return true;
+ } catch (Exception exception) {
+ LOG.error("Could not write timer service of " + getOperatorName() +
+ " to checkpoint state stream.", exception);
+ return false;
+ } finally {
+ timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
+ StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
+ try {
+ snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
+ } catch (IOException e) {
+ LOG.warn("setKeyedStateRawFuture in callable exception", e);
+ return false;
--- End diff --
Putting returns inside a finally block here is a bad idea because this can swallow and hide `Throwables`.
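Java's `try`/`finally` rules explain the reviewer's concern. When a `finally` block completes abruptly (a `return` counts), the whole `try` statement completes abruptly for that reason: any exception still propagating from the `try` or `catch` block is discarded, and any earlier `return` value is overridden (JLS §14.20.2). In the reviewed callable, an `Error` escapes `catch (Exception exception)` and would then be lost if a `return` inside the `finally` block executes. The minimal class below is a hypothetical illustration (`FinallyReturnDemo`, `swallowing` and `propagating` are invented names, not Flink code). It contrasts that pattern with one that keeps cleanup in `finally` without returning from it.

```java
// Hypothetical demo class; the method names are illustrative, not Flink APIs.
public class FinallyReturnDemo {

    // Anti-pattern from the review: a return inside finally. The AssertionError
    // is not caught by catch (Exception), yet the caller never sees it.
    @SuppressWarnings("finally")
    public static boolean swallowing() {
        try {
            throw new AssertionError("write failed");
        } catch (Exception e) {
            return false;
        } finally {
            return false; // discards the in-flight AssertionError
        }
    }

    // Safer shape: finally only runs cleanup (e.g. releasing the snapshot version),
    // so the result comes from the try block and any Throwable still propagates.
    public static boolean propagating(Runnable write, Runnable cleanup) {
        try {
            write.run();
            return true;
        } finally {
            cleanup.run();
        }
    }

    public static void main(String[] args) {
        System.out.println(swallowing()); // prints false; the AssertionError is lost
        try {
            propagating(() -> { throw new AssertionError("write failed"); },
                        () -> System.out.println("cleanup ran"));
        } catch (AssertionError e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

In the reviewed code, the equivalent change would keep `timeServiceManager.stopOneSnapshotState(...)` and the future hand-off in `finally`, and report failure from the `try`/`catch` blocks (or let the exception propagate) instead of returning from `finally`.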
> async checkpoints for timer service
> -----------------------------------
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: makeyang
> Assignee: makeyang
> Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
> ## As the number of 'InternalTimer' objects grows, checkpoints become slower and slower.
> # improvement design
> ## Maintain a stateTableVersion and a set of snapshotVersions in InternalTimeServiceManager, exactly as CopyOnWriteStateTable does. In addition, add a read-write lock that protects snapshotVersions and stateTableVersion.
> ## For each InternalTimer, add two properties, a create version and a delete version, alongside the three existing ones: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and a delete version of -1. Each time a timer is deleted from the timer service, it is only marked as deleted by setting its delete version to the current stateTableVersion; it is not physically removed from the timer service.
> ## Each time timers are snapshotted, InternalTimeServiceManager increments its stateTableVersion and adds the new value to snapshotVersions. Both operations are protected by the write lock of InternalTimeServiceManager. The resulting stateTableVersion serves as the snapshot version of this snapshot.
> ## Take a shallow copy of the <String, HeapInternalTimerService> pairs.
> ## Then use another thread to snapshot everything asynchronously: the key serializer, the namespace serializer and the timers. A timer that is not deleted (delete version is -1) is serialized if its create version is less than the snapshot version. A timer whose delete version is not -1 is serialized if its delete version is greater than or equal to the snapshot version. Any other timer is not serialized by this snapshot.
> ## When everything is serialized, remove the snapshot version from snapshotVersions. This still happens on the other thread and is guarded by the write lock.
> ## Finally, physical deletion of timers happens in two places. First, when a timer is deleted from the timer service, it is marked as deleted by setting its delete version to the current stateTableVersion; if snapshotVersions is then empty (meaning no snapshot is running), the timer is physically deleted right away. Second, while iterating over the timers during a snapshot, a timer whose delete version is less than the minimum value in snapshotVersions is physically deleted, because it has been deleted and no running snapshot needs to keep it.
> ## In addition, processingTimeTimers and eventTimeTimers for each key group, which used to be a HashSet, become a ConcurrentHashMap keyed by key + namespace + timestamp.
> # related mail list thread
> ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon
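The versioning scheme in the improvement design can be sketched in plain Java. This is a minimal, single-JVM sketch under stated assumptions, not the code from the pull request. `VersionedTimerSketch`, `TimerKey`, `VersionedTimer`, `writeSnapshot` and `retainedTimerCount` are hypothetical names. The snapshot collects timer keys into a list instead of serializing them to a checkpoint stream. Timers live in a single `ConcurrentHashMap` rather than one per key group, so the asynchronous thread can iterate while the task thread registers and deletes timers. Finally, the create-version check is assumed to apply to deleted timers as well, although the design text states it only for undeleted ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the versioning scheme proposed in FLINK-9182; not Flink code.
public class VersionedTimerSketch {
    public record TimerKey(String key, String namespace, long timestamp) {} // composite map key

    static final class VersionedTimer {
        final TimerKey id;
        final int createVersion;
        volatile int deleteVersion = -1; // -1 means "not deleted"
        VersionedTimer(TimerKey id, int createVersion) { this.id = id; this.createVersion = createVersion; }
    }

    private final ConcurrentHashMap<TimerKey, VersionedTimer> timers = new ConcurrentHashMap<>();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>(); // guarded by lock
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int stateTableVersion = 0;                                   // guarded by lock

    public void register(TimerKey id) {
        lock.readLock().lock();
        try {
            timers.compute(id, (k, old) -> old == null || old.deleteVersion != -1
                    ? new VersionedTimer(k, stateTableVersion) : old);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void delete(TimerKey id) {
        lock.readLock().lock();
        try {
            VersionedTimer t = timers.get(id);
            if (t == null || t.deleteVersion != -1) return;
            t.deleteVersion = stateTableVersion;                  // logical delete
            if (snapshotVersions.isEmpty()) timers.remove(id, t); // no running snapshot
        } finally {
            lock.readLock().unlock();
        }
    }

    // Task thread: bump the version and record it as a running snapshot.
    public int startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Asynchronous thread: collect the timers this snapshot version would serialize.
    public List<TimerKey> writeSnapshot(int snapshotVersion) {
        try {
            int minRunning;
            lock.readLock().lock();
            try {
                minRunning = snapshotVersions.first();
            } finally {
                lock.readLock().unlock();
            }
            List<TimerKey> written = new ArrayList<>();
            for (VersionedTimer t : timers.values()) {
                int del = t.deleteVersion;
                if (del != -1 && del < minRunning) {
                    timers.remove(t.id, t);   // deleted and needed by no running snapshot
                } else if (t.createVersion < snapshotVersion && (del == -1 || del >= snapshotVersion)) {
                    written.add(t.id);
                }
            }
            return written;
        } finally {
            lock.writeLock().lock();          // cleanup only: no return inside finally
            try {
                snapshotVersions.remove(snapshotVersion);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public int retainedTimerCount() { return timers.size(); }

    public static void main(String[] args) {
        VersionedTimerSketch s = new VersionedTimerSketch();
        s.register(new TimerKey("k1", "window-1", 1000L));
        s.register(new TimerKey("k2", "window-1", 2000L));
        int v1 = s.startSnapshot();
        s.delete(new TimerKey("k1", "window-1", 1000L));   // deleted after start: kept for v1
        s.register(new TimerKey("k3", "window-2", 3000L)); // created after start: not in v1
        List<TimerKey> first = s.writeSnapshot(v1);
        List<TimerKey> second = s.writeSnapshot(s.startSnapshot()); // purges k1 physically
        System.out.println(first.size() + " " + second.size() + " " + s.retainedTimerCount());
    }
}
```

Running `main` prints `2 2 2`. Snapshot v1 contains k1 (deleted after v1 started) and k2 but not k3. The second snapshot contains k2 and k3 and physically removes k1. One simplification is deliberate: `register` replaces a logically deleted entry with the same key, namespace and timestamp even if a running snapshot still needs it, so a complete implementation would have to retain both versions.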
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)