You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2022/11/29 11:31:55 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-30222] Operator should handle 'kubernetes' as the 'high-availability' config key

This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 458022d2 [FLINK-30222] Operator should handle 'kubernetes' as the 'high-availability' config key
458022d2 is described below

commit 458022d2e67247c9941f102fb39d9dda96bd8837
Author: pvary <pe...@gmail.com>
AuthorDate: Tue Nov 29 12:31:49 2022 +0100

    [FLINK-30222] Operator should handle 'kubernetes' as the 'high-availability' config key
    
    Co-authored-by: Peter Vary <pe...@apple.com>
---
 .../apache/flink/kubernetes/operator/utils/FlinkUtils.java   |  7 ++++---
 .../kubernetes/operator/validation/DefaultValidatorTest.java | 12 ++++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index c17a2495..b438a14d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -160,9 +160,10 @@ public class FlinkUtils {
     }
 
     public static boolean isKubernetesHAActivated(Configuration configuration) {
-        return configuration
-                .get(HighAvailabilityOptions.HA_MODE)
-                .equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName());
+        String haMode = configuration.get(HighAvailabilityOptions.HA_MODE);
+        return haMode.equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName())
+                // Hardcoded config value should be removed when upgrading Flink dependency to 1.16
+                || haMode.equalsIgnoreCase("kubernetes");
     }
 
     public static boolean clusterShutdownDisabled(FlinkDeploymentSpec spec) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 80fb6af0..fe2e3162 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -416,6 +416,18 @@ public class DefaultValidatorTest {
                 "spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
 
         testSuccess(dep -> dep.getSpec().setServiceAccount("flink"));
+
+        testSuccess(
+                dep -> {
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+                    dep.getSpec()
+                            .getFlinkConfiguration()
+                            .put(
+                                    HighAvailabilityOptions.HA_MODE.key(),
+                                    // Hardcoded config value should be removed when upgrading Flink
+                                    // dependency to 1.16
+                                    "kubernetes");
+                });
     }
 
     @Test