You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/05/03 09:42:19 UTC
[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5908#discussion_r185734334
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
*
* @param context context that provides information and means required for taking a snapshot
*/
- public void snapshotState(StateSnapshotContext context) throws Exception {
+ public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
-
+ OperatorStateCheckpointOutputStream metaOut;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
-
try {
- KeyGroupsList allKeyGroups = out.getKeyGroupList();
- for (int keyGroupIdx : allKeyGroups) {
- out.startNewKeyGroup(keyGroupIdx);
-
- timeServiceManager.snapshotStateForKeyGroup(
- new DataOutputViewStreamWrapper(out), keyGroupIdx);
- }
+ metaOut = context.getRawKeyedOperatorStateMetaOutput();
} catch (Exception exception) {
- throw new Exception("Could not write timer service of " + getOperatorName() +
- " to checkpoint state stream.", exception);
- } finally {
- try {
- out.close();
- } catch (Exception closeException) {
- LOG.warn("Could not close raw keyed operator state stream for {}. This " +
- "might have prevented deleting some state data.", getOperatorName(), closeException);
- }
+ throw new Exception("Could not open raw operator state stream for " +
+ getOperatorName() + '.', exception);
}
+ final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
+ final int currentSnapshotVersion = ret.f0;
+ final Map<String, HeapInternalTimerService> timerServices = ret.f1;
+ final Integer stateTableVersion = ret.f2;
+ final TreeSet<Integer> snapshotVersions = ret.f3;
+ LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
+ Callable<Boolean> snapshotTimerCallable = new Callable() {
+ @Override
+ public Boolean call() {
+ try {
+ KeyGroupsList allKeyGroups = out.getKeyGroupList();
+ metaOut.startNewPartition();
+ DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+ metaWrapper.writeInt(stateTableVersion);
+ if (snapshotVersions.size() > 0) {
+ metaWrapper.writeInt(snapshotVersions.size());
+ for (Integer i : snapshotVersions) {
+ metaWrapper.writeInt(i);
+ }
+ }
+ else {
+ metaWrapper.writeInt(0);
+ }
+ int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
+ metaWrapper.writeInt(keyGroupCount);
+ for (int keyGroupIdx : allKeyGroups) {
+ out.startNewKeyGroup(keyGroupIdx);
+ metaWrapper.writeInt(keyGroupIdx);
+ InternalTimerServiceSerializationProxy serializationProxy =
+ new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
+ currentSnapshotVersion, timeServiceManager, metaWrapper);
+
+ serializationProxy.write(new DataOutputViewStreamWrapper(out));
+
+ }
+ LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
+ return true;
+ } catch (Exception exception) {
+ LOG.error("Could not write timer service of " + getOperatorName() +
+ " to checkpoint state stream.", exception);
+ return false;
+ } finally {
+ timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
+ StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
+ try {
+ snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
+ } catch (IOException e) {
+ LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
+ return false;
--- End diff --
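Putting a `return` inside a `finally` block is a bad idea here. When the `finally` block returns, the JVM discards whatever was propagating out of the `try`/`catch`, so any `Throwable` (including `Error`s not caught by `catch (Exception ...)`) is silently swallowed and hidden. The return also overrides the `try` block's own `return true`.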
---