You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/03/01 11:25:16 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26405] Improve the check of HA mode

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4df566d  [FLINK-26405] Improve the check of HA mode
4df566d is described below

commit 4df566d22092e7bfc271d381363bebade07e29d0
Author: bgeng777 <ge...@alibaba-inc.com>
AuthorDate: Tue Mar 1 17:55:20 2022 +0800

    [FLINK-26405] Improve the check of HA mode
---
 .../operator/validation/DefaultDeploymentValidator.java     | 13 +++++++------
 .../operator/validation/DeploymentValidatorTest.java        |  8 ++++----
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 997e109..2a385be 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -28,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.Map;
 import java.util.Optional;
@@ -99,12 +99,13 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
 
     private Optional<String> validateJmReplicas(
             String component, int replicas, Map<String, String> confMap) {
-        if (!confMap.containsKey(HighAvailabilityOptions.HA_MODE.key()) && replicas != 1) {
+        if (replicas < 1) {
+            return Optional.of(component + " replicas should not be configured less than one.");
+        } else if (replicas > 1
+                && !HighAvailabilityMode.isHighAvailabilityModeActivated(
+                        Configuration.fromMap(confMap))) {
             return Optional.of(
-                    component
-                            + " replicas should be 1 when "
-                            + HighAvailabilityOptions.HA_MODE.key()
-                            + " is not set.");
+                    "High availability should be enabled when starting standby JobManagers.");
         }
         return Optional.empty();
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 38204f9..40fd1c7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -74,9 +73,10 @@ public class DeploymentValidatorTest {
                 "Forbidden Flink config key");
         testError(
                 dep -> dep.getSpec().getJobManager().setReplicas(2),
-                "JobManager replicas should be 1 when "
-                        + HighAvailabilityOptions.HA_MODE.key()
-                        + " is not set.");
+                "High availability should be enabled when starting standby JobManagers.");
+        testError(
+                dep -> dep.getSpec().getJobManager().setReplicas(0),
+                "JobManager replicas should not be configured less than one.");
 
         // Test resource validation
         testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G"));