You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/22 08:27:57 UTC
[flink-statefun] 03/04: [FLINK-19329] Check for the existence of managed resources during dispose
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit a08141e58f2918d8659cf52f035d2660df4063f2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Sep 21 16:15:00 2020 +0200
[FLINK-19329] Check for the existence of managed resources during dispose
This closes #158.
---
.../flink/statefun/flink/core/functions/FunctionGroupOperator.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 9cc0818..381a672 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -184,6 +184,12 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
}
private void closeOrDispose() {
+ final List<ManagingResources> managingResources = this.managingResources;
+ if (managingResources == null) {
+ // dispose can be called before state initialization was completed (for example a failure
+ // during initialization).
+ return;
+ }
for (ManagingResources withResources : managingResources) {
try {
withResources.shutdown();