You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/30 12:20:49 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26917] Enforce Kubernetes HA during validation for last-state mode

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d0f712c  [FLINK-26917] Enforce Kubernetes HA during validation for last-state mode
d0f712c is described below

commit d0f712c85a2bdf5eacb3c4a86f9d4309a72ed0ce
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Mar 30 16:56:30 2022 +0800

    [FLINK-26917] Enforce Kubernetes HA during validation for last-state mode
---
 .../kubernetes/operator/reconciler/ReconciliationUtils.java  |  4 ++--
 .../flink/kubernetes/operator/service/FlinkService.java      |  3 +--
 .../apache/flink/kubernetes/operator/utils/FlinkUtils.java   |  8 ++++++++
 .../operator/validation/DefaultDeploymentValidator.java      | 12 ++++++------
 .../operator/validation/DeploymentValidatorTest.java         |  4 ++--
 5 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 0adc0a7..8fd27f2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -135,7 +135,7 @@ public class ReconciliationUtils {
 
         return previousUpgradeMode != UpgradeMode.LAST_STATE
                 && currentUpgradeMode == UpgradeMode.LAST_STATE
-                && !HighAvailabilityMode.isHighAvailabilityModeActivated(lastReconciledFlinkConfig);
+                && !FlinkUtils.isKubernetesHAActivated(lastReconciledFlinkConfig);
     }
 
     private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index f9d4b3a..2740c53 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -42,7 +42,6 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -93,7 +92,7 @@ public class FlinkService {
 
     public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
             throws Exception {
-        if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+        if (FlinkUtils.isKubernetesHAActivated(conf)) {
             final String clusterId =
                     Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.CLUSTER_ID));
             final String namespace =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index b57e0c1..8068802 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
 import org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -261,4 +263,10 @@ public class FlinkUtils {
     private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
         return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
     }
+
+    public static boolean isKubernetesHAActivated(Configuration configuration) {
+        return configuration
+                .get(HighAvailabilityOptions.HA_MODE)
+                .equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName());
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 9228454..828e539 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -34,9 +34,9 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
@@ -146,8 +146,9 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
 
         Configuration configuration = Configuration.fromMap(confMap);
         if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
-                && !HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
-            return Optional.of("Job could not be upgraded with last-state while HA disabled");
+                && !FlinkUtils.isKubernetesHAActivated(configuration)) {
+            return Optional.of(
+                    "Job could not be upgraded with last-state while Kubernetes HA disabled");
         }
 
         if (StringUtils.isNullOrWhitespaceOnly(
@@ -182,10 +183,9 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
         if (replicas < 1) {
             return Optional.of("JobManager replicas should not be configured less than one.");
         } else if (replicas > 1
-                && !HighAvailabilityMode.isHighAvailabilityModeActivated(
-                        Configuration.fromMap(confMap))) {
+                && !FlinkUtils.isKubernetesHAActivated(Configuration.fromMap(confMap))) {
             return Optional.of(
-                    "High availability should be enabled when starting standby JobManagers.");
+                    "Kubernetes High availability should be enabled when starting standby JobManagers.");
         }
         return Optional.empty();
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index accce51..5f6a896 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -73,7 +73,7 @@ public class DeploymentValidatorTest {
                     dep.getSpec().setFlinkConfiguration(new HashMap<>());
                     dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
                 },
-                "Job could not be upgraded with last-state while HA disabled");
+                "Job could not be upgraded with last-state while Kubernetes HA disabled");
 
         testError(
                 dep -> {
@@ -145,7 +145,7 @@ public class DeploymentValidatorTest {
                     dep.getSpec().setFlinkConfiguration(new HashMap<>());
                     dep.getSpec().getJobManager().setReplicas(2);
                 },
-                "High availability should be enabled when starting standby JobManagers.");
+                "Kubernetes High availability should be enabled when starting standby JobManagers.");
         testError(
                 dep -> dep.getSpec().getJobManager().setReplicas(0),
                 "JobManager replicas should not be configured less than one.");