You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:27 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c4e76402 [FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss
c4e76402 is described below

commit c4e76402f02f05932c6446d97bdc3d60861b9b27
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Dec 16 15:47:04 2022 +0100

    [FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss
---
 .../kubernetes/operator/utils/FlinkUtils.java      | 16 +++++-
 .../kubernetes/operator/utils/FlinkUtilsTest.java  | 67 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index e9600c08..221d0558 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -46,7 +46,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.function.Predicate;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
@@ -148,7 +147,20 @@ public class FlinkUtils {
                         .list()
                         .getItems();
 
-        return configMaps.stream().anyMatch(Predicate.not(ConfigMap::isMarkedForDeletion));
+        return configMaps.stream().anyMatch(FlinkUtils::isValidHaConfigMap);
+    }
+
+    private static boolean isValidHaConfigMap(ConfigMap cm) { // does cm count as HA metadata?
+        if (cm.isMarkedForDeletion()) { // a ConfigMap already being deleted never counts
+            return false;
+        }
+
+        var name = cm.getMetadata().getName();
+        if (name.endsWith("-config-map")) { // "-config-map" names count, with one exception
+            return !name.endsWith("-cluster-config-map"); // cluster-level map is excluded
+        }
+
+        return name.endsWith("-jobmanager-leader"); // otherwise only "-jobmanager-leader" counts
     }
 
     private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 4ec8ee05..ca7d0c39 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
@@ -41,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -77,7 +79,7 @@ public class FlinkUtilsTest {
         final Map<String, String> data = new HashMap<>();
         data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
         data.put("leader", "localhost");
-        createHAConfigMapWithData(name, clusterId, data);
+        createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, data);
         assertNotNull(kubernetesClient.configMaps().withName(name).get());
         assertEquals(2, kubernetesClient.configMaps().withName(name).get().getData().size());
 
@@ -89,6 +91,64 @@ public class FlinkUtilsTest {
                 kubernetesClient.configMaps().withName(name).get().getData().containsKey("leader"));
     }
 
+    @Test
+    public void haMetaDataCheckTest() { // exercises FlinkUtils.isHaMetadataAvailable by name
+        var cr = TestUtils.buildApplicationCluster();
+        var confManager = new FlinkConfigManager(new Configuration());
+        assertFalse( // this test has not created any ConfigMap yet
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+
+        // Flink 1.15+: a "-cluster-config-map" on its own must not count as HA metadata
+        createHAConfigMapWithData(
+                cr.getMetadata().getName() + "-cluster-config-map",
+                cr.getMetadata().getNamespace(),
+                cr.getMetadata().getName(),
+                null);
+        assertFalse(
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+
+        createHAConfigMapWithData( // a "-<id>-config-map" is expected to make the check pass
+                cr.getMetadata().getName() + "-000000000000-config-map",
+                cr.getMetadata().getNamespace(),
+                cr.getMetadata().getName(),
+                null);
+        assertTrue(
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+
+        // Flink 1.13-1.14: delete every ConfigMap first so the check starts from nothing
+        kubernetesClient.configMaps().inAnyNamespace().delete();
+        assertFalse(
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+
+        createHAConfigMapWithData( // a "-dispatcher-leader" map alone must not count
+                cr.getMetadata().getName() + "-dispatcher-leader",
+                cr.getMetadata().getNamespace(),
+                cr.getMetadata().getName(),
+                null);
+        assertFalse(
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+
+        createHAConfigMapWithData( // a "-<id>-jobmanager-leader" map is expected to count
+                cr.getMetadata().getName() + "-000000000000-jobmanager-leader",
+                cr.getMetadata().getNamespace(),
+                cr.getMetadata().getName(),
+                null);
+        assertTrue(
+                FlinkUtils.isHaMetadataAvailable(
+                        confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+                        kubernetesClient));
+    }
+
     @Test
     public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
         final String name = "empty-ha-configmap";
@@ -99,7 +159,7 @@ public class FlinkUtilsTest {
                 .withPath("/api/v1/namespaces/test/configmaps/" + name)
                 .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, new ConfigMapBuilder().build())
                 .once();
-        createHAConfigMapWithData(name, clusterId, null);
+        createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, null);
         assertTrue(kubernetesClient.configMaps().withName(name).get().getData().isEmpty());
         FlinkUtils.deleteJobGraphInKubernetesHA(
                 clusterId, kubernetesClient.getNamespace(), kubernetesClient);
@@ -122,11 +182,12 @@ public class FlinkUtilsTest {
     }
 
     private void createHAConfigMapWithData(
-            String configMapName, String clusterId, Map<String, String> data) {
+            String configMapName, String namespace, String clusterId, Map<String, String> data) {
         final ConfigMap kubernetesConfigMap =
                 new ConfigMapBuilder()
                         .withNewMetadata()
                         .withName(configMapName)
+                        .withNamespace(namespace)
                         .withLabels(
                                 KubernetesUtils.getConfigMapLabels(
                                         clusterId,