You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/07/06 06:31:43 UTC
[flink] 03/03: [hotfix][runtime] Adds shutdown logic for the watchExecutorService to the k8s HA services
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 08d0e6b2030c9f550e937e12558a64c515a45c1a
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Wed Jul 5 16:30:14 2023 +0200
[hotfix][runtime] Adds shutdown logic for the watchExecutorService to the k8s HA services
Signed-off-by: Matthias Pohl <ma...@aiven.io>
---
.../KubernetesMultipleComponentLeaderElectionHaServices.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
index 1c232640e48..e481931ed0d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesMultipleComponentLeaderElectionHaServices.java
@@ -37,6 +37,9 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -50,6 +53,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/** Kubernetes HA services that use a single leader election service per JobManager. */
public class KubernetesMultipleComponentLeaderElectionHaServices extends AbstractHaServices {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(KubernetesMultipleComponentLeaderElectionHaServices.class);
+
private final String clusterId;
private final FlinkKubeClient kubeClient;
@@ -178,6 +184,12 @@ public class KubernetesMultipleComponentLeaderElectionHaServices extends Abstrac
private void closeK8sServices() {
configMapSharedWatcher.close();
+ final int outstandingTaskCount = watchExecutorService.shutdownNow().size();
+ if (outstandingTaskCount != 0) {
+ LOG.debug(
+ "The k8s HA services were closed with {} event(s) still not being processed. No further action necessary.",
+ outstandingTaskCount);
+ }
}
@Override