You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/06 17:06:23 UTC

[flink] branch release-1.10 updated: [FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 93d6324  [FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split
93d6324 is described below

commit 93d6324ee0e3fb7c7975fb0c56280357adc66f24
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Fri Mar 6 09:49:47 2020 -0600

    [FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split
    
    This closes #11335.
---
 .../flink/state/api/input/KeyedStateInputFormat.java     |  7 ++++++-
 .../state/api/input/operator/StateReaderOperator.java    | 16 +++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
index daf6278..32df221 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
@@ -180,7 +180,12 @@ public class KeyedStateInputFormat<K, N, OUT> extends RichInputFormat<OUT, KeyGr
 
 	@Override
 	public void close() throws IOException {
-		registry.close();
+		try {
+			operator.close();
+			registry.close();
+		} catch (Exception e) {
+			throw new IOException("Failed to close state backend", e);
+		}
 	}
 
 	@Override
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
index eabc2f8..87c1c57 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
@@ -104,7 +104,21 @@ public abstract class StateReaderOperator<F extends Function, KEY, N, OUT> imple
 	}
 
 	public void close() throws Exception {
-		FunctionUtils.closeFunction(function);
+		Exception exception = null;
+
+		try {
+			FunctionUtils.closeFunction(function);
+		} catch (Exception e) {
+			// The state backend must always be closed
+			// to release native resources.
+			exception = e;
+		}
+
+		keyedStateBackend.dispose();
+
+		if (exception != null) {
+			throw exception;
+		}
 	}
 
 	@Override