You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 09:12:45 UTC

[4/4] flink git commit: [FLINK-9022][state] Backend disposal in StreamTaskStateInitializer should always be performed in cleanup.

[FLINK-9022][state] Backend disposal in StreamTaskStateInitializer should always be performed in cleanup.

This step should be performed regardless of whether the backend is still registered with the closeable registry.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/777cc1ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/777cc1ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/777cc1ab

Branch: refs/heads/master
Commit: 777cc1ab53884f12ac245e62d367c4439f7939e0
Parents: 388a083
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Apr 18 18:27:00 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 11:12:22 2018 +0200

----------------------------------------------------------------------
 .../api/operators/StreamTaskStateInitializerImpl.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/777cc1ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index d9bd089..460a52b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -165,12 +165,18 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 		} catch (Exception ex) {
 
 			// cleanup if something went wrong before results got published.
-			if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+			if (keyedStatedBackend != null) {
+				if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
+					IOUtils.closeQuietly(keyedStatedBackend);
+				}
 				// release resource (e.g native resource)
 				keyedStatedBackend.dispose();
 			}
 
-			if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+			if (operatorStateBackend != null) {
+				if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
+					IOUtils.closeQuietly(operatorStateBackend);
+				}
 				operatorStateBackend.dispose();
 			}