You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/07/06 06:31:43 UTC

[flink] 03/03: [hotfix][runtime] Adds shutdown logic for the watchExecutorService to the k8s HA services

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08d0e6b2030c9f550e937e12558a64c515a45c1a
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Wed Jul 5 16:30:14 2023 +0200

    [hotfix][runtime] Adds shutdown logic for the watchExecutorService to the k8s HA services
    
    Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
 .../KubernetesMultipleComponentLeaderElectionHaServices.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
index 1c232640e48..e481931ed0d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
@@ -37,6 +37,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -50,6 +53,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** Kubernetes HA services that use a single leader election service per JobManager. */
 public class KubernetesMultipleComponentLeaderElectionHaServices extends AbstractHaServices {
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KubernetesMultipleComponentLeaderElectionHaServices.class);
+
     private final String clusterId;
 
     private final FlinkKubeClient kubeClient;
@@ -178,6 +184,12 @@ public class KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac
 
     private void closeK8sServices() {
         configMapSharedWatcher.close();
+        final int outstandingTaskCount = watchExecutorService.shutdownNow().size();
+        if (outstandingTaskCount != 0) {
+            LOG.debug(
+                    "The k8s HA services were closed with {} event(s) still not being processed. No further action necessary.",
+                    outstandingTaskCount);
+        }
     }
 
     @Override