You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/25 07:22:41 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26136] Extract shared deployment validation logic
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 85d997e [FLINK-26136] Extract shared deployment validation logic
85d997e is described below
commit 85d997eaa0310a298bb3a3a704afacf3844635f7
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Feb 24 12:06:39 2022 +0100
[FLINK-26136] Extract shared deployment validation logic
Closes #24
---
.../flink/kubernetes/operator/FlinkOperator.java | 5 +
.../controller/FlinkDeploymentController.java | 11 ++
.../operator/reconciler/JobReconciler.java | 14 +-
.../operator/reconciler/SessionReconciler.java | 4 -
.../kubernetes/operator/service/FlinkService.java | 6 +-
.../validation/DefaultDeploymentValidator.java | 163 +++++++++++++++++++
.../validation/FlinkDeploymentValidator.java | 34 ++++
.../flink/kubernetes/operator/TestUtils.java | 7 +-
.../controller/FlinkDeploymentControllerTest.java | 2 +
.../validation/DeploymentValidatorTest.java | 176 +++++++++++++++++++++
flink-kubernetes-webhook/pom.xml | 10 +-
.../operator/admission/FlinkOperatorWebhook.java | 4 +-
...eploymentValidator.java => FlinkValidator.java} | 24 +--
13 files changed, 425 insertions(+), 35 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cb05de3..7e8edf5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.javaoperatorsdk.operator.Operator;
@@ -58,11 +60,14 @@ public class FlinkOperator {
JobReconciler jobReconciler = new JobReconciler(client, flinkService);
SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
+ FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+
FlinkDeploymentController controller =
new FlinkDeploymentController(
defaultConfig,
client,
namespace,
+ validator,
observer,
jobReconciler,
sessionReconciler);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a6aeb32..f97d5fd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -62,6 +63,7 @@ public class FlinkDeploymentController
private final String operatorNamespace;
+ private final FlinkDeploymentValidator validator;
private final JobStatusObserver observer;
private final JobReconciler jobReconciler;
private final SessionReconciler sessionReconciler;
@@ -71,12 +73,14 @@ public class FlinkDeploymentController
DefaultConfig defaultConfig,
KubernetesClient kubernetesClient,
String operatorNamespace,
+ FlinkDeploymentValidator validator,
JobStatusObserver observer,
JobReconciler jobReconciler,
SessionReconciler sessionReconciler) {
this.defaultConfig = defaultConfig;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
+ this.validator = validator;
this.observer = observer;
this.jobReconciler = jobReconciler;
this.sessionReconciler = sessionReconciler;
@@ -99,6 +103,13 @@ public class FlinkDeploymentController
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
+ Optional<String> validationError = validator.validate(flinkApp);
+ if (validationError.isPresent()) {
+ LOG.error("Reconciliation failed: " + validationError.get());
+ updateForReconciliationError(flinkApp, validationError.get());
+ return UpdateControl.updateStatus(flinkApp);
+ }
+
Configuration effectiveConfig =
FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 882c58a..dee22fb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -60,9 +60,6 @@ public class JobReconciler {
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
JobSpec jobSpec = flinkApp.getSpec().getJob();
if (lastReconciledSpec == null) {
- if (!jobSpec.getState().equals(JobState.RUNNING)) {
- throw new InvalidDeploymentException("Job must start in running state");
- }
deployFlinkJob(
flinkApp,
effectiveConfig,
@@ -74,9 +71,6 @@ public class JobReconciler {
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
- if (lastReconciledSpec.getJob() == null) {
- throw new InvalidDeploymentException("Cannot switch from session to job cluster");
- }
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = jobSpec.getState();
@@ -129,13 +123,7 @@ public class JobReconciler {
private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-
- String savepointLocation = jobStatus.getSavepointLocation();
- if (savepointLocation == null) {
- throw new InvalidDeploymentException(
- "Cannot perform stateful restore without a valid savepoint");
- }
- deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation()));
}
private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 2e36fc5..820e31f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -61,9 +60,6 @@ public class SessionReconciler {
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
if (specChanged) {
- if (lastReconciledSpec.getJob() != null) {
- throw new InvalidDeploymentException("Cannot switch from job to session cluster");
- }
upgradeSessionCluster(flinkApp, effectiveConfig);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d17ce1e..c8edf01 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -65,10 +66,11 @@ public class FlinkService {
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
+ JobSpec jobSpec = deployment.getSpec().getJob();
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
- deployment.getSpec().getJob().getArgs(),
- deployment.getSpec().getJob().getEntryClass());
+ jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
+ jobSpec.getEntryClass());
deployer.run(conf, applicationConfiguration);
LOG.info("Application cluster {} deployed", deployment.getMetadata().getName());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
new file mode 100644
index 0000000..e090ec4
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Default validator implementation. */
+public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+
+ private static final String[] FORBIDDEN_CONF_KEYS =
+ new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
+
+ @Override
+ public Optional<String> validate(FlinkDeployment deployment) {
+ FlinkDeploymentSpec spec = deployment.getSpec();
+ return firstPresent(
+ validateFlinkConfig(spec.getFlinkConfiguration()),
+ validateJobSpec(spec.getJob()),
+ validateJmSpec(spec.getJobManager()),
+ validateTmSpec(spec.getTaskManager()),
+ validateSpecChange(deployment));
+ }
+
+ private static Optional<String> firstPresent(Optional<String>... errOpts) {
+ for (Optional<String> opt : errOpts) {
+ if (opt.isPresent()) {
+ return opt;
+ }
+ }
+ return Optional.empty();
+ }
+
+ private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
+ if (confMap == null) {
+ return Optional.empty();
+ }
+ Configuration conf = Configuration.fromMap(confMap);
+ for (String key : FORBIDDEN_CONF_KEYS) {
+ if (conf.containsKey(key)) {
+ return Optional.of("Forbidden Flink config key: " + key);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private Optional<String> validateJobSpec(JobSpec job) {
+ if (job == null) {
+ return Optional.empty();
+ }
+
+ if (job.getParallelism() < 1) {
+ return Optional.of("Job parallelism must be larger than 0");
+ }
+
+ if (job.getJarURI() == null) {
+ return Optional.of("Jar URI must be defined");
+ }
+
+ return Optional.empty();
+ }
+
+ private Optional<String> validateJmSpec(JobManagerSpec jmSpec) {
+ if (jmSpec == null) {
+ return Optional.empty();
+ }
+
+ return validateResources("JobManager", jmSpec.getResource());
+ }
+
+ private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
+ if (tmSpec == null) {
+ return Optional.empty();
+ }
+
+ return validateResources("TaskManager", tmSpec.getResource());
+ }
+
+ private Optional<String> validateResources(String component, Resource resource) {
+ if (resource == null) {
+ return Optional.empty();
+ }
+
+ String memory = resource.getMemory();
+ if (memory == null) {
+ return Optional.of(component + " resource memory must be defined.");
+ }
+
+ try {
+ MemorySize.parse(memory);
+ } catch (IllegalArgumentException iae) {
+ return Optional.of(component + " resource memory parse error: " + iae.getMessage());
+ }
+
+ return Optional.empty();
+ }
+
+ private Optional<String> validateSpecChange(FlinkDeployment deployment) {
+ FlinkDeploymentSpec newSpec = deployment.getSpec();
+
+ if (deployment.getStatus() == null
+ || deployment.getStatus().getReconciliationStatus() == null
+ || deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()
+ == null) {
+ // New deployment
+ if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
+ return Optional.of("Job must start in running state");
+ }
+
+ return Optional.empty();
+ }
+
+ FlinkDeploymentSpec oldSpec =
+ deployment.getStatus().getReconciliationStatus().getLastReconciledSpec();
+
+ if (newSpec.getJob() != null && oldSpec.getJob() == null) {
+ return Optional.of("Cannot switch from session to job cluster");
+ }
+
+ if (newSpec.getJob() == null && oldSpec.getJob() != null) {
+ return Optional.of("Cannot switch from job to session cluster");
+ }
+
+ JobSpec oldJob = oldSpec.getJob();
+ JobSpec newJob = newSpec.getJob();
+ if (oldJob != null && newJob != null) {
+ if (oldJob.getState() == JobState.SUSPENDED
+ && newJob.getState() == JobState.RUNNING
+ && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
+ && deployment.getStatus().getJobStatus().getSavepointLocation() == null) {
+ return Optional.of("Cannot perform savepoint restore without a valid savepoint");
+ }
+ }
+
+ return Optional.empty();
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
new file mode 100644
index 0000000..732224d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import java.util.Optional;
+
+/** Validator for {@link FlinkDeployment} resources. */
+public interface FlinkDeploymentValidator {
+
+ /**
+ * Validate and return optional error.
+ *
+ * @param deployment
+ * @return Optional error string, should be present iff validation resulted in an error
+ */
+ Optional<String> validate(FlinkDeployment deployment);
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 601f160..9ada6f8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -65,7 +65,12 @@ public class TestUtils {
FlinkDeployment deployment = buildSessionCluster();
deployment
.getSpec()
- .setJob(JobSpec.builder().jarURI(SAMPLE_JAR).state(JobState.RUNNING).build());
+ .setJob(
+ JobSpec.builder()
+ .jarURI(SAMPLE_JAR)
+ .parallelism(1)
+ .state(JobState.RUNNING)
+ .build());
return deployment;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 6dae546..88fcfe2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
@@ -189,6 +190,7 @@ public class FlinkDeploymentControllerTest {
FlinkUtils.loadDefaultConfig(),
null,
"test",
+ new DefaultDeploymentValidator(),
observer,
jobReconciler,
sessionReconciler);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
new file mode 100644
index 0000000..030d4e5
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Test deployment validation logic. */
+public class DeploymentValidatorTest {
+
+ private final DefaultDeploymentValidator validator = new DefaultDeploymentValidator();
+
+ @Test
+ public void testValidation() {
+ testSuccess(dep -> {});
+
+ // Test job validation
+ testError(dep -> dep.getSpec().getJob().setJarURI(null), "Jar URI must be defined");
+ testError(
+ dep -> dep.getSpec().getJob().setState(JobState.SUSPENDED),
+ "Job must start in running state");
+
+ testError(
+ dep -> dep.getSpec().getJob().setParallelism(0),
+ "Job parallelism must be larger than 0");
+ testError(
+ dep -> dep.getSpec().getJob().setParallelism(-1),
+ "Job parallelism must be larger than 0");
+
+ // Test conf validation
+ testSuccess(
+ dep ->
+ dep.getSpec()
+ .setFlinkConfiguration(
+ Collections.singletonMap("random", "config")));
+ testError(
+ dep ->
+ dep.getSpec()
+ .setFlinkConfiguration(
+ Collections.singletonMap(
+ KubernetesConfigOptions.NAMESPACE.key(), "myns")),
+ "Forbidden Flink config key");
+
+ // Test resource validation
+ testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G"));
+ testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("100"));
+
+ testError(
+ dep -> dep.getSpec().getTaskManager().getResource().setMemory("invalid"),
+ "TaskManager resource memory parse error");
+ testError(
+ dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
+ "JobManager resource memory parse error");
+
+ testError(
+ dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
+ "TaskManager resource memory must be defined");
+ testError(
+ dep -> dep.getSpec().getJobManager().getResource().setMemory(null),
+ "JobManager resource memory must be defined");
+
+ // Test savepoint restore validation
+ testSuccess(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+ dep.getStatus().getJobStatus().setSavepointLocation("sp");
+
+ dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+ dep.getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+ dep.getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .setState(JobState.SUSPENDED);
+
+ dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ });
+
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+ dep.getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+ dep.getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .setState(JobState.SUSPENDED);
+
+ dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ },
+ "Cannot perform savepoint restore without a valid savepoint");
+
+ // Test cluster type validation
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+ dep.getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+ dep.getSpec().setJob(null);
+ },
+ "Cannot switch from job to session cluster");
+
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+ dep.getStatus()
+ .getReconciliationStatus()
+ .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+ dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
+ },
+ "Cannot switch from session to job cluster");
+ }
+
+ private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deploymentModifier.accept(deployment);
+ validator.validate(deployment).ifPresent(Assert::fail);
+ }
+
+ private void testError(Consumer<FlinkDeployment> deploymentModifier, String expectedErr) {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deploymentModifier.accept(deployment);
+ Optional<String> error = validator.validate(deployment);
+ if (error.isPresent()) {
+ assertTrue(error.get(), error.get().startsWith(expectedErr));
+ } else {
+ fail("Did not get expected error: " + expectedErr);
+ }
+ }
+}
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 7f9a90c..0a82987 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -41,15 +41,17 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>*</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.takes</groupId>
- <artifactId>takes</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
<version>4.1.70.Final-${flink.shaded.version}</version>
</dependency>
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 35cb046..c534273 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.admission;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -54,7 +55,8 @@ public class FlinkOperatorWebhook {
public static void main(String[] args) throws Exception {
LOG.info("Starting Flink Kubernetes Webhook");
- AdmissionHandler endpoint = new AdmissionHandler(new FlinkDeploymentValidator());
+ AdmissionHandler endpoint =
+ new AdmissionHandler(new FlinkValidator(new DefaultDeploymentValidator()));
ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
similarity index 75%
rename from flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java
rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index 4df7d49..838e618 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -21,19 +21,26 @@ import org.apache.flink.kubernetes.operator.admission.admissioncontroller.NotAll
import org.apache.flink.kubernetes.operator.admission.admissioncontroller.Operation;
import org.apache.flink.kubernetes.operator.admission.admissioncontroller.validation.Validator;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
+
/** Validator for FlinkDeployment creation and updates. */
-public class FlinkDeploymentValidator implements Validator<GenericKubernetesResource> {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentValidator.class);
+public class FlinkValidator implements Validator<GenericKubernetesResource> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkValidator.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final FlinkDeploymentValidator deploymentValidator;
+
+ public FlinkValidator(FlinkDeploymentValidator deploymentValidator) {
+ this.deploymentValidator = deploymentValidator;
+ }
+
@Override
public void validate(GenericKubernetesResource resource, Operation operation)
throws NotAllowedException {
@@ -41,13 +48,10 @@ public class FlinkDeploymentValidator implements Validator<GenericKubernetesReso
FlinkDeployment flinkDeployment =
objectMapper.convertValue(resource, FlinkDeployment.class);
- FlinkDeploymentSpec spec = flinkDeployment.getSpec();
- JobSpec job = spec.getJob();
- if (job != null) {
- if (job.getParallelism() < 1) {
- throw new NotAllowedException("Job parallelism must be larger than 0");
- }
+ Optional<String> validationError = deploymentValidator.validate(flinkDeployment);
+ if (validationError.isPresent()) {
+ throw new NotAllowedException(validationError.get());
}
}
}