You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/30 12:20:49 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26917] Enforce Kubernetes HA during validation for last-state mode
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d0f712c [FLINK-26917] Enforce Kubernetes HA during validation for last-state mode
d0f712c is described below
commit d0f712c85a2bdf5eacb3c4a86f9d4309a72ed0ce
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Wed Mar 30 16:56:30 2022 +0800
[FLINK-26917] Enforce Kubernetes HA during validation for last-state mode
---
.../kubernetes/operator/reconciler/ReconciliationUtils.java | 4 ++--
.../flink/kubernetes/operator/service/FlinkService.java | 3 +--
.../apache/flink/kubernetes/operator/utils/FlinkUtils.java | 8 ++++++++
.../operator/validation/DefaultDeploymentValidator.java | 12 ++++++------
.../operator/validation/DeploymentValidatorTest.java | 4 ++--
5 files changed, 19 insertions(+), 12 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 0adc0a7..8fd27f2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -135,7 +135,7 @@ public class ReconciliationUtils {
return previousUpgradeMode != UpgradeMode.LAST_STATE
&& currentUpgradeMode == UpgradeMode.LAST_STATE
- && !HighAvailabilityMode.isHighAvailabilityModeActivated(lastReconciledFlinkConfig);
+ && !FlinkUtils.isKubernetesHAActivated(lastReconciledFlinkConfig);
}
private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index f9d4b3a..2740c53 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -42,7 +42,6 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
@@ -93,7 +92,7 @@ public class FlinkService {
public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
throws Exception {
- if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+ if (FlinkUtils.isKubernetesHAActivated(conf)) {
final String clusterId =
Preconditions.checkNotNull(conf.get(KubernetesConfigOptions.CLUSTER_ID));
final String namespace =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index b57e0c1..8068802 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -19,7 +19,9 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.config.DefaultConfig;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -261,4 +263,10 @@ public class FlinkUtils {
private static boolean isJobGraphKey(Map.Entry<String, String> entry) {
return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX);
}
+
+ public static boolean isKubernetesHAActivated(Configuration configuration) {
+ return configuration
+ .get(HighAvailabilityOptions.HA_MODE)
+ .equalsIgnoreCase(KubernetesHaServicesFactory.class.getCanonicalName());
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 9228454..828e539 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -34,9 +34,9 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.StringUtils;
import java.util.Map;
@@ -146,8 +146,9 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
Configuration configuration = Configuration.fromMap(confMap);
if (job.getUpgradeMode() == UpgradeMode.LAST_STATE
- && !HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
- return Optional.of("Job could not be upgraded with last-state while HA disabled");
+ && !FlinkUtils.isKubernetesHAActivated(configuration)) {
+ return Optional.of(
+ "Job could not be upgraded with last-state while Kubernetes HA disabled");
}
if (StringUtils.isNullOrWhitespaceOnly(
@@ -182,10 +183,9 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
if (replicas < 1) {
return Optional.of("JobManager replicas should not be configured less than one.");
} else if (replicas > 1
- && !HighAvailabilityMode.isHighAvailabilityModeActivated(
- Configuration.fromMap(confMap))) {
+ && !FlinkUtils.isKubernetesHAActivated(Configuration.fromMap(confMap))) {
return Optional.of(
- "High availability should be enabled when starting standby JobManagers.");
+ "Kubernetes High availability should be enabled when starting standby JobManagers.");
}
return Optional.empty();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index accce51..5f6a896 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -73,7 +73,7 @@ public class DeploymentValidatorTest {
dep.getSpec().setFlinkConfiguration(new HashMap<>());
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
},
- "Job could not be upgraded with last-state while HA disabled");
+ "Job could not be upgraded with last-state while Kubernetes HA disabled");
testError(
dep -> {
@@ -145,7 +145,7 @@ public class DeploymentValidatorTest {
dep.getSpec().setFlinkConfiguration(new HashMap<>());
dep.getSpec().getJobManager().setReplicas(2);
},
- "High availability should be enabled when starting standby JobManagers.");
+ "Kubernetes High availability should be enabled when starting standby JobManagers.");
testError(
dep -> dep.getSpec().getJobManager().setReplicas(0),
"JobManager replicas should not be configured less than one.");