Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/03 09:43:00 UTC

[jira] [Commented] (FLINK-9182) async checkpoints for timer service

    [ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462200#comment-16462200 ] 

ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185734334
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
     	 *
     	 * @param context context that provides information and means required for taking a snapshot
     	 */
    -	public void snapshotState(StateSnapshotContext context) throws Exception {
    +	public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
     		if (getKeyedStateBackend() != null) {
     			KeyedStateCheckpointOutputStream out;
    -
    +			OperatorStateCheckpointOutputStream metaOut;
     			try {
     				out = context.getRawKeyedOperatorStateOutput();
     			} catch (Exception exception) {
     				throw new Exception("Could not open raw keyed operator state stream for " +
     					getOperatorName() + '.', exception);
     			}
    -
     			try {
    -				KeyGroupsList allKeyGroups = out.getKeyGroupList();
    -				for (int keyGroupIdx : allKeyGroups) {
    -					out.startNewKeyGroup(keyGroupIdx);
    -
    -					timeServiceManager.snapshotStateForKeyGroup(
    -						new DataOutputViewStreamWrapper(out), keyGroupIdx);
    -				}
    +				metaOut = context.getRawKeyedOperatorStateMetaOutput();
     			} catch (Exception exception) {
    -				throw new Exception("Could not write timer service of " + getOperatorName() +
    -					" to checkpoint state stream.", exception);
    -			} finally {
    -				try {
    -					out.close();
    -				} catch (Exception closeException) {
    -					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
    -						"might have prevented deleting some state data.", getOperatorName(), closeException);
    -				}
    +				throw new Exception("Could not open raw operator state stream for " +
    +					getOperatorName() + '.', exception);
     			}
    +			final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
    +			final int currentSnapshotVersion = ret.f0;
    +			final Map<String, HeapInternalTimerService> timerServices = ret.f1;
    +			final Integer stateTableVersion = ret.f2;
    +			final TreeSet<Integer> snapshotVersions = ret.f3;
    +			LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
    +			Callable<Boolean> snapshotTimerCallable = new Callable() {
    +				@Override
    +				public Boolean call() {
    +					try {
    +						KeyGroupsList allKeyGroups = out.getKeyGroupList();
    +						metaOut.startNewPartition();
    +						DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +						metaWrapper.writeInt(stateTableVersion);
    +						if (snapshotVersions.size() > 0) {
    +							metaWrapper.writeInt(snapshotVersions.size());
    +							for (Integer i : snapshotVersions) {
    +								metaWrapper.writeInt(i);
    +							}
    +						}
    +						else {
    +							metaWrapper.writeInt(0);
    +						}
    +						int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
    +						metaWrapper.writeInt(keyGroupCount);
    +						for (int keyGroupIdx : allKeyGroups) {
    +							out.startNewKeyGroup(keyGroupIdx);
    +							metaWrapper.writeInt(keyGroupIdx);
    +							InternalTimerServiceSerializationProxy serializationProxy =
    +								new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +									currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +							serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +						}
    +						LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
    +						return true;
    +					} catch (Exception exception) {
    +						LOG.error("Could not write timer service of " + getOperatorName() +
    +							" to checkpoint state stream.", exception);
    +						return false;
    +					} finally {
    +						timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +						StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
    +						try {
    +							snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +						} catch (IOException e) {
    +							LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +							return false;
    --- End diff --
    
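    Putting `return` statements inside a `finally` block is a bad idea here. A `return` in `finally` discards any exception still propagating out of the `try` or `catch` blocks, so it can swallow and hide `Throwable`s.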
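The reviewer's point can be made concrete with the minimal Java sketch below. The class and method names are hypothetical and are not taken from the PR. `swallowing` mirrors the shape of the `finally` block in the diff: if the cleanup step fails, it returns `false`. That `return` also throws away any `Throwable` still propagating from the `try` block, such as an `Error` that a `catch (Exception ...)` clause does not handle. `propagating` uses `finally` for cleanup only.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of the review comment; names are not from the pull request.
final class FinallyReturnDemo {

    // Anti-pattern (same shape as the diff): a `return` inside `finally` overrides the outcome of `try`.
    static boolean swallowing(Runnable writeTimers, BooleanSupplier cleanup) {
        try {
            writeTimers.run();
            return true;
        } finally {
            if (!cleanup.getAsBoolean()) {
                return false; // also silently discards any Throwable thrown by writeTimers
            }
        }
    }

    // Safer shape: `finally` only cleans up, so failures from writeTimers reach the caller.
    static boolean propagating(Runnable writeTimers, Runnable cleanup) {
        try {
            writeTimers.run();
            return true;
        } finally {
            cleanup.run();
        }
    }
}
```

Suppose `swallowing` gets a task that throws `OutOfMemoryError` and a cleanup that fails. It returns `false`, and the error is lost. With `propagating`, the same error reaches the caller.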


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## As the number of 'InternalTimer' objects grows, checkpoints become slower and slower.
> # improvement design
>  ## Maintain a stateTableVersion and a set of snapshotVersions in InternalTimeServiceManager, exactly like their counterparts in CopyOnWriteStateTable. In addition, add a read-write lock that protects snapshotVersions and stateTableVersion.
>  ## Give each InternalTimer two more properties, a create version and a delete version, alongside its three existing ones: timestamp, key, and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted from the timer service, it is only marked as deleted: its delete version is set to the current stateTableVersion, and it is not physically removed from the timer service.
>  ## Each time timers are snapshotted, InternalTimeServiceManager increments its stateTableVersion and adds the new value to snapshotVersions. Both operations are protected by InternalTimeServiceManager's write lock. The resulting stateTableVersion is used as the snapshot version of this snapshot.
>  ## Make a shallow copy of the <String, HeapInternalTimerService> tuples.
>  ## Then snapshot everything asynchronously in another thread: the key serializer, the namespace serializer, and the timers. A timer is serialized in two cases. The first is when it is not deleted (its delete version is -1) and its create version is less than the snapshot version. The second is when it is deleted (its delete version is not -1) and its delete version is greater than or equal to the snapshot version. Otherwise, it is not included in this snapshot.
>  ## When everything has been serialized, remove the snapshot version from snapshotVersions. This still happens in the other thread and is guarded by the write lock.
>  ## Finally, timers are physically deleted in two places. First, whenever a timer is deleted in the timer service, it is marked as deleted by setting its delete version to the current stateTableVersion. If snapshotVersions is then empty (meaning no snapshot is running), the timer is physically deleted right away. Second, while a snapshot iterates over the timers, any timer whose delete version is less than the minimum of snapshotVersions is physically deleted. Such a timer has been deleted, and no running snapshot still needs it.
>  ## Additionally, processingTimeTimers and eventTimeTimers for each key group used to be HashSets. They become ConcurrentHashMaps keyed by key + namespace + timestamp.
> # related mailing list thread
>  ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
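The versioning steps in the design above can be modeled in a few dozen lines of Java. This is a simplified sketch under stated assumptions, not the code from the pull request. The assumptions are:

- `VersionedTimerStore` and its methods are hypothetical.
- Keys and namespaces are plain strings.
- Timers live in one `ConcurrentHashMap` keyed by (key, namespace, timestamp).
- The "snapshot" returns the visible timers in a list instead of serializing them.

The sketch makes one deliberate deviation from the description. The visibility check also requires `createVersion < snapshotVersion` for deleted timers. Without it, a timer that was both created and deleted after the snapshot started would be included.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the proposal; the names are illustrative, not Flink APIs.
final class VersionedTimerStore {

    static final class Timer {
        final List<Object> id;           // (key, namespace, timestamp)
        final int createVersion;
        volatile int deleteVersion = -1; // -1 = live; written by the task thread, read by the snapshot thread

        Timer(List<Object> id, int createVersion) {
            this.id = id;
            this.createVersion = createVersion;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int stateTableVersion = 0;                                 // guarded by lock
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>(); // guarded by lock
    final Map<List<Object>, Timer> timers = new ConcurrentHashMap<>(); // package-private for inspection

    // Assumption: keys and namespaces are plain strings; Arrays.asList gives value-based equals/hashCode.
    private static List<Object> timerId(String key, String namespace, long timestamp) {
        return Arrays.asList(key, namespace, timestamp);
    }

    // Not handled: re-registering a timer that is marked deleted but not yet purged.
    void registerTimer(String key, String namespace, long timestamp) {
        List<Object> id = timerId(key, namespace, timestamp);
        lock.readLock().lock();
        try {
            timers.putIfAbsent(id, new Timer(id, stateTableVersion)); // create version = current version
        } finally {
            lock.readLock().unlock();
        }
    }

    void deleteTimer(String key, String namespace, long timestamp) {
        List<Object> id = timerId(key, namespace, timestamp);
        lock.readLock().lock();
        try {
            Timer timer = timers.get(id);
            if (timer != null && timer.deleteVersion == -1) {
                timer.deleteVersion = stateTableVersion; // logical delete
                if (snapshotVersions.isEmpty()) {
                    timers.remove(id);                   // no running snapshot: delete physically now
                }
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    int startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;                    // the new version is this snapshot's version
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Runs on the async thread: returns the timers this snapshot must write and purges dead ones.
    List<Timer> collectForSnapshot(int snapshotVersion) {
        int oldestRunning;
        lock.readLock().lock();
        try {
            oldestRunning = snapshotVersions.first();    // non-empty: this snapshot is still registered
        } finally {
            lock.readLock().unlock();
        }
        List<Timer> visible = new ArrayList<>();
        for (Map.Entry<List<Object>, Timer> entry : timers.entrySet()) {
            Timer timer = entry.getValue();
            int deleted = timer.deleteVersion;           // read once; the task thread may change it
            if (timer.createVersion < snapshotVersion && (deleted == -1 || deleted >= snapshotVersion)) {
                visible.add(timer);
            } else if (deleted != -1 && deleted < oldestRunning) {
                timers.remove(entry.getKey(), timer);    // deleted before every running snapshot began
            }
        }
        return visible;
    }

    void finishSnapshot(int snapshotVersion) {
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshotVersion);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Consider this sequence: timers `a` and `b` are registered, snapshot 1 starts, `c` is registered, and then `a` and `c` are deleted. `collectForSnapshot(1)` still returns `a` and `b`. After snapshot 1 finishes, snapshot 2 returns only `b` and physically purges `a` and `c`. The read-write lock mirrors the proposal. Registration and deletion share the read lock, and only starting or finishing a snapshot takes the write lock. The `ConcurrentHashMap` lets the snapshot thread iterate while the task thread keeps adding timers.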


