You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/03 09:42:19 UTC

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185734334
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
     	 *
     	 * @param context context that provides information and means required for taking a snapshot
     	 */
    -	public void snapshotState(StateSnapshotContext context) throws Exception {
    +	public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
     		if (getKeyedStateBackend() != null) {
     			KeyedStateCheckpointOutputStream out;
    -
    +			OperatorStateCheckpointOutputStream metaOut;
     			try {
     				out = context.getRawKeyedOperatorStateOutput();
     			} catch (Exception exception) {
     				throw new Exception("Could not open raw keyed operator state stream for " +
     					getOperatorName() + '.', exception);
     			}
    -
     			try {
    -				KeyGroupsList allKeyGroups = out.getKeyGroupList();
    -				for (int keyGroupIdx : allKeyGroups) {
    -					out.startNewKeyGroup(keyGroupIdx);
    -
    -					timeServiceManager.snapshotStateForKeyGroup(
    -						new DataOutputViewStreamWrapper(out), keyGroupIdx);
    -				}
    +				metaOut = context.getRawKeyedOperatorStateMetaOutput();
     			} catch (Exception exception) {
    -				throw new Exception("Could not write timer service of " + getOperatorName() +
    -					" to checkpoint state stream.", exception);
    -			} finally {
    -				try {
    -					out.close();
    -				} catch (Exception closeException) {
    -					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
    -						"might have prevented deleting some state data.", getOperatorName(), closeException);
    -				}
    +				throw new Exception("Could not open raw operator state stream for " +
    +					getOperatorName() + '.', exception);
     			}
    +			final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
    +			final int currentSnapshotVersion = ret.f0;
    +			final Map<String, HeapInternalTimerService> timerServices = ret.f1;
    +			final Integer stateTableVersion = ret.f2;
    +			final TreeSet<Integer> snapshotVersions = ret.f3;
    +			LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
    +			Callable<Boolean> snapshotTimerCallable = new Callable() {
    +				@Override
    +				public Boolean call() {
    +					try {
    +						KeyGroupsList allKeyGroups = out.getKeyGroupList();
    +						metaOut.startNewPartition();
    +						DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +						metaWrapper.writeInt(stateTableVersion);
    +						if (snapshotVersions.size() > 0) {
    +							metaWrapper.writeInt(snapshotVersions.size());
    +							for (Integer i : snapshotVersions) {
    +								metaWrapper.writeInt(i);
    +							}
    +						}
    +						else {
    +							metaWrapper.writeInt(0);
    +						}
    +						int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
    +						metaWrapper.writeInt(keyGroupCount);
    +						for (int keyGroupIdx : allKeyGroups) {
    +							out.startNewKeyGroup(keyGroupIdx);
    +							metaWrapper.writeInt(keyGroupIdx);
    +							InternalTimerServiceSerializationProxy serializationProxy =
    +								new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +									currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +							serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +						}
    +						LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
    +						return true;
    +					} catch (Exception exception) {
    +						LOG.error("Could not write timer service of " + getOperatorName() +
    +							" to checkpoint state stream.", exception);
    +						return false;
    +					} finally {
    +						timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +						StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
    +						try {
    +							snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +						} catch (IOException e) {
    +							LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +							return false;
    --- End diff --
    
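    Putting `return` statements inside a `finally` block here is a bad idea: a `return` that executes within `finally` discards any exception still propagating from the `try` block, so it can swallow and hide `Throwable`s.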


---