You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:02 UTC

[3/5] flink git commit: [FLINK-5282] Fix closing streams on exception in SavepointV0Serializer

[FLINK-5282] Fix closing streams on exception in SavepointV0Serializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cda6a22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cda6a22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cda6a22

Branch: refs/heads/master
Commit: 8cda6a2260bbbd8e84349f0204d2980cfdd5a48a
Parents: 35f4ea7
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 7 21:25:29 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100

----------------------------------------------------------------------
 .../savepoint/SavepointV0Serializer.java        | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cda6a22/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index e4125e5..dc307e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -286,16 +287,21 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
 			CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
 					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
 
-			final long offset = keyedStateOut.getPos();
+			try {
+				final long offset = keyedStateOut.getPos();
 
-			InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
-			StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+				InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
+				StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+				keyedStateOut = null; // makes IOUtils.closeQuietly(...) ignore this
 
-			if (null != streamStateHandle) {
-				KeyGroupRangeOffsets keyGroupRangeOffsets =
-						new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
+				if (null != streamStateHandle) {
+					KeyGroupRangeOffsets keyGroupRangeOffsets =
+							new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
 
-				return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+					return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+				}
+			} finally {
+				IOUtils.closeQuietly(keyedStateOut);
 			}
 		}
 		return null;