You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/15 15:08:06 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28875] Add FlinkSessionJobControllerTest

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c8ed11df [FLINK-28875] Add FlinkSessionJobControllerTest
c8ed11df is described below

commit c8ed11dfde1e0fc57d8fa45b0121ecec362bd03d
Author: pvary <pe...@gmail.com>
AuthorDate: Thu Dec 15 16:08:01 2022 +0100

    [FLINK-28875] Add FlinkSessionJobControllerTest
---
 .../sessionjob/FlinkSessionJobObserver.java        |   2 +-
 .../sessionjob/SessionJobReconciler.java           |   9 +-
 .../kubernetes/operator/TestingFlinkService.java   |   4 +
 .../controller/FlinkDeploymentControllerTest.java  |  45 ++-
 .../controller/FlinkSessionJobControllerTest.java  | 444 ++++++++++++++++++++-
 5 files changed, 488 insertions(+), 16 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
index fe1874fd..e23458ff 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
@@ -184,7 +184,7 @@ public class FlinkSessionJobObserver
             Preconditions.checkArgument(
                     matchedList.size() <= 1,
                     String.format(
-                            "Expected one job for JobID: %s, but %d founded",
+                            "Expected one job for JobID: %s, but found %d",
                             status.getJobId(), matchedList.size()));
 
             if (matchedList.size() == 0) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 4b9d6aad..ad5c5735 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -24,7 +24,6 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -107,12 +106,8 @@ public class SessionJobReconciler
         var jobID =
                 flinkService.submitJobToSessionCluster(
                         cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
-        status.setJobStatus(
-                new JobStatus()
-                        .toBuilder()
-                        .jobId(jobID.toHexString())
-                        .state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
-                        .build());
+        status.getJobStatus().setJobId(jobID.toHexString());
+        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 97175b15..9f654409 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -135,6 +135,10 @@ public class TestingFlinkService extends AbstractFlinkService {
         savepointCounter = 0;
     }
 
+    public void clearJobsInTerminalState() {
+        jobs.removeIf(job -> job.f1.getJobState().isTerminalState());
+    }
+
     public Set<String> getSessions() {
         return sessions;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index d00f9ffd..9772ab04 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -683,7 +683,7 @@ public class FlinkDeploymentControllerTest {
     }
 
     @Test
-    public void verifyReconcileWithAChangedOperatorMode() throws Exception {
+    public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception {
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
@@ -712,6 +712,12 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertTrue(
+                appCluster
+                        .getStatus()
+                        .getError()
+                        .contains("Cannot switch from job to session cluster"));
+
         assertNotNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
         // Verify jobStatus is running
         jobStatus = appCluster.getStatus().getJobStatus();
@@ -721,6 +727,43 @@ public class FlinkDeploymentControllerTest {
         assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
     }
 
+    @Test
+    public void verifyReconcileWithAChangedOperatorModeToApplication() throws Exception {
+
+        FlinkDeployment appCluster = TestUtils.buildSessionCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+
+        updateControl = testController.reconcile(appCluster, context);
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        updateControl = testController.reconcile(appCluster, context);
+        JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        // jobStatus has not been set at this time
+        assertNull(jobStatus.getState());
+
+        // Switches operator mode to APPLICATION
+        appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob());
+        // Validation fails and JobObserver should still be used
+        updateControl = testController.reconcile(appCluster, context);
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertTrue(
+                appCluster
+                        .getStatus()
+                        .getError()
+                        .contains("Cannot switch from session to job cluster"));
+        assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
+    }
+
     private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
         flinkService.clear();
         testController.reconcile(appCluster, context);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index f7f5c8ec..e5f91aea 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -17,44 +17,67 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus;
+import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 /** {@link FlinkSessionJobController} tests. */
 @EnableKubernetesMockClient(crud = true)
 class FlinkSessionJobControllerTest {
     private KubernetesClient kubernetesClient;
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+    private final Context context = TestUtils.createContextWithReadyFlinkDeployment();
+
     private TestingFlinkService flinkService = new TestingFlinkService();
     private TestingFlinkSessionJobController testController;
+    private FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
 
     @BeforeEach
     public void before() {
         flinkService = new TestingFlinkService();
         testController =
                 new TestingFlinkSessionJobController(configManager, kubernetesClient, flinkService);
-
-        kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
+        sessionJob = TestUtils.buildSessionJob();
+        kubernetesClient.resource(sessionJob).createOrReplace();
     }
 
     @Test
     public void testSubmitJobButException() {
-        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
-
         flinkService.setDeployFailure(true);
 
         try {
-            testController.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+            testController.reconcile(sessionJob, context);
         } catch (Exception e) {
             // Ignore
         }
@@ -64,9 +87,416 @@ class FlinkSessionJobControllerTest {
         Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
         Assertions.assertEquals("SessionJobException", event.getReason());
 
-        testController.cleanup(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+        testController.cleanup(sessionJob, context);
+    }
+
+    @Test
+    public void verifyBasicReconcileLoop() throws Exception {
+        UpdateControl<FlinkSessionJob> updateControl;
+
+        assertEquals(
+                ReconciliationState.UPGRADING,
+                sessionJob.getStatus().getReconciliationStatus().getState());
+        assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+        updateControl = testController.reconcile(sessionJob, context);
+
+        // Reconciling
+        assertEquals(
+                JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(3, testController.getInternalStatusUpdateCount());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        // Validate reconciliation status
+        FlinkSessionJobReconciliationStatus reconciliationStatus =
+                sessionJob.getStatus().getReconciliationStatus();
+        assertNull(sessionJob.getStatus().getError());
+        assertEquals(sessionJob.getSpec(), reconciliationStatus.deserializeLastReconciledSpec());
+        assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+        // Running
+        updateControl = testController.reconcile(sessionJob, context);
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        // Stable loop
+        updateControl = testController.reconcile(sessionJob, context);
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        // Validate job status
+        var jobStatus = sessionJob.getStatus().getJobStatus();
+        JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+        assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
+        assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+        assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+        assertEquals(
+                sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+                sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+        // Send in invalid update
+        sessionJob.getSpec().getJob().setParallelism(-1);
+        updateControl = testController.reconcile(sessionJob, context);
+
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
+        assertFalse(updateControl.isUpdateStatus());
+
+        reconciliationStatus = sessionJob.getStatus().getReconciliationStatus();
+        assertTrue(
+                sessionJob
+                        .getStatus()
+                        .getError()
+                        .contains("Job parallelism must be larger than 0"));
+        assertNotNull(reconciliationStatus.deserializeLastReconciledSpec().getJob());
+
+        // Validate job status correct even with error
+        jobStatus = sessionJob.getStatus().getJobStatus();
+        expectedJobStatus = flinkService.listJobs().get(0).f1;
+        assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
+        assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+        assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+
+        // Validate last stable spec is still the old one
+        assertEquals(
+                sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+                sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+    }
+
+    @Test
+    public void verifyUpgradeFromSavepoint() throws Exception {
+        UpdateControl<FlinkDeployment> updateControl;
+
+        sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        sessionJob.getSpec().getJob().setInitialSavepointPath("s0");
+        testController.reconcile(sessionJob, context);
+        var jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertEquals("s0", jobs.get(0).f0);
+
+        var previousJobs = new ArrayList<>(jobs);
+        sessionJob.getSpec().getJob().setInitialSavepointPath("s1");
+
+        // Send in a no-op change
+        testController.reconcile(sessionJob, context);
+        assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
+
+        // Upgrade job
+        assertTrue(
+                sessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getSavepointHistory()
+                        .isEmpty());
+
+        sessionJob.getSpec().getJob().setParallelism(100);
+        updateControl = testController.reconcile(sessionJob, context);
+
+        assertEquals(0L, updateControl.getScheduleDelay().get());
+        assertEquals(
+                JobState.SUSPENDED,
+                sessionJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+        assertEquals(
+                1,
+                sessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getSavepointHistory()
+                        .size());
+
+        flinkService.clearJobsInTerminalState();
+
+        testController.reconcile(sessionJob, context);
+        jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertEquals("savepoint_0", jobs.get(0).f0);
+        testController.reconcile(sessionJob, context);
+        assertEquals(
+                1,
+                sessionJob
+                        .getStatus()
+                        .getJobStatus()
+                        .getSavepointInfo()
+                        .getSavepointHistory()
+                        .size());
+
+        // Suspend job
+        sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
+        testController.reconcile(sessionJob, context);
+        flinkService.clearJobsInTerminalState();
+
+        // Resume from last savepoint
+        sessionJob.getSpec().getJob().setState(JobState.RUNNING);
+        testController.reconcile(sessionJob, context);
+        jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertEquals("savepoint_1", jobs.get(0).f0);
+
+        testController.reconcile(sessionJob, context);
+        testController.cleanup(sessionJob, context);
+
+        flinkService.clearJobsInTerminalState();
+        jobs = flinkService.listJobs();
+        assertEquals(0, jobs.size());
+    }
+
+    @Test
+    public void verifyStatelessUpgrade() throws Exception {
+        UpdateControl<FlinkDeployment> updateControl;
+
+        sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        sessionJob.getSpec().getJob().setInitialSavepointPath("s0");
+
+        testController.reconcile(sessionJob, context);
+        var jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertEquals("s0", jobs.get(0).f0);
+
+        testController.reconcile(sessionJob, context);
+
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
 
+        // Upgrade job
+        sessionJob.getSpec().getJob().setParallelism(100);
+        updateControl = testController.reconcile(sessionJob, context);
+
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+        assertEquals(0, updateControl.getScheduleDelay().get());
+        assertEquals(
+                JobState.SUSPENDED,
+                sessionJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+
+        flinkService.clearJobsInTerminalState();
+
+        updateControl = testController.reconcile(sessionJob, context);
+
+        assertEquals(
+                Optional.of(
+                        configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        testController.reconcile(sessionJob, context);
+        jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertNull(jobs.get(0).f0);
+
+        assertEquals(1, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+        // Suspend job
+        sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
+        testController.reconcile(sessionJob, context);
+
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+        // Resume from empty state
+        sessionJob.getSpec().getJob().setState(JobState.RUNNING);
+        testController.reconcile(sessionJob, context);
+        flinkService.clearJobsInTerminalState();
+        testController.reconcile(sessionJob, context);
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+        jobs = flinkService.listJobs();
+        assertEquals(1, jobs.size());
+        assertNull(jobs.get(0).f0);
+
+        // Inject validation error in the middle of the upgrade
+        sessionJob.getSpec().setRestartNonce(123L);
+        testController.reconcile(sessionJob, context);
+        assertEquals(2, testController.events().size());
+        assertEquals(
+                EventRecorder.Reason.SpecChanged,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                EventRecorder.Reason.Suspended,
+                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+        assertEquals(
+                JobState.SUSPENDED,
+                sessionJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+
+        sessionJob.getSpec().getJob().setParallelism(-1);
+        testController.reconcile(sessionJob, context);
+        flinkService.clearJobsInTerminalState();
+        assertEquals(1, testController.events().size());
+        testController.reconcile(sessionJob, context);
+        var statusEvents =
+                testController.events().stream()
+                        .filter(e -> !e.getReason().equals(ValidationError.name()))
+                        .collect(Collectors.toList());
+        assertEquals(1, statusEvents.size());
+        assertEquals(
+                EventRecorder.Reason.JobStatusChanged,
+                EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
+
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(
+                JobState.RUNNING,
+                sessionJob
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState());
+    }
+
+    @Test
+    public void verifyReconcileWithBadConfig() throws Exception {
+        UpdateControl<FlinkDeployment> updateControl;
+        // Override headers, and it should be saved in lastReconciledSpec once a successful
+        // reconcile() finishes.
+        sessionJob
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
+        updateControl = testController.reconcile(sessionJob, context);
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+        // Check when the bad config is applied, observe() will change the cluster state correctly
+        sessionJob.getSpec().getJob().setParallelism(-1);
+        // Next reconcile will set error msg and observe with previous validated config
+        updateControl = testController.reconcile(sessionJob, context);
+        assertTrue(
+                sessionJob
+                        .getStatus()
+                        .getError()
+                        .contains("Job parallelism must be larger than 0"));
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+        // Make sure we do validation before getting effective config in reconcile().
+        // Verify the saved headers in lastReconciledSpec is actually used in observe() by
+        // utilizing listJobConsumer
+        sessionJob
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "again");
+        flinkService.setListJobConsumer(
+                (configuration) ->
+                        assertEquals(
+                                "changed",
+                                configuration.get(
+                                        KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
+        testController.reconcile(sessionJob, context);
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+    }
+
+    @Test
+    public void testSuccessfulObservationShouldClearErrors() throws Exception {
+        sessionJob.getSpec().getJob().setParallelism(-1);
+
+        testController.reconcile(sessionJob, context);
+
+        assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+        // Failed Job deployment should set errors to the status
+        assertTrue(
+                sessionJob
+                        .getStatus()
+                        .getError()
+                        .contains("Job parallelism must be larger than 0"));
+        assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+        // Job deployment becomes ready and successful observation should clear the errors
+        sessionJob.getSpec().getJob().setParallelism(1);
+        testController.reconcile(sessionJob, context);
+        assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+        testController.reconcile(sessionJob, context);
+        assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        assertNull(sessionJob.getStatus().getError());
+
+        assertEquals(
+                sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+                sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+    }
+
+    @Test
+    public void testValidationError() throws Exception {
+        UpdateControl<FlinkDeployment> updateControl;
+
+        sessionJob.getSpec().getJob().setParallelism(-1);
+        updateControl = testController.reconcile(sessionJob, context);
+
+        assertEquals(1, testController.events().size());
+        assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+        var event = testController.events().remove();
+        assertEquals("Warning", event.getType());
+        assertEquals("ValidationError", event.getReason());
+        assertTrue(event.getMessage().startsWith("Job parallelism "));
+
+        // Failed spec should not be rescheduled
+        assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+    }
+
+    @Test
+    public void testInitialSavepointOnError() throws Exception {
+        sessionJob.getSpec().getJob().setInitialSavepointPath("msp");
+        flinkService.setDeployFailure(true);
+        try {
+            testController.reconcile(sessionJob, context);
+            fail();
+        } catch (Exception expected) {
+        }
         flinkService.setDeployFailure(false);
-        flinkService.clear();
+        testController.reconcile(sessionJob, context);
+        assertEquals("msp", flinkService.listJobs().get(0).f0);
     }
 }