You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/07 09:33:53 UTC

[flink-kubernetes-operator] branch main updated: [hotfix] Remove incorrect savepoint path check

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 637fcb9  [hotfix] Remove incorrect savepoint path check
637fcb9 is described below

commit 637fcb9f8f5afab5d43a30b050ea07f57b046bf8
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Jun 6 13:35:27 2022 +0200

    [hotfix] Remove incorrect savepoint path check
---
 .../operator/validation/DefaultValidator.java      | 21 ---------------------
 .../operator/validation/DefaultValidatorTest.java  | 22 ----------------------
 2 files changed, 43 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index d4afbe4..54611fe 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -287,14 +287,6 @@ public class DefaultValidator implements FlinkResourceValidator {
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
         if (oldJob != null && newJob != null) {
-            if (oldJob.getState() == JobState.SUSPENDED
-                    && newJob.getState() == JobState.RUNNING
-                    && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
-                    && (deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()
-                            == null)) {
-                return Optional.of("Cannot perform savepoint restore without a valid savepoint");
-            }
-
             if (StringUtils.isNullOrWhitespaceOnly(
                             effectiveConfig.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()))
                     && deployment.getStatus().getJobManagerDeploymentStatus()
@@ -398,19 +390,6 @@ public class DefaultValidator implements FlinkResourceValidator {
             return Optional.empty();
         }
 
-        FlinkSessionJobSpec oldSpec =
-                sessionJob.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
-
-        JobSpec oldJob = oldSpec.getJob();
-        JobSpec newJob = newSpec.getJob();
-        if (oldJob.getState() == JobState.SUSPENDED
-                && newJob.getState() == JobState.RUNNING
-                && newJob.getUpgradeMode() == UpgradeMode.SAVEPOINT
-                && (sessionJob.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint()
-                        == null)) {
-            return Optional.of("Cannot perform savepoint restore without a valid savepoint");
-        }
-
         return Optional.empty();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 312eaab..24af718 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -248,28 +248,6 @@ public class DefaultValidatorTest {
                     dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
                 });
 
-        testError(
-                dep -> {
-                    dep.setStatus(new FlinkDeploymentStatus());
-                    dep.getStatus().setJobStatus(new JobStatus());
-
-                    dep.getStatus()
-                            .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
-                    FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
-                    spec.getJob().setState(JobState.SUSPENDED);
-                    dep.getStatus()
-                            .getReconciliationStatus()
-                            .serializeAndSetLastReconciledSpec(spec);
-
-                    dep.getSpec()
-                            .getFlinkConfiguration()
-                            .put(
-                                    CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
-                                    "file:///flink-data/savepoints");
-                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
-                },
-                "Cannot perform savepoint restore without a valid savepoint");
-
         // Test cluster type validation
         testError(
                 dep -> {