You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/06/14 11:38:48 UTC

[flink] 01/02: [FLINK-27808][ha] Allow "kubernetes" as HA_MODE

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb05a7be9b828b7e582e75e4832443806fa4ff17
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 27 09:58:37 2022 +0200

    [FLINK-27808][ha] Allow "kubernetes" as HA_MODE
---
 .../content.zh/docs/deployment/ha/kubernetes_ha.md |  4 +--
 .../resource-providers/standalone/kubernetes.md    |  2 +-
 docs/content/docs/deployment/ha/kubernetes_ha.md   |  4 +--
 .../resource-providers/standalone/kubernetes.md    |  2 +-
 .../common_high_availability_section.html          |  2 +-
 .../generated/high_availability_configuration.html |  2 +-
 .../configuration/HighAvailabilityOptions.java     |  6 ++--
 ...HighAvailabilityRecoverFromSavepointITCase.java |  5 ++--
 .../factory/KubernetesJobManagerFactoryTest.java   |  6 ++--
 .../KubernetesJobManagerParametersTest.java        |  6 ++--
 .../HighAvailabilityServicesUtils.java             | 34 +++++++++++++++++++---
 .../runtime/jobmanager/HighAvailabilityMode.java   |  1 +
 12 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
index fb0cb0eb16a..b651a95579d 100644
--- a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
+++ b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
@@ -46,7 +46,7 @@ Kubernetes 高可用服务只能在部署到 Kubernetes 时使用。因此,当
 `high-availability` 选项必须设置为 `KubernetesHaServicesFactory`.
 
 ```yaml
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
 ```
 
 - [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (必要的):
@@ -71,7 +71,7 @@ kubernetes.cluster-id: cluster1337
 
 ```yaml
 kubernetes.cluster-id: <cluster-id>
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
 high-availability.storageDir: hdfs:///flink/recovery
 ```
 
diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
index f67be7f747f..e57ac7c3334 100644
--- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -230,7 +230,7 @@ data:
   flink-conf.yaml: |+
   ...
     kubernetes.cluster-id: <cluster-id>
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability: kubernetes
     high-availability.storageDir: hdfs:///flink/recovery
     restart-strategy: fixed-delay
     restart-strategy.fixed-delay.attempts: 10
diff --git a/docs/content/docs/deployment/ha/kubernetes_ha.md b/docs/content/docs/deployment/ha/kubernetes_ha.md
index 550c3c792d0..a6e93cf4643 100644
--- a/docs/content/docs/deployment/ha/kubernetes_ha.md
+++ b/docs/content/docs/deployment/ha/kubernetes_ha.md
@@ -48,7 +48,7 @@ In order to start an HA-cluster you have to configure the following configuratio
 The `high-availability` option has to be set to `KubernetesHaServicesFactory`.
 
 ```yaml
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
 ```
 
 - [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (required): 
@@ -73,7 +73,7 @@ Configure high availability mode in `conf/flink-conf.yaml`:
 
 ```yaml
 kubernetes.cluster-id: <cluster-id>
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
 high-availability.storageDir: hdfs:///flink/recovery
 ```
 
diff --git a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
index 31c313edfdb..5c50b874f4a 100644
--- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -213,7 +213,7 @@ data:
   flink-conf.yaml: |+
   ...
     kubernetes.cluster-id: <cluster-id>
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability: kubernetes
     high-availability.storageDir: hdfs:///flink/recovery
     restart-strategy: fixed-delay
     restart-strategy.fixed-delay.attempts: 10
diff --git a/docs/layouts/shortcodes/generated/common_high_availability_section.html b/docs/layouts/shortcodes/generated/common_high_availability_section.html
index 2f4b4e83878..d4a7463b6aa 100644
--- a/docs/layouts/shortcodes/generated/common_high_availability_section.html
+++ b/docs/layouts/shortcodes/generated/common_high_availability_section.html
@@ -12,7 +12,7 @@
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
             <td>String</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html
index 324f3378fb1..1c93975923c 100644
--- a/docs/layouts/shortcodes/generated/high_availability_configuration.html
+++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
             <td>String</td>
-            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
+            <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.cluster-id</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index b156c152d9e..97606c5679e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -33,8 +33,8 @@ public class HighAvailabilityOptions {
 
     /**
      * Defines high-availability mode used for the cluster execution. A value of "NONE" signals no
-     * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER". Can also
-     * be set to FQN of HighAvailability factory class.
+     * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER" or
+     * "KUBERNETES". Can also be set to FQN of HighAvailability factory class.
      */
     @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY)
     public static final ConfigOption<String> HA_MODE =
@@ -44,7 +44,7 @@ public class HighAvailabilityOptions {
                     .withDeprecatedKeys("recovery.mode")
                     .withDescription(
                             "Defines high-availability mode used for the cluster execution."
-                                    + " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
+                                    + " To enable high-availability, set this mode to \"ZOOKEEPER\", \"KUBERNETES\" or specify FQN of factory class.");
 
     /**
      * The ID of the Flink cluster, used to separate multiple Flink clusters Needs to be set for
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
index 386d3115e07..679f5edc621 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.KubernetesExtension;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
@@ -140,9 +141,7 @@ class KubernetesHighAvailabilityRecoverFromSavepointITCase {
     private static Configuration getConfiguration() {
         Configuration configuration = new Configuration();
         configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
-        configuration.set(
-                HighAvailabilityOptions.HA_MODE,
-                KubernetesHaServicesFactory.class.getCanonicalName());
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
         try {
             temporaryPath = Files.createTempDirectory("haStorage");
         } catch (IOException e) {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index be6f6357346..302655dd5c3 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
-import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
@@ -38,6 +37,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
 import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.Container;
@@ -459,9 +459,7 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase {
 
     @Test
     void testSetJobManagerDeploymentReplicas() throws Exception {
-        flinkConfig.set(
-                HighAvailabilityOptions.HA_MODE,
-                KubernetesHaServicesFactory.class.getCanonicalName());
+        flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
         flinkConfig.set(
                 KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, JOBMANAGER_REPLICAS);
         kubernetesJobManagerSpecification =
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 0a44f015b9c..067d8cd3b19 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.KubernetesTestBase;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
-import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.junit.jupiter.api.Test;
@@ -250,9 +250,7 @@ class KubernetesJobManagerParametersTest extends KubernetesTestBase {
 
     @Test
     void testGetReplicas() {
-        flinkConfig.set(
-                HighAvailabilityOptions.HA_MODE,
-                KubernetesHaServicesFactory.class.getCanonicalName());
+        flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
         flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
         assertThat(kubernetesJobManagerParameters.getReplicas()).isEqualTo(2);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index b54b86dda23..c5155682b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -68,6 +68,12 @@ public class HighAvailabilityServicesUtils {
             case ZOOKEEPER:
                 return createZooKeeperHaServices(config, executor, fatalErrorHandler);
 
+            case KUBERNETES:
+                return createCustomHAServices(
+                        "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+                        config,
+                        executor);
+
             case FACTORY_CLASS:
                 return createCustomHAServices(config, executor);
 
@@ -129,6 +135,11 @@ public class HighAvailabilityServicesUtils {
                         resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
             case ZOOKEEPER:
                 return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
+            case KUBERNETES:
+                return createCustomHAServices(
+                        "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+                        configuration,
+                        executor);
 
             case FACTORY_CLASS:
                 return createCustomHAServices(configuration, executor);
@@ -152,6 +163,10 @@ public class HighAvailabilityServicesUtils {
                 return new ZooKeeperClientHAServices(
                         ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
                         configuration);
+            case KUBERNETES:
+                return createCustomClientHAServices(
+                        "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+                        configuration);
             case FACTORY_CLASS:
                 return createCustomClientHAServices(configuration);
             default:
@@ -267,9 +282,15 @@ public class HighAvailabilityServicesUtils {
 
     private static HighAvailabilityServices createCustomHAServices(
             Configuration config, Executor executor) throws FlinkException {
+        return createCustomHAServices(
+                config.getString(HighAvailabilityOptions.HA_MODE), config, executor);
+    }
+
+    private static HighAvailabilityServices createCustomHAServices(
+            String factoryClassName, Configuration config, Executor executor)
+            throws FlinkException {
         final HighAvailabilityServicesFactory highAvailabilityServicesFactory =
-                loadCustomHighAvailabilityServicesFactory(
-                        config.getString(HighAvailabilityOptions.HA_MODE));
+                loadCustomHighAvailabilityServicesFactory(factoryClassName);
 
         try {
             return highAvailabilityServicesFactory.createHAServices(config, executor);
@@ -294,9 +315,14 @@ public class HighAvailabilityServicesUtils {
 
     private static ClientHighAvailabilityServices createCustomClientHAServices(Configuration config)
             throws FlinkException {
+        return createCustomClientHAServices(
+                config.getString(HighAvailabilityOptions.HA_MODE), config);
+    }
+
+    private static ClientHighAvailabilityServices createCustomClientHAServices(
+            String factoryClassName, Configuration config) throws FlinkException {
         final HighAvailabilityServicesFactory highAvailabilityServicesFactory =
-                loadCustomHighAvailabilityServicesFactory(
-                        config.getString(HighAvailabilityOptions.HA_MODE));
+                loadCustomHighAvailabilityServicesFactory(factoryClassName);
 
         try {
             return highAvailabilityServicesFactory.createClientHAServices(config);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 3f4137b1840..466ed60f4ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 public enum HighAvailabilityMode {
     NONE(false),
     ZOOKEEPER(true),
+    KUBERNETES(true),
     FACTORY_CLASS(true);
 
     private final boolean haActive;