You are viewing a plain text version of this content; the canonical (HTML) version, including the original link, is available from the mailing list archive.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/03/30 09:28:11 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 48aded1 [FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster
48aded1 is described below
commit 48aded1b2fba62b1e339f761627d338b1bea07c2
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Mar 30 15:36:50 2022 +0800
[FLINK-26916] Remove job graph from K8s HA ConfigMaps when submitting a Flink application cluster
---
e2e-tests/data/cr.yaml | 17 ++++++++
e2e-tests/test_last_state_upgrade.sh | 15 ++++++-
e2e-tests/utils.sh | 3 +-
.../kubernetes/operator/service/FlinkService.java | 10 +++++
.../kubernetes/operator/utils/FlinkUtils.java | 39 +++++++++++++++++
.../kubernetes/operator/utils/FlinkUtilsTest.java | 51 ++++++++++++++++++++--
6 files changed, 130 insertions(+), 5 deletions(-)
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index f697296..6dc0f16 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -24,6 +24,11 @@ metadata:
spec:
image: flink:1.14.3
flinkVersion: v1_14
+ ingress:
+ template: "/{{namespace}}/{{name}}(/|$)(.*)"
+ className: "nginx"
+ annotations:
+ nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
@@ -92,3 +97,15 @@ spec:
resources:
requests:
storage: 1Gi
+
+---
+apiVersion: networking.k8s.io/v1
+kind: IngressClass
+metadata:
+ annotations:
+ ingressclass.kubernetes.io/is-default-class: "true"
+ labels:
+ app.kubernetes.io/component: controller
+ name: nginx
+spec:
+ controller: k8s.io/ingress-nginx
diff --git a/e2e-tests/test_last_state_upgrade.sh b/e2e-tests/test_last_state_upgrade.sh
index ce51209..dd1ffc9 100755
--- a/e2e-tests/test_last_state_upgrade.sh
+++ b/e2e-tests/test_last_state_upgrade.sh
@@ -44,6 +44,17 @@ function wait_for_jobmanager_running() {
wait_for_logs $jm_pod_name "Rest endpoint listening at" ${TIMEOUT} || exit 1
}
+function assert_available_slots() {
+ expected=$1
+ ip=$(minikube ip)
+ actual=$(curl http://$ip/default/${CLUSTER_ID}/overview 2>/dev/null | grep -E -o '"slots-available":[0-9]+' | awk -F':' '{print $2}')
+ if [[ expected -ne actual ]]; then
+ echo "Expected available slots: $expected, actual: $actual"
+ exit 1
+ fi
+ echo "Successfully asserted available slots"
+}
+
on_exit cleanup_and_exit
retry_times 5 30 "kubectl apply -f e2e-tests/data/cr.yaml" || exit 1
@@ -53,11 +64,12 @@ wait_for_jobmanager_running
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+assert_available_slots 0
job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
# Update the FlinkDeployment and trigger the last state upgrade
-kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"jobManager": {"resource": {"cpu": 0.51, "memory": "1024m"} } } }'
+kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
kubectl wait --for=delete pod --timeout=${TIMEOUT}s --selector="app=${CLUSTER_ID}"
wait_for_jobmanager_running
@@ -67,6 +79,7 @@ wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" ${TIMEOUT} ||
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+assert_available_slots 1
echo "Successfully run the last-state upgrade test"
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index e0d7068..8a6c181 100644
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -102,7 +102,8 @@ function start_minikube {
if ! retry_times 5 30 start_minikube_if_not_running; then
echo "Could not start minikube. Aborting..."
exit 1
- fi
+ fi
+ minikube addons enable ingress
}
function start_minikube_if_not_running {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c4aaf02..f9d4b3a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -42,6 +42,7 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -92,6 +93,15 @@ public class FlinkService {
public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
throws Exception {
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+ final String clusterId =
+ Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.CLUSTER_ID));
+ final String namespace =
+ Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.NAMESPACE));
+ // Delete the job graph in the HA ConfigMaps so that the newly changed job config (e.g.
+ // parallelism) could take effect
+ FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+ }
LOG.info("Deploying application cluster");
final ClusterClientServiceLoader clusterClientServiceLoader =
new DefaultClusterClientServiceLoader();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index d45c6a6..b57e0c1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -23,12 +23,14 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
@@ -37,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Map;
import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
@@ -222,4 +225,40 @@ public class FlinkUtils {
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
.list();
}
+
+ public static void deleteJobGraphInKubernetesHA(
+ String clusterId, String namespace, KubernetesClient kubernetesClient) {
+ // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
+ // them and delete job graph key
+ final Map<String, String> haConfigMapLabels =
+ KubernetesUtils.getConfigMapLabels(
+ clusterId, Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+ final ConfigMapList configMaps =
+ kubernetesClient
+ .configMaps()
+ .inNamespace(namespace)
+ .withLabels(haConfigMapLabels)
+ .list();
+
+ configMaps
+ .getItems()
+ .forEach(
+ configMap -> {
+ final boolean isDeleted =
+ configMap
+ .getData()
+ .entrySet()
+ .removeIf(FlinkUtils::isJobGraphKey);
+ if (isDeleted) {
+ LOG.info(
+ "Job graph in ConfigMap {} is deleted",
+ configMap.getMetadata().getName());
+ }
+ });
+ kubernetesClient.resourceList(configMaps).inNamespace(namespace).createOrReplace();
+ }
+
+ private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
+ return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index 2822e94..514fd29 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,18 +18,33 @@
package org.apache.flink.kubernetes.operator.utils;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Pod;
-import org.junit.Assert;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** FlinkUtilsTest. */
+@EnableKubernetesMockClient(crud = true)
public class FlinkUtilsTest {
+ KubernetesClient kubernetesClient;
+
@Test
public void testMergePods() throws Exception {
@@ -48,7 +63,37 @@ public class FlinkUtilsTest {
Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);
- Assert.assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
- Assert.assertEquals(pod2.getSpec().getContainers(), mergedPod.getSpec().getContainers());
+ assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+ assertEquals(pod2.getSpec().getContainers(), mergedPod.getSpec().getContainers());
+ }
+
+ @Test
+ public void testDeleteJobGraphInKubernetesHA() {
+ final String name = "ha-configmap";
+ final String clusterId = "cluster-id";
+ final Map<String, String> data = new HashMap<>();
+ data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data");
+ data.put("leader", "localhost");
+ final ConfigMap kubernetesConfigMap =
+ new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(name)
+ .withLabels(
+ KubernetesUtils.getConfigMapLabels(
+ clusterId,
+ Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
+ .endMetadata()
+ .withData(data)
+ .build();
+ kubernetesClient.configMaps().create(kubernetesConfigMap);
+ assertNotNull(kubernetesClient.configMaps().withName(name).get());
+ assertEquals(2, kubernetesClient.configMaps().withName(name).get().getData().size());
+
+ FlinkUtils.deleteJobGraphInKubernetesHA(
+ clusterId, kubernetesClient.getNamespace(), kubernetesClient);
+
+ assertEquals(1, kubernetesClient.configMaps().withName(name).get().getData().size());
+ assertTrue(
+ kubernetesClient.configMaps().withName(name).get().getData().containsKey("leader"));
}
}