You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/06 17:06:23 UTC
[flink] branch release-1.10 updated: [FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 93d6324 [FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split
93d6324 is described below
commit 93d6324ee0e3fb7c7975fb0c56280357adc66f24
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Fri Mar 6 09:49:47 2020 -0600
[FLINK-16313][state-processor-api] Properly dispose of native resources when closing input split
This closes #11335.
---
.../flink/state/api/input/KeyedStateInputFormat.java | 7 ++++++-
.../state/api/input/operator/StateReaderOperator.java | 16 +++++++++++++++-
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
index daf6278..32df221 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
@@ -180,7 +180,12 @@ public class KeyedStateInputFormat<K, N, OUT> extends RichInputFormat<OUT, KeyGr
@Override
public void close() throws IOException {
- registry.close();
+ try {
+ operator.close();
+ registry.close();
+ } catch (Exception e) {
+ throw new IOException("Failed to close state backend", e);
+ }
}
@Override
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
index eabc2f8..87c1c57 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/operator/StateReaderOperator.java
@@ -104,7 +104,21 @@ public abstract class StateReaderOperator<F extends Function, KEY, N, OUT> imple
}
public void close() throws Exception {
- FunctionUtils.closeFunction(function);
+ Exception exception = null;
+
+ try {
+ FunctionUtils.closeFunction(function);
+ } catch (Exception e) {
+ // The state backend must always be closed
+ // to release native resources.
+ exception = e;
+ }
+
+ keyedStateBackend.dispose();
+
+ if (exception != null) {
+ throw exception;
+ }
}
@Override