You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink in the HTML rendering and is not preserved in this plain-text export.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/03/30 09:28:24 UTC

[flink-kubernetes-operator] branch release-0.1 updated: [FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new 08183ef  [FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster
08183ef is described below

commit 08183effa8b6af67e05cf4271cf016a5c5c7ca91
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Mar 30 15:36:50 2022 +0800

    [FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster
---
 e2e-tests/data/cr.yaml                             | 17 ++++++++
 e2e-tests/test_last_state_upgrade.sh               | 15 ++++++-
 e2e-tests/utils.sh                                 |  3 +-
 .../kubernetes/operator/service/FlinkService.java  | 10 +++++
 .../kubernetes/operator/utils/FlinkUtils.java      | 39 +++++++++++++++++
 .../kubernetes/operator/utils/FlinkUtilsTest.java  | 51 ++++++++++++++++++++--
 6 files changed, 130 insertions(+), 5 deletions(-)

diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index f697296..6dc0f16 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -24,6 +24,11 @@ metadata:
 spec:
   image: flink:1.14.3
   flinkVersion: v1_14
+  ingress:
+    template: "/{{namespace}}/{{name}}(/|$)(.*)"
+    className: "nginx"
+    annotations:
+      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
@@ -92,3 +97,15 @@ spec:
   resources:
     requests:
       storage: 1Gi
+
+---
+apiVersion: networking.k8s.io/v1
+kind: IngressClass
+metadata:
+  annotations:
+    ingressclass.kubernetes.io/is-default-class: "true"
+  labels:
+    app.kubernetes.io/component: controller
+  name: nginx
+spec:
+  controller: k8s.io/ingress-nginx
diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh
index ce51209..dd1ffc9 100755
--- a/e2e-tests/test_last_state_upgrade.sh
+++ b/e2e-tests/test_last_state_upgrade.sh
@@ -44,6 +44,17 @@ function wait_for_jobmanager_running() {
     wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
 }
 
+function assert_available_slots() {
+  expected=$1
+  ip=$(minikube ip)
+  actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
+  if [[ expected -ne actual ]]; then
+    echo "Expected available slots: $expected, actual: $actual"
+    exit 1
+  fi
+  echo "Successfully assert available slots"
+}
+
 on_exit cleanup_and_exit
 
 retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
@@ -53,11 +64,12 @@ wait_for_jobmanager_running
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+assert_available_slots 0
 
 job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
 
 # Update the FlinkDeployment and trigger the last state upgrade
-kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"jobManager": {"resource": {"cpu": 0.51, "memory": "1024m"} } } }'
+kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
 
 kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
 wait_for_jobmanager_running
@@ -67,6 +79,7 @@ wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} ||
 wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
 wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+assert_available_slots 1
 
 echo "Successfully run the last-state upgrade test"
 
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index e0d7068..8a6c181 100644
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -102,7 +102,8 @@ function start_minikube {
     if ! retry_times 5 30 start_minikube_if_not_running; then
         echo "Could not start minikube. Aborting..."
         exit 1
-   fi
+    fi
+    minikube addons enable ingress
 }
 
 function start_minikube_if_not_running {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c4aaf02..f9d4b3a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -42,6 +42,7 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -92,6 +93,15 @@ public class FlinkService {
 
     public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
             throws Exception {
+        if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+            final String clusterId =
+                    Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.CLUSTER_ID));
+            final String namespace =
+                    Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.NAMESPACE));
+            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
+            // parallelism) could take effect
+            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+        }
         LOG.info("Deploying application cluster");
         final ClusterClientServiceLoader clusterClientServiceLoader =
                 new DefaultClusterClientServiceLoader();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index d45c6a6..b57e0c1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -23,12 +23,14 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.Service;
@@ -37,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.Map;
 
 import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 
@@ -222,4 +225,40 @@ public class FlinkUtils {
                 .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
                 .list();
     }
+
+    public static void deleteJobGraphInKubernetesHA(
+            String clusterId, String namespace, KubernetesClient kubernetesClient) {
+        // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
+        // them and delete job graph key
+        final Map<String, String> haConfigMapLabels =
+                KubernetesUtils.getConfigMapLabels(
+                        clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+        final ConfigMapList configMaps =
+                kubernetesClient
+                        .configMaps()
+                        .inNamespace(namespace)
+                        .withLabels(haConfigMapLabels)
+                        .list();
+
+        configMaps
+                .getItems()
+                .forEach(
+                        configMap -> {
+                            final boolean isDeleted =
+                                    configMap
+                                            .getData()
+                                            .entrySet()
+                                            .removeIf(FlinkUtils::isJobGraphKey);
+                            if (isDeleted) {
+                                LOG.info(
+                                        "Job graph in ConfigMap {} is deleted",
+                                        configMap.getMetadata().getName());
+                            }
+                        });
+        kubernetesClient.resourceList(configMaps).inNamespace(namespace).createOrReplace();
+    }
+
+    private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
+        return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 2822e94..514fd29 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,18 +18,33 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.Pod;
-import org.junit.Assert;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** FlinkUtilsTest. */
+@EnableKubernetesMockClient(crud = true)
 public class FlinkUtilsTest {
 
+    KubernetesClient kubernetesClient;
+
     @Test
     public void testMergePods() throws Exception {
 
@@ -48,7 +63,37 @@ public class FlinkUtilsTest {
 
         Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);
 
-        Assert.assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
-        Assert.assertEquals(pod2.getSpec().getContainers(), mergedPod.getSpec().getContainers());
+        assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+        assertEquals(pod2.getSpec().getContainers(), mergedPod.getSpec().getContainers());
+    }
+
+    @Test
+    public void testDeleteJobGraphInKubernetesHA() {
+        final String name = "ha-configmap";
+        final String clusterId = "cluster-id";
+        final Map<String, String> data = new HashMap<>();
+        data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
+        data.put("leader", "localhost");
+        final ConfigMap kubernetesConfigMap =
+                new ConfigMapBuilder()
+                        .withNewMetadata()
+                        .withName(name)
+                        .withLabels(
+                                KubernetesUtils.getConfigMapLabels(
+                                        clusterId,
+                                        Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+                        .endMetadata()
+                        .withData(data)
+                        .build();
+        kubernetesClient.configMaps().create(kubernetesConfigMap);
+        assertNotNull(kubernetesClient.configMaps().withName(name).get());
+        assertEquals(2, kubernetesClient.configMaps().withName(name).get().getData().size());
+
+        FlinkUtils.deleteJobGraphInKubernetesHA(
+                clusterId, kubernetesClient.getNamespace(), kubernetesClient);
+
+        assertEquals(1, kubernetesClient.configMaps().withName(name).get().getData().size());
+        assertTrue(
+                kubernetesClient.configMaps().withName(name).get().getData().containsKey("leader"));
     }
 }