You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/25 07:22:41 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26136] Extract shared deployment validation logic

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 85d997e  [FLINK-26136] Extract shared deployment validation logic
85d997e is described below

commit 85d997eaa0310a298bb3a3a704afacf3844635f7
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Feb 24 12:06:39 2022 +0100

    [FLINK-26136] Extract shared deployment validation logic
    
    Closes #24
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   5 +
 .../controller/FlinkDeploymentController.java      |  11 ++
 .../operator/reconciler/JobReconciler.java         |  14 +-
 .../operator/reconciler/SessionReconciler.java     |   4 -
 .../kubernetes/operator/service/FlinkService.java  |   6 +-
 .../validation/DefaultDeploymentValidator.java     | 163 +++++++++++++++++++
 .../validation/FlinkDeploymentValidator.java       |  34 ++++
 .../flink/kubernetes/operator/TestUtils.java       |   7 +-
 .../controller/FlinkDeploymentControllerTest.java  |   2 +
 .../validation/DeploymentValidatorTest.java        | 176 +++++++++++++++++++++
 flink-kubernetes-webhook/pom.xml                   |  10 +-
 .../operator/admission/FlinkOperatorWebhook.java   |   4 +-
 ...eploymentValidator.java => FlinkValidator.java} |  24 +--
 13 files changed, 425 insertions(+), 35 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cb05de3..7e8edf5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -26,6 +26,8 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
@@ -58,11 +60,14 @@ public class FlinkOperator {
         JobReconciler jobReconciler = new JobReconciler(client, flinkService);
         SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
 
+        FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
                         defaultConfig,
                         client,
                         namespace,
+                        validator,
                         observer,
                         jobReconciler,
                         sessionReconciler);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index a6aeb32..f97d5fd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -62,6 +63,7 @@ public class FlinkDeploymentController
 
     private final String operatorNamespace;
 
+    private final FlinkDeploymentValidator validator;
     private final JobStatusObserver observer;
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
@@ -71,12 +73,14 @@ public class FlinkDeploymentController
             DefaultConfig defaultConfig,
             KubernetesClient kubernetesClient,
             String operatorNamespace,
+            FlinkDeploymentValidator validator,
             JobStatusObserver observer,
             JobReconciler jobReconciler,
             SessionReconciler sessionReconciler) {
         this.defaultConfig = defaultConfig;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
+        this.validator = validator;
         this.observer = observer;
         this.jobReconciler = jobReconciler;
         this.sessionReconciler = sessionReconciler;
@@ -99,6 +103,13 @@ public class FlinkDeploymentController
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
         LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
 
+        Optional<String> validationError = validator.validate(flinkApp);
+        if (validationError.isPresent()) {
+            LOG.error("Reconciliation failed: " + validationError.get());
+            updateForReconciliationError(flinkApp, validationError.get());
+            return UpdateControl.updateStatus(flinkApp);
+        }
+
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
         try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 882c58a..dee22fb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -60,9 +60,6 @@ public class JobReconciler {
                 flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
         JobSpec jobSpec = flinkApp.getSpec().getJob();
         if (lastReconciledSpec == null) {
-            if (!jobSpec.getState().equals(JobState.RUNNING)) {
-                throw new InvalidDeploymentException("Job must start in running state");
-            }
             deployFlinkJob(
                     flinkApp,
                     effectiveConfig,
@@ -74,9 +71,6 @@ public class JobReconciler {
 
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
         if (specChanged) {
-            if (lastReconciledSpec.getJob() == null) {
-                throw new InvalidDeploymentException("Cannot switch from session to job cluster");
-            }
             JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = jobSpec.getState();
 
@@ -129,13 +123,7 @@ public class JobReconciler {
     private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-
-        String savepointLocation = jobStatus.getSavepointLocation();
-        if (savepointLocation == null) {
-            throw new InvalidDeploymentException(
-                    "Cannot perform stateful restore without a valid savepoint");
-        }
-        deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
+        deployFlinkJob(flinkApp, effectiveConfig, Optional.of(jobStatus.getSavepointLocation()));
     }
 
     private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 2e36fc5..820e31f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.reconciler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
@@ -61,9 +60,6 @@ public class SessionReconciler {
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
 
         if (specChanged) {
-            if (lastReconciledSpec.getJob() != null) {
-                throw new InvalidDeploymentException("Cannot switch from job to session cluster");
-            }
             upgradeSessionCluster(flinkApp, effectiveConfig);
         }
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index d17ce1e..c8edf01 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -65,10 +66,11 @@ public class FlinkService {
         final ApplicationDeployer deployer =
                 new ApplicationClusterDeployer(clusterClientServiceLoader);
 
+        JobSpec jobSpec = deployment.getSpec().getJob();
         final ApplicationConfiguration applicationConfiguration =
                 new ApplicationConfiguration(
-                        deployment.getSpec().getJob().getArgs(),
-                        deployment.getSpec().getJob().getEntryClass());
+                        jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
+                        jobSpec.getEntryClass());
 
         deployer.run(conf, applicationConfiguration);
         LOG.info("Application cluster {} deployed", deployment.getMetadata().getName());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
new file mode 100644
index 0000000..e090ec4
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Default validator: checks the deployment spec and whether a spec change is allowed. */
+public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
+
+    /** Flink config keys that cause validation to fail when present in the spec's config. */
+    private static final String[] FORBIDDEN_CONF_KEYS =
+            new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
+
+    @Override
+    public Optional<String> validate(FlinkDeployment deployment) {
+        FlinkDeploymentSpec spec = deployment.getSpec();
+        return firstPresent(
+                validateFlinkConfig(spec.getFlinkConfiguration()),
+                validateJobSpec(spec.getJob()),
+                validateJmSpec(spec.getJobManager()),
+                validateTmSpec(spec.getTaskManager()),
+                validateSpecChange(deployment));
+    }
+
+    /** Returns the first non-empty error in argument order, or empty if there is none. */
+    @SafeVarargs
+    private static Optional<String> firstPresent(Optional<String>... errOpts) {
+        for (Optional<String> opt : errOpts) {
+            if (opt.isPresent()) {
+                return opt;
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Rejects configurations containing any of the {@link #FORBIDDEN_CONF_KEYS}. */
+    private Optional<String> validateFlinkConfig(Map<String, String> confMap) {
+        if (confMap == null) {
+            return Optional.empty();
+        }
+        Configuration conf = Configuration.fromMap(confMap);
+        for (String key : FORBIDDEN_CONF_KEYS) {
+            if (conf.containsKey(key)) {
+                return Optional.of("Forbidden Flink config key: " + key);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /** Checks parallelism and jar URI of the job; a missing job spec is accepted. */
+    private Optional<String> validateJobSpec(JobSpec job) {
+        if (job == null) {
+            return Optional.empty();
+        }
+        if (job.getParallelism() < 1) {
+            return Optional.of("Job parallelism must be larger than 0");
+        }
+        if (job.getJarURI() == null) {
+            return Optional.of("Jar URI must be defined");
+        }
+        return Optional.empty();
+    }
+
+    /** Validates the JobManager resource settings; a missing spec is accepted. */
+    private Optional<String> validateJmSpec(JobManagerSpec jmSpec) {
+        if (jmSpec == null) {
+            return Optional.empty();
+        }
+        return validateResources("JobManager", jmSpec.getResource());
+    }
+
+    /** Validates the TaskManager resource settings; a missing spec is accepted. */
+    private Optional<String> validateTmSpec(TaskManagerSpec tmSpec) {
+        if (tmSpec == null) {
+            return Optional.empty();
+        }
+        return validateResources("TaskManager", tmSpec.getResource());
+    }
+
+    /** Requires a parseable memory value whenever a resource section is given. */
+    private Optional<String> validateResources(String component, Resource resource) {
+        if (resource == null) {
+            return Optional.empty();
+        }
+        String memory = resource.getMemory();
+        if (memory == null) {
+            return Optional.of(component + " resource memory must be defined.");
+        }
+        try {
+            MemorySize.parse(memory);
+        } catch (IllegalArgumentException iae) {
+            return Optional.of(component + " resource memory parse error: " + iae.getMessage());
+        }
+        return Optional.empty();
+    }
+
+    /** Validates the new spec against the last reconciled spec stored in the status, if any. */
+    private Optional<String> validateSpecChange(FlinkDeployment deployment) {
+        FlinkDeploymentSpec newSpec = deployment.getSpec();
+
+        if (deployment.getStatus() == null
+                || deployment.getStatus().getReconciliationStatus() == null
+                || deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()
+                        == null) {
+            // New deployment: compare by identity so a missing state is an error, not an NPE
+            if (newSpec.getJob() != null && newSpec.getJob().getState() != JobState.RUNNING) {
+                return Optional.of("Job must start in running state");
+            }
+            return Optional.empty();
+        }
+
+        FlinkDeploymentSpec oldSpec =
+                deployment.getStatus().getReconciliationStatus().getLastReconciledSpec();
+        if (newSpec.getJob() != null && oldSpec.getJob() == null) {
+            return Optional.of("Cannot switch from session to job cluster");
+        }
+        if (newSpec.getJob() == null && oldSpec.getJob() != null) {
+            return Optional.of("Cannot switch from job to session cluster");
+        }
+
+        JobSpec oldJob = oldSpec.getJob();
+        JobSpec newJob = newSpec.getJob();
+        if (oldJob != null
+                && newJob != null
+                && oldJob.getState() == JobState.SUSPENDED
+                && newJob.getState() == JobState.RUNNING
+                && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
+            // A missing job status is treated like a missing savepoint instead of throwing an NPE
+            if (deployment.getStatus().getJobStatus() == null
+                    || deployment.getStatus().getJobStatus().getSavepointLocation() == null) {
+                return Optional.of("Cannot perform savepoint restore without a valid savepoint");
+            }
+        }
+        return Optional.empty();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
new file mode 100644
index 0000000..732224d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkDeploymentValidator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import java.util.Optional;
+
+/** Validator for {@link FlinkDeployment} resources. */
+public interface FlinkDeploymentValidator {
+
+    /**
+     * Validate and return optional error.
+     *
+     * @param deployment the deployment resource to validate
+     * @return Optional error string, should be present iff validation resulted in an error
+     */
+    Optional<String> validate(FlinkDeployment deployment);
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 601f160..9ada6f8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -65,7 +65,12 @@ public class TestUtils {
         FlinkDeployment deployment = buildSessionCluster();
         deployment
                 .getSpec()
-                .setJob(JobSpec.builder().jarURI(SAMPLE_JAR).state(JobState.RUNNING).build());
+                .setJob(
+                        JobSpec.builder()
+                                .jarURI(SAMPLE_JAR)
+                                .parallelism(1)
+                                .state(JobState.RUNNING)
+                                .build());
         return deployment;
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 6dae546..88fcfe2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
@@ -189,6 +190,7 @@ public class FlinkDeploymentControllerTest {
                 FlinkUtils.loadDefaultConfig(),
                 null,
                 "test",
+                new DefaultDeploymentValidator(),
                 observer,
                 jobReconciler,
                 sessionReconciler);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
new file mode 100644
index 0000000..030d4e5
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.validation;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests the validation rules implemented by {@link DefaultDeploymentValidator}. */
+public class DeploymentValidatorTest {
+
+    private final DefaultDeploymentValidator validator = new DefaultDeploymentValidator();
+
+    @Test
+    public void testValidation() {
+        testSuccess(dep -> {});
+
+        // Test job validation
+        testError(dep -> dep.getSpec().getJob().setJarURI(null), "Jar URI must be defined");
+        testError(
+                dep -> dep.getSpec().getJob().setState(JobState.SUSPENDED),
+                "Job must start in running state");
+        // Parallelism must be at least 1
+        testError(
+                dep -> dep.getSpec().getJob().setParallelism(0),
+                "Job parallelism must be larger than 0");
+        testError(
+                dep -> dep.getSpec().getJob().setParallelism(-1),
+                "Job parallelism must be larger than 0");
+
+        // Test conf validation
+        testSuccess(
+                dep ->
+                        dep.getSpec()
+                                .setFlinkConfiguration(
+                                        Collections.singletonMap("random", "config")));
+        testError(
+                dep ->
+                        dep.getSpec()
+                                .setFlinkConfiguration(
+                                        Collections.singletonMap(
+                                                KubernetesConfigOptions.NAMESPACE.key(), "myns")),
+                "Forbidden Flink config key");
+
+        // Test resource validation
+        testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("1G"));
+        testSuccess(dep -> dep.getSpec().getTaskManager().getResource().setMemory("100"));
+        // Values that cannot be parsed as a memory size must be rejected
+        testError(
+                dep -> dep.getSpec().getTaskManager().getResource().setMemory("invalid"),
+                "TaskManager resource memory parse error");
+        testError(
+                dep -> dep.getSpec().getJobManager().getResource().setMemory("invalid"),
+                "JobManager resource memory parse error");
+        // A null memory value must be rejected
+        testError(
+                dep -> dep.getSpec().getTaskManager().getResource().setMemory(null),
+                "TaskManager resource memory must be defined");
+        testError(
+                dep -> dep.getSpec().getJobManager().getResource().setMemory(null),
+                "JobManager resource memory must be defined");
+
+        // Test savepoint restore validation
+        testSuccess(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+                    dep.getStatus().getJobStatus().setSavepointLocation("sp");
+
+                    dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .getLastReconciledSpec()
+                            .getJob()
+                            .setState(JobState.SUSPENDED);
+
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+                });
+        // Same setup, but without a savepoint location in the job status
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+                    dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .getLastReconciledSpec()
+                            .getJob()
+                            .setState(JobState.SUSPENDED);
+
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+                },
+                "Cannot perform savepoint restore without a valid savepoint");
+
+        // Test cluster type validation
+        // Last reconciled spec has a job, the new spec has none
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+                    dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                    dep.getSpec().setJob(null);
+                },
+                "Cannot switch from job to session cluster");
+        // Last reconciled spec has no job, the new spec has one
+        testError(
+                dep -> {
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+                    dep.getStatus().setReconciliationStatus(new ReconciliationStatus());
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .setLastReconciledSpec(TestUtils.clone(dep.getSpec()));
+                    dep.getStatus().getReconciliationStatus().getLastReconciledSpec().setJob(null);
+                },
+                "Cannot switch from session to job cluster");
+    }
+
+    /** Builds an application cluster, applies the modifier and expects validation to pass. */
+    private void testSuccess(Consumer<FlinkDeployment> deploymentModifier) {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deploymentModifier.accept(deployment);
+        validator.validate(deployment).ifPresent(Assert::fail);
+    }
+
+    /** Applies the modifier and expects a validation error starting with {@code expectedErr}. */
+    private void testError(Consumer<FlinkDeployment> deploymentModifier, String expectedErr) {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deploymentModifier.accept(deployment);
+        Optional<String> error = validator.validate(deployment);
+        if (error.isPresent()) {
+            assertTrue(error.get(), error.get().startsWith(expectedErr));
+        } else {
+            fail("Did not get expected error: " + expectedErr);
+        }
+    }
+}
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 7f9a90c..0a82987 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -41,15 +41,17 @@ under the License.
                     <groupId>org.apache.flink</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.takes</groupId>
-                    <artifactId>takes</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-netty</artifactId>
             <version>4.1.70.Final-${flink.shaded.version}</version>
         </dependency>
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 35cb046..c534273 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.admission;
 
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
@@ -54,7 +55,8 @@ public class FlinkOperatorWebhook {
 
     public static void main(String[] args) throws Exception {
         LOG.info("Starting Flink Kubernetes Webhook");
-        AdmissionHandler endpoint = new AdmissionHandler(new FlinkDeploymentValidator());
+        AdmissionHandler endpoint =
+                new AdmissionHandler(new FlinkValidator(new DefaultDeploymentValidator()));
         ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
similarity index 75%
rename from flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java
rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index 4df7d49..838e618 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkDeploymentValidator.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -21,19 +21,26 @@ import org.apache.flink.kubernetes.operator.admission.admissioncontroller.NotAll
 import org.apache.flink.kubernetes.operator.admission.admissioncontroller.Operation;
 import org.apache.flink.kubernetes.operator.admission.admissioncontroller.validation.Validator;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 /** Validator for FlinkDeployment creation and updates. */
-public class FlinkDeploymentValidator implements Validator<GenericKubernetesResource> {
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentValidator.class);
+public class FlinkValidator implements Validator<GenericKubernetesResource> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkValidator.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
+    private final FlinkDeploymentValidator deploymentValidator;
+
+    public FlinkValidator(FlinkDeploymentValidator deploymentValidator) {
+        this.deploymentValidator = deploymentValidator; // delegate invoked from validate()
+    }
+
     @Override
     public void validate(GenericKubernetesResource resource, Operation operation)
             throws NotAllowedException {
@@ -41,13 +48,10 @@ public class FlinkDeploymentValidator implements Validator<GenericKubernetesReso
 
         FlinkDeployment flinkDeployment =
                 objectMapper.convertValue(resource, FlinkDeployment.class);
-        FlinkDeploymentSpec spec = flinkDeployment.getSpec();
-        JobSpec job = spec.getJob();
 
-        if (job != null) {
-            if (job.getParallelism() < 1) {
-                throw new NotAllowedException("Job parallelism must be larger than 0");
-            }
+        Optional<String> validationError = deploymentValidator.validate(flinkDeployment);
+        if (validationError.isPresent()) {
+            throw new NotAllowedException(validationError.get());
         }
     }
 }