You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/22 08:27:57 UTC

[flink-statefun] 03/04: [FLINK-19329] Check for the existence of managed resources during dispose

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit a08141e58f2918d8659cf52f035d2660df4063f2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Sep 21 16:15:00 2020 +0200

    [FLINK-19329] Check for the existence of managed resources during dispose
    
    This closes #158.
---
 .../flink/statefun/flink/core/functions/FunctionGroupOperator.java  | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
index 9cc0818..381a672 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java
@@ -184,6 +184,12 @@ public class FunctionGroupOperator extends AbstractStreamOperator<Message>
   }
 
   private void closeOrDispose() {
+    final List<ManagingResources> managingResources = this.managingResources;
+    if (managingResources == null) {
+      // dispose can be called before state initialization was completed (for example a failure
+      // during initialization).
+      return;
+    }
     for (ManagingResources withResources : managingResources) {
       try {
         withResources.shutdown();