You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/06/14 11:38:48 UTC
[flink] 01/02: [FLINK-27808][ha] Allow "kubernetes" as HA_MODE
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb05a7be9b828b7e582e75e4832443806fa4ff17
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 27 09:58:37 2022 +0200
[FLINK-27808][ha] Allow "kubernetes" as HA_MODE
---
.../content.zh/docs/deployment/ha/kubernetes_ha.md | 4 +--
.../resource-providers/standalone/kubernetes.md | 2 +-
docs/content/docs/deployment/ha/kubernetes_ha.md | 4 +--
.../resource-providers/standalone/kubernetes.md | 2 +-
.../common_high_availability_section.html | 2 +-
.../generated/high_availability_configuration.html | 2 +-
.../configuration/HighAvailabilityOptions.java | 6 ++--
...HighAvailabilityRecoverFromSavepointITCase.java | 5 ++--
.../factory/KubernetesJobManagerFactoryTest.java | 6 ++--
.../KubernetesJobManagerParametersTest.java | 6 ++--
.../HighAvailabilityServicesUtils.java | 34 +++++++++++++++++++---
.../runtime/jobmanager/HighAvailabilityMode.java | 1 +
12 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
index fb0cb0eb16a..b651a95579d 100644
--- a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
+++ b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md
@@ -46,7 +46,7 @@ Kubernetes 高可用服务只能在部署到 Kubernetes 时使用。因此,当
`high-availability` 选项必须设置为 `KubernetesHaServicesFactory`.
```yaml
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
```
- [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (必要的):
@@ -71,7 +71,7 @@ kubernetes.cluster-id: cluster1337
```yaml
kubernetes.cluster-id: <cluster-id>
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
```
diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
index f67be7f747f..e57ac7c3334 100644
--- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -230,7 +230,7 @@ data:
flink-conf.yaml: |+
...
kubernetes.cluster-id: <cluster-id>
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
diff --git a/docs/content/docs/deployment/ha/kubernetes_ha.md b/docs/content/docs/deployment/ha/kubernetes_ha.md
index 550c3c792d0..a6e93cf4643 100644
--- a/docs/content/docs/deployment/ha/kubernetes_ha.md
+++ b/docs/content/docs/deployment/ha/kubernetes_ha.md
@@ -48,7 +48,7 @@ In order to start an HA-cluster you have to configure the following configuratio
The `high-availability` option has to be set to `KubernetesHaServicesFactory`.
```yaml
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
```
- [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (required):
@@ -73,7 +73,7 @@ Configure high availability mode in `conf/flink-conf.yaml`:
```yaml
kubernetes.cluster-id: <cluster-id>
-high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
```
diff --git a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
index 31c313edfdb..5c50b874f4a 100644
--- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -213,7 +213,7 @@ data:
flink-conf.yaml: |+
...
kubernetes.cluster-id: <cluster-id>
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
diff --git a/docs/layouts/shortcodes/generated/common_high_availability_section.html b/docs/layouts/shortcodes/generated/common_high_availability_section.html
index 2f4b4e83878..d4a7463b6aa 100644
--- a/docs/layouts/shortcodes/generated/common_high_availability_section.html
+++ b/docs/layouts/shortcodes/generated/common_high_availability_section.html
@@ -12,7 +12,7 @@
<td><h5>high-availability</h5></td>
<td style="word-wrap: break-word;">"NONE"</td>
<td>String</td>
- <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
+ <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td>
</tr>
<tr>
<td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html
index 324f3378fb1..1c93975923c 100644
--- a/docs/layouts/shortcodes/generated/high_availability_configuration.html
+++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html
@@ -12,7 +12,7 @@
<td><h5>high-availability</h5></td>
<td style="word-wrap: break-word;">"NONE"</td>
<td>String</td>
- <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td>
+ <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td>
</tr>
<tr>
<td><h5>high-availability.cluster-id</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index b156c152d9e..97606c5679e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -33,8 +33,8 @@ public class HighAvailabilityOptions {
/**
* Defines high-availability mode used for the cluster execution. A value of "NONE" signals no
- * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER". Can also
- * be set to FQN of HighAvailability factory class.
+ * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER" or
+ * "KUBERNETES". Can also be set to FQN of HighAvailability factory class.
*/
@Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_MODE =
@@ -44,7 +44,7 @@ public class HighAvailabilityOptions {
.withDeprecatedKeys("recovery.mode")
.withDescription(
"Defines high-availability mode used for the cluster execution."
- + " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
+ + " To enable high-availability, set this mode to \"ZOOKEEPER\", \"KUBERNETES\" or specify FQN of factory class.");
/**
* The ID of the Flink cluster, used to separate multiple Flink clusters Needs to be set for
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
index 386d3115e07..679f5edc621 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.KubernetesExtension;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateBackend;
@@ -140,9 +141,7 @@ class KubernetesHighAvailabilityRecoverFromSavepointITCase {
private static Configuration getConfiguration() {
Configuration configuration = new Configuration();
configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
- configuration.set(
- HighAvailabilityOptions.HA_MODE,
- KubernetesHaServicesFactory.class.getCanonicalName());
+ configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
try {
temporaryPath = Files.createTempDirectory("haStorage");
} catch (IOException e) {
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index be6f6357346..302655dd5c3 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
-import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
@@ -38,6 +37,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Container;
@@ -459,9 +459,7 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase {
@Test
void testSetJobManagerDeploymentReplicas() throws Exception {
- flinkConfig.set(
- HighAvailabilityOptions.HA_MODE,
- KubernetesHaServicesFactory.class.getCanonicalName());
+ flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
flinkConfig.set(
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, JOBMANAGER_REPLICAS);
kubernetesJobManagerSpecification =
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 0a44f015b9c..067d8cd3b19 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.KubernetesTestBase;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
-import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.Test;
@@ -250,9 +250,7 @@ class KubernetesJobManagerParametersTest extends KubernetesTestBase {
@Test
void testGetReplicas() {
- flinkConfig.set(
- HighAvailabilityOptions.HA_MODE,
- KubernetesHaServicesFactory.class.getCanonicalName());
+ flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name());
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
assertThat(kubernetesJobManagerParameters.getReplicas()).isEqualTo(2);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index b54b86dda23..c5155682b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -68,6 +68,12 @@ public class HighAvailabilityServicesUtils {
case ZOOKEEPER:
return createZooKeeperHaServices(config, executor, fatalErrorHandler);
+ case KUBERNETES:
+ return createCustomHAServices(
+ "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+ config,
+ executor);
+
case FACTORY_CLASS:
return createCustomHAServices(config, executor);
@@ -129,6 +135,11 @@ public class HighAvailabilityServicesUtils {
resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
case ZOOKEEPER:
return createZooKeeperHaServices(configuration, executor, fatalErrorHandler);
+ case KUBERNETES:
+ return createCustomHAServices(
+ "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+ configuration,
+ executor);
case FACTORY_CLASS:
return createCustomHAServices(configuration, executor);
@@ -152,6 +163,10 @@ public class HighAvailabilityServicesUtils {
return new ZooKeeperClientHAServices(
ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler),
configuration);
+ case KUBERNETES:
+ return createCustomClientHAServices(
+ "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory",
+ configuration);
case FACTORY_CLASS:
return createCustomClientHAServices(configuration);
default:
@@ -267,9 +282,15 @@ public class HighAvailabilityServicesUtils {
private static HighAvailabilityServices createCustomHAServices(
Configuration config, Executor executor) throws FlinkException {
+ return createCustomHAServices(
+ config.getString(HighAvailabilityOptions.HA_MODE), config, executor);
+ }
+
+ private static HighAvailabilityServices createCustomHAServices(
+ String factoryClassName, Configuration config, Executor executor)
+ throws FlinkException {
final HighAvailabilityServicesFactory highAvailabilityServicesFactory =
- loadCustomHighAvailabilityServicesFactory(
- config.getString(HighAvailabilityOptions.HA_MODE));
+ loadCustomHighAvailabilityServicesFactory(factoryClassName);
try {
return highAvailabilityServicesFactory.createHAServices(config, executor);
@@ -294,9 +315,14 @@ public class HighAvailabilityServicesUtils {
private static ClientHighAvailabilityServices createCustomClientHAServices(Configuration config)
throws FlinkException {
+ return createCustomClientHAServices(
+ config.getString(HighAvailabilityOptions.HA_MODE), config);
+ }
+
+ private static ClientHighAvailabilityServices createCustomClientHAServices(
+ String factoryClassName, Configuration config) throws FlinkException {
final HighAvailabilityServicesFactory highAvailabilityServicesFactory =
- loadCustomHighAvailabilityServicesFactory(
- config.getString(HighAvailabilityOptions.HA_MODE));
+ loadCustomHighAvailabilityServicesFactory(factoryClassName);
try {
return highAvailabilityServicesFactory.createClientHAServices(config);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 3f4137b1840..466ed60f4ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
public enum HighAvailabilityMode {
NONE(false),
ZOOKEEPER(true),
+ KUBERNETES(true),
FACTORY_CLASS(true);
private final boolean haActive;