You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:02 UTC
[3/5] flink git commit: [FLINK-5282] Fix closing streams on exception in SavepointV0Serializer
[FLINK-5282] Fix closing streams on exception in SavepointV0Serializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cda6a22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cda6a22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cda6a22
Branch: refs/heads/master
Commit: 8cda6a2260bbbd8e84349f0204d2980cfdd5a48a
Parents: 35f4ea7
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 7 21:25:29 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../savepoint/SavepointV0Serializer.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8cda6a22/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index e4125e5..dc307e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
@@ -286,16 +287,21 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
- final long offset = keyedStateOut.getPos();
+ try {
+ final long offset = keyedStateOut.getPos();
- InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
- StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+ InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
+ StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+ keyedStateOut = null; // makes IOUtils.closeQuietly(...) ignore this
- if (null != streamStateHandle) {
- KeyGroupRangeOffsets keyGroupRangeOffsets =
- new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
+ if (null != streamStateHandle) {
+ KeyGroupRangeOffsets keyGroupRangeOffsets =
+ new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
- return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+ return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+ }
+ } finally {
+ IOUtils.closeQuietly(keyedStateOut);
}
}
return null;