You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 21:16:27 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c4e76402 [FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss
c4e76402 is described below
commit c4e76402f02f05932c6446d97bdc3d60861b9b27
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Fri Dec 16 15:47:04 2022 +0100
[FLINK-30437][FLINK-30408] Harden HA meta check to avoid state loss
---
.../kubernetes/operator/utils/FlinkUtils.java | 16 +++++-
.../kubernetes/operator/utils/FlinkUtilsTest.java | 67 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 5 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index e9600c08..221d0558 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -46,7 +46,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.function.Predicate;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -148,7 +147,20 @@ public class FlinkUtils {
.list()
.getItems();
- return configMaps.stream().anyMatch(Predicate.not(ConfigMap::isMarkedForDeletion));
+ return configMaps.stream().anyMatch(FlinkUtils::isValidHaConfigMap);
+ }
+
+ private static boolean isValidHaConfigMap(ConfigMap cm) {
+ if (cm.isMarkedForDeletion()) {
+ return false;
+ }
+
+ var name = cm.getMetadata().getName();
+ if (name.endsWith("-config-map")) {
+ return !name.endsWith("-cluster-config-map");
+ }
+
+ return name.endsWith("-jobmanager-leader");
}
private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 4ec8ee05..ca7d0c39 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -41,6 +42,7 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,7 +79,7 @@ public class FlinkUtilsTest {
final Map<String, String> data = new HashMap<>();
data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
data.put("leader", "localhost");
- createHAConfigMapWithData(name, clusterId, data);
+ createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, data);
assertNotNull(kubernetesClient.configMaps().withName(name).get());
assertEquals(2, kubernetesClient.configMaps().withName(name).get().getData().size());
@@ -89,6 +91,64 @@ public class FlinkUtilsTest {
kubernetesClient.configMaps().withName(name).get().getData().containsKey("leader"));
}
+ @Test
+ public void haMetaDataCheckTest() {
+ var cr = TestUtils.buildApplicationCluster();
+ var confManager = new FlinkConfigManager(new Configuration());
+ assertFalse(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+
+ // Flink 1.15+
+ createHAConfigMapWithData(
+ cr.getMetadata().getName() + "-cluster-config-map",
+ cr.getMetadata().getNamespace(),
+ cr.getMetadata().getName(),
+ null);
+ assertFalse(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+
+ createHAConfigMapWithData(
+ cr.getMetadata().getName() + "-000000000000-config-map",
+ cr.getMetadata().getNamespace(),
+ cr.getMetadata().getName(),
+ null);
+ assertTrue(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+
+ // Flink 1.13-1.14
+ kubernetesClient.configMaps().inAnyNamespace().delete();
+ assertFalse(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+
+ createHAConfigMapWithData(
+ cr.getMetadata().getName() + "-dispatcher-leader",
+ cr.getMetadata().getNamespace(),
+ cr.getMetadata().getName(),
+ null);
+ assertFalse(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+
+ createHAConfigMapWithData(
+ cr.getMetadata().getName() + "-000000000000-jobmanager-leader",
+ cr.getMetadata().getNamespace(),
+ cr.getMetadata().getName(),
+ null);
+ assertTrue(
+ FlinkUtils.isHaMetadataAvailable(
+ confManager.getDeployConfig(cr.getMetadata(), cr.getSpec()),
+ kubernetesClient));
+ }
+
@Test
public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
final String name = "empty-ha-configmap";
@@ -99,7 +159,7 @@ public class FlinkUtilsTest {
.withPath("/api/v1/namespaces/test/configmaps/" + name)
.andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, new ConfigMapBuilder().build())
.once();
- createHAConfigMapWithData(name, clusterId, null);
+ createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, null);
assertTrue(kubernetesClient.configMaps().withName(name).get().getData().isEmpty());
FlinkUtils.deleteJobGraphInKubernetesHA(
clusterId, kubernetesClient.getNamespace(), kubernetesClient);
@@ -122,11 +182,12 @@ public class FlinkUtilsTest {
}
private void createHAConfigMapWithData(
- String configMapName, String clusterId, Map<String, String> data) {
+ String configMapName, String namespace, String clusterId, Map<String, String> data) {
final ConfigMap kubernetesConfigMap =
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
+ .withNamespace(namespace)
.withLabels(
KubernetesUtils.getConfigMapLabels(
clusterId,