You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/12/19 14:13:20 UTC
[flink-kubernetes-operator] branch release-1.3 updated: [FLINK-28875] Add FlinkSessionJobControllerTest
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new 69f7e948 [FLINK-28875] Add FlinkSessionJobControllerTest
69f7e948 is described below
commit 69f7e94808cbded27f2238bf3039b9d693009f4a
Author: pvary <pe...@gmail.com>
AuthorDate: Mon Dec 19 15:13:15 2022 +0100
[FLINK-28875] Add FlinkSessionJobControllerTest
---
.../sessionjob/FlinkSessionJobObserver.java | 2 +-
.../sessionjob/SessionJobReconciler.java | 9 +-
.../kubernetes/operator/TestingFlinkService.java | 4 +
.../controller/FlinkDeploymentControllerTest.java | 45 ++-
.../controller/FlinkSessionJobControllerTest.java | 444 ++++++++++++++++++++-
5 files changed, 488 insertions(+), 16 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
index fe1874fd..e23458ff 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserver.java
@@ -184,7 +184,7 @@ public class FlinkSessionJobObserver
Preconditions.checkArgument(
matchedList.size() <= 1,
String.format(
- "Expected one job for JobID: %s, but %d founded",
+ "Expected one job for JobID: %s, but found %d",
status.getJobId(), matchedList.size()));
if (matchedList.size() == 0) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 4b9d6aad..ad5c5735 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -24,7 +24,6 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -107,12 +106,8 @@ public class SessionJobReconciler
var jobID =
flinkService.submitJobToSessionCluster(
cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
- status.setJobStatus(
- new JobStatus()
- .toBuilder()
- .jobId(jobID.toHexString())
- .state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
- .build());
+ status.getJobStatus().setJobId(jobID.toHexString());
+ status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
}
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index a78705df..a62bd732 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -134,6 +134,10 @@ public class TestingFlinkService extends AbstractFlinkService {
savepointCounter = 0;
}
+ public void clearJobsInTerminalState() {
+ jobs.removeIf(job -> job.f1.getJobState().isTerminalState());
+ }
+
public Set<String> getSessions() {
return sessions;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index beb80f31..4ba5bb44 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -761,7 +761,7 @@ public class FlinkDeploymentControllerTest {
}
@Test
- public void verifyReconcileWithAChangedOperatorMode() throws Exception {
+ public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception {
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
@@ -790,6 +790,12 @@ public class FlinkDeploymentControllerTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertTrue(
+ appCluster
+ .getStatus()
+ .getError()
+ .contains("Cannot switch from job to session cluster"));
+
assertNotNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
// Verify jobStatus is running
jobStatus = appCluster.getStatus().getJobStatus();
@@ -799,6 +805,43 @@ public class FlinkDeploymentControllerTest {
assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
}
+ @Test
+ public void verifyReconcileWithAChangedOperatorModeToApplication() throws Exception {
+
+ FlinkDeployment appCluster = TestUtils.buildSessionCluster();
+ UpdateControl<FlinkDeployment> updateControl;
+
+ updateControl = testController.reconcile(appCluster, context);
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ updateControl = testController.reconcile(appCluster, context);
+ JobStatus jobStatus = appCluster.getStatus().getJobStatus();
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ // jobStatus has not been set at this time
+ assertNull(jobStatus.getState());
+
+ // Switches operator mode to APPLICATION
+ appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob());
+ // Validation fails and JobObserver should still be used
+ updateControl = testController.reconcile(appCluster, context);
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.READY,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertTrue(
+ appCluster
+ .getStatus()
+ .getError()
+ .contains("Cannot switch from session to job cluster"));
+ assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
+ }
+
private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
flinkService.clear();
testController.reconcile(appCluster, context);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index f7f5c8ec..e5f91aea 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -17,44 +17,67 @@
package org.apache.flink.kubernetes.operator.controller;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus;
+import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
/** {@link FlinkSessionJobController} tests. */
@EnableKubernetesMockClient(crud = true)
class FlinkSessionJobControllerTest {
private KubernetesClient kubernetesClient;
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ private final Context context = TestUtils.createContextWithReadyFlinkDeployment();
+
private TestingFlinkService flinkService = new TestingFlinkService();
private TestingFlinkSessionJobController testController;
+ private FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
@BeforeEach
public void before() {
flinkService = new TestingFlinkService();
testController =
new TestingFlinkSessionJobController(configManager, kubernetesClient, flinkService);
-
- kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
+ sessionJob = TestUtils.buildSessionJob();
+ kubernetesClient.resource(sessionJob).createOrReplace();
}
@Test
public void testSubmitJobButException() {
- FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
-
flinkService.setDeployFailure(true);
try {
- testController.reconcile(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+ testController.reconcile(sessionJob, context);
} catch (Exception e) {
// Ignore
}
@@ -64,9 +87,416 @@ class FlinkSessionJobControllerTest {
Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType());
Assertions.assertEquals("SessionJobException", event.getReason());
- testController.cleanup(sessionJob, TestUtils.createContextWithReadyFlinkDeployment());
+ testController.cleanup(sessionJob, context);
+ }
+
+ @Test
+ public void verifyBasicReconcileLoop() throws Exception {
+ UpdateControl<FlinkSessionJob> updateControl;
+
+ assertEquals(
+ ReconciliationState.UPGRADING,
+ sessionJob.getStatus().getReconciliationStatus().getState());
+ assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+ updateControl = testController.reconcile(sessionJob, context);
+
+ // Reconciling
+ assertEquals(
+ JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(3, testController.getInternalStatusUpdateCount());
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ Optional.of(
+ configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ // Validate reconciliation status
+ FlinkSessionJobReconciliationStatus reconciliationStatus =
+ sessionJob.getStatus().getReconciliationStatus();
+ assertNull(sessionJob.getStatus().getError());
+ assertEquals(sessionJob.getSpec(), reconciliationStatus.deserializeLastReconciledSpec());
+ assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+ // Running
+ updateControl = testController.reconcile(sessionJob, context);
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(4, testController.getInternalStatusUpdateCount());
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ Optional.of(
+ configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ // Stable loop
+ updateControl = testController.reconcile(sessionJob, context);
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(4, testController.getInternalStatusUpdateCount());
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ Optional.of(
+ configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ // Validate job status
+ var jobStatus = sessionJob.getStatus().getJobStatus();
+ JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
+ assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
+ assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+ assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+ assertEquals(
+ sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+ sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+ // Send in invalid update
+ sessionJob.getSpec().getJob().setParallelism(-1);
+ updateControl = testController.reconcile(sessionJob, context);
+
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(5, testController.getInternalStatusUpdateCount());
+ assertFalse(updateControl.isUpdateStatus());
+
+ reconciliationStatus = sessionJob.getStatus().getReconciliationStatus();
+ assertTrue(
+ sessionJob
+ .getStatus()
+ .getError()
+ .contains("Job parallelism must be larger than 0"));
+ assertNotNull(reconciliationStatus.deserializeLastReconciledSpec().getJob());
+
+ // Validate job status correct even with error
+ jobStatus = sessionJob.getStatus().getJobStatus();
+ expectedJobStatus = flinkService.listJobs().get(0).f1;
+ assertEquals(expectedJobStatus.getJobId().toHexString(), jobStatus.getJobId());
+ assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
+ assertEquals(expectedJobStatus.getJobState().toString(), jobStatus.getState());
+
+ // Validate last stable spec is still the old one
+ assertEquals(
+ sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+ sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+ }
+
+ @Test
+ public void verifyUpgradeFromSavepoint() throws Exception {
+ UpdateControl<FlinkDeployment> updateControl;
+
+ sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ sessionJob.getSpec().getJob().setInitialSavepointPath("s0");
+ testController.reconcile(sessionJob, context);
+ var jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertEquals("s0", jobs.get(0).f0);
+
+ var previousJobs = new ArrayList<>(jobs);
+ sessionJob.getSpec().getJob().setInitialSavepointPath("s1");
+
+ // Send in a no-op change
+ testController.reconcile(sessionJob, context);
+ assertEquals(previousJobs, new ArrayList<>(flinkService.listJobs()));
+
+ // Upgrade job
+ assertTrue(
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getSavepointHistory()
+ .isEmpty());
+
+ sessionJob.getSpec().getJob().setParallelism(100);
+ updateControl = testController.reconcile(sessionJob, context);
+
+ assertEquals(0L, updateControl.getScheduleDelay().get());
+ assertEquals(
+ JobState.SUSPENDED,
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getState());
+ assertEquals(
+ 1,
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getSavepointHistory()
+ .size());
+
+ flinkService.clearJobsInTerminalState();
+
+ testController.reconcile(sessionJob, context);
+ jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertEquals("savepoint_0", jobs.get(0).f0);
+ testController.reconcile(sessionJob, context);
+ assertEquals(
+ 1,
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getSavepointHistory()
+ .size());
+
+ // Suspend job
+ sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
+ testController.reconcile(sessionJob, context);
+ flinkService.clearJobsInTerminalState();
+
+ // Resume from last savepoint
+ sessionJob.getSpec().getJob().setState(JobState.RUNNING);
+ testController.reconcile(sessionJob, context);
+ jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertEquals("savepoint_1", jobs.get(0).f0);
+
+ testController.reconcile(sessionJob, context);
+ testController.cleanup(sessionJob, context);
+
+ flinkService.clearJobsInTerminalState();
+ jobs = flinkService.listJobs();
+ assertEquals(0, jobs.size());
+ }
+
+ @Test
+ public void verifyStatelessUpgrade() throws Exception {
+ UpdateControl<FlinkDeployment> updateControl;
+
+ sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+ sessionJob.getSpec().getJob().setInitialSavepointPath("s0");
+
+ testController.reconcile(sessionJob, context);
+ var jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertEquals("s0", jobs.get(0).f0);
+
+ testController.reconcile(sessionJob, context);
+
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.JobStatusChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ // Upgrade job
+ sessionJob.getSpec().getJob().setParallelism(100);
+ updateControl = testController.reconcile(sessionJob, context);
+
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+ assertEquals(0, updateControl.getScheduleDelay().get());
+ assertEquals(
+ JobState.SUSPENDED,
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getState());
+
+ flinkService.clearJobsInTerminalState();
+
+ updateControl = testController.reconcile(sessionJob, context);
+
+ assertEquals(
+ Optional.of(
+ configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ testController.reconcile(sessionJob, context);
+ jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertNull(jobs.get(0).f0);
+
+ assertEquals(1, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.JobStatusChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+ // Suspend job
+ sessionJob.getSpec().getJob().setState(JobState.SUSPENDED);
+ testController.reconcile(sessionJob, context);
+
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+ // Resume from empty state
+ sessionJob.getSpec().getJob().setState(JobState.RUNNING);
+ testController.reconcile(sessionJob, context);
+ flinkService.clearJobsInTerminalState();
+ testController.reconcile(sessionJob, context);
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.JobStatusChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+
+ jobs = flinkService.listJobs();
+ assertEquals(1, jobs.size());
+ assertNull(jobs.get(0).f0);
+
+ // Inject validation error in the middle of the upgrade
+ sessionJob.getSpec().setRestartNonce(123L);
+ testController.reconcile(sessionJob, context);
+ assertEquals(2, testController.events().size());
+ assertEquals(
+ EventRecorder.Reason.SpecChanged,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ EventRecorder.Reason.Suspended,
+ EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
+ assertEquals(
+ JobState.SUSPENDED,
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getState());
+
+ sessionJob.getSpec().getJob().setParallelism(-1);
+ testController.reconcile(sessionJob, context);
+ flinkService.clearJobsInTerminalState();
+ assertEquals(1, testController.events().size());
+ testController.reconcile(sessionJob, context);
+ var statusEvents =
+ testController.events().stream()
+ .filter(e -> !e.getReason().equals(ValidationError.name()))
+ .collect(Collectors.toList());
+ assertEquals(1, statusEvents.size());
+ assertEquals(
+ EventRecorder.Reason.JobStatusChanged,
+ EventRecorder.Reason.valueOf(statusEvents.get(0).getReason()));
+
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(
+ JobState.RUNNING,
+ sessionJob
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getState());
+ }
+
+ @Test
+ public void verifyReconcileWithBadConfig() throws Exception {
+ UpdateControl<FlinkDeployment> updateControl;
+ // Override headers; they should be saved in lastReconciledSpec once a successful
+ // reconcile() finishes.
+ sessionJob
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
+ updateControl = testController.reconcile(sessionJob, context);
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+ // Check that observe() changes the cluster state correctly when the bad config is applied
+ sessionJob.getSpec().getJob().setParallelism(-1);
+ // Next reconcile will set error msg and observe with previous validated config
+ updateControl = testController.reconcile(sessionJob, context);
+ assertTrue(
+ sessionJob
+ .getStatus()
+ .getError()
+ .contains("Job parallelism must be larger than 0"));
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+ // Make sure we do validation before getting effective config in reconcile().
+ // Verify the saved headers in lastReconciledSpec are actually used in observe() by
+ // utilizing listJobConsumer
+ sessionJob
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "again");
+ flinkService.setListJobConsumer(
+ (configuration) ->
+ assertEquals(
+ "changed",
+ configuration.get(
+ KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
+ testController.reconcile(sessionJob, context);
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ }
+
+ @Test
+ public void testSuccessfulObservationShouldClearErrors() throws Exception {
+ sessionJob.getSpec().getJob().setParallelism(-1);
+
+ testController.reconcile(sessionJob, context);
+
+ assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+ // Failed Job deployment should set errors to the status
+ assertTrue(
+ sessionJob
+ .getStatus()
+ .getError()
+ .contains("Job parallelism must be larger than 0"));
+ assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+ // Job deployment becomes ready and successful observation should clear the errors
+ sessionJob.getSpec().getJob().setParallelism(1);
+ testController.reconcile(sessionJob, context);
+ assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+
+ testController.reconcile(sessionJob, context);
+ assertEquals(JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+ assertNull(sessionJob.getStatus().getError());
+
+ assertEquals(
+ sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
+ sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
+ }
+
+ @Test
+ public void testValidationError() throws Exception {
+ UpdateControl<FlinkDeployment> updateControl;
+
+ sessionJob.getSpec().getJob().setParallelism(-1);
+ updateControl = testController.reconcile(sessionJob, context);
+
+ assertEquals(1, testController.events().size());
+ assertNull(sessionJob.getStatus().getJobStatus().getState());
+
+ var event = testController.events().remove();
+ assertEquals("Warning", event.getType());
+ assertEquals("ValidationError", event.getReason());
+ assertTrue(event.getMessage().startsWith("Job parallelism "));
+
+ // Failed spec should not be rescheduled
+ assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+ }
+
+ @Test
+ public void testInitialSavepointOnError() throws Exception {
+ sessionJob.getSpec().getJob().setInitialSavepointPath("msp");
+ flinkService.setDeployFailure(true);
+ try {
+ testController.reconcile(sessionJob, context);
+ fail();
+ } catch (Exception expected) {
+ }
flinkService.setDeployFailure(false);
- flinkService.clear();
+ testController.reconcile(sessionJob, context);
+ assertEquals("msp", flinkService.listJobs().get(0).f0);
}
}