You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/27 11:57:04 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29392] Trigger error when session job is lost without HA
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new c4445bcc [FLINK-29392] Trigger error when session job is lost without HA
c4445bcc is described below
commit c4445bcc7d00f774489007a3e961dfa0dae3e38d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Sep 27 13:57:00 2022 +0200
[FLINK-29392] Trigger error when session job is lost without HA
---
.../operator/observer/JobStatusObserver.java | 51 +++++++++++++++----
.../deployment/AbstractDeploymentObserver.java | 17 +++++--
.../observer/deployment/ApplicationObserver.java | 34 ++++++++++++-
.../observer/deployment/SessionObserver.java | 1 +
.../observer/sessionjob/SessionJobObserver.java | 59 ++++++++++++++++++++--
.../TestingFlinkDeploymentController.java | 12 +----
.../sessionjob/SessionJobObserverTest.java | 34 ++++++++++++-
.../kubernetes/operator/utils/EventCollector.java | 38 ++++++++++++++
8 files changed, 215 insertions(+), 31 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 51cf72ad..816b82af 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -34,10 +34,13 @@ import java.util.Optional;
import java.util.concurrent.TimeoutException;
/** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>, CTX> {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+
private static final int MAX_ERROR_STRING_LENGTH = 512;
+ public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";
+
private final FlinkService flinkService;
private final EventRecorder eventRecorder;
@@ -54,15 +57,17 @@ public abstract class JobStatusObserver<CTX> {
* @param ctx Observe context.
* @return If job found return true, otherwise return false.
*/
- public boolean observe(
- AbstractFlinkResource<?, ?> resource, Configuration deployedConfig, CTX ctx) {
+ public boolean observe(R resource, Configuration deployedConfig, CTX ctx) {
var jobStatus = resource.getStatus().getJobStatus();
LOG.info("Observing job status");
var previousJobStatus = jobStatus.getState();
+
List<JobStatusMessage> clusterJobStatuses;
try {
+ // Query job list from the cluster
clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
} catch (Exception e) {
+ // Error while accessing the REST API; will try again later...
LOG.error("Exception while listing jobs", e);
ifRunningMoveToReconciling(jobStatus, previousJobStatus);
if (e instanceof TimeoutException) {
@@ -72,10 +77,14 @@ public abstract class JobStatusObserver<CTX> {
}
if (!clusterJobStatuses.isEmpty()) {
+ // There are jobs on the cluster, we filter the ones for this resource
Optional<JobStatusMessage> targetJobStatusMessage =
filterTargetJob(jobStatus, clusterJobStatuses);
+
if (targetJobStatusMessage.isEmpty()) {
- jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ LOG.warn("No matching jobs found on the cluster");
+ ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+ onTargetJobNotFound(resource, deployedConfig);
return false;
} else {
updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
@@ -83,11 +92,37 @@ public abstract class JobStatusObserver<CTX> {
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
return true;
} else {
+ LOG.debug("No jobs found on the cluster");
+ // No jobs found on the cluster; it is possible that the JobManager is still starting up
ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+ onNoJobsFound(resource, deployedConfig);
return false;
}
}
+ /**
+ * Callback when no matching target job was found on a cluster where jobs were found.
+ *
+ * @param resource The Flink resource.
+ * @param config Deployed/observe configuration.
+ */
+ protected abstract void onTargetJobNotFound(R resource, Configuration config);
+
+ /**
+ * Callback when no jobs were found on the cluster.
+ *
+ * @param resource The Flink resource.
+ * @param config Deployed/observe configuration.
+ */
+ protected void onNoJobsFound(R resource, Configuration config) {}
+
+ /**
+ * If we previously observed the job in RUNNING state, we move it to RECONCILING, as we can
+ * no longer be sure it is still running.
+ *
+ * @param jobStatus JobStatus object.
+ * @param previousJobStatus Last observed job state.
+ */
private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobStatus) {
if (org.apache.flink.api.common.JobStatus.RUNNING.name().equals(previousJobStatus)) {
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
@@ -120,9 +155,7 @@ public abstract class JobStatusObserver<CTX> {
* @param deployedConfig Deployed job config.
*/
private void updateJobStatus(
- AbstractFlinkResource<?, ?> resource,
- JobStatusMessage clusterJobStatus,
- Configuration deployedConfig) {
+ R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
var jobStatus = resource.getStatus().getJobStatus();
var previousJobStatus = jobStatus.getState();
@@ -154,9 +187,7 @@ public abstract class JobStatusObserver<CTX> {
}
private void setErrorIfPresent(
- AbstractFlinkResource<?, ?> resource,
- JobStatusMessage clusterJobStatus,
- Configuration deployedConfig) {
+ R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
try {
var result =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index e828a8ab..744ecd64 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -75,26 +75,35 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
var status = flinkApp.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
- // Nothing has been launched so skip observing
- if (reconciliationStatus.isBeforeFirstDeployment()
- || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+ if (reconciliationStatus.isBeforeFirstDeployment()) {
+ logger.debug("Skipping observe before first deployment");
return;
}
+ if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+ logger.debug("Skipping observe during rollback operation");
+ return;
+ }
+
+ // We are in the middle of, or possibly right after, an upgrade
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ // We must check if the upgrade went through without the status update for some reason
checkIfAlreadyUpgraded(flinkApp, context);
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp);
+ logger.debug("Skipping observe before resource is deployed during upgrade");
return;
}
}
Configuration observeConfig = configManager.getObserveConfig(flinkApp);
if (!isJmDeploymentReady(flinkApp)) {
+ // Only observe the JM if we think it's in a bad state
observeJmDeployment(flinkApp, context, observeConfig);
}
if (isJmDeploymentReady(flinkApp)) {
+ // Only observe session/application if JM is ready
observeFlinkCluster(flinkApp, context, observeConfig);
}
@@ -126,7 +135,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
deploymentStatus.getJobManagerDeploymentStatus();
if (isSuspendedJob(flinkApp)) {
- logger.debug("Skipping observe step for suspended application deployments.");
+ logger.debug("Skipping observe step for suspended application deployments");
return;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index dd658b83..a2eac7e3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.ApplicationObserverContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -38,7 +39,7 @@ import java.util.Optional;
public class ApplicationObserver extends AbstractDeploymentObserver {
private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus> savepointObserver;
- private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
+ private final JobStatusObserver<FlinkDeployment, ApplicationObserverContext> jobStatusObserver;
public ApplicationObserver(
FlinkService flinkService,
@@ -62,6 +63,36 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
}
return Optional.empty();
}
+
+ @Override
+ protected void onTargetJobNotFound(
+ FlinkDeployment resource, Configuration config) {
+ // This should never happen for application clusters; something is
+ // wrong with the deployment
+ setUnknownJobError(resource);
+ }
+
+ /**
+ * We found a job on an application cluster that doesn't match the expected job.
+ * Trigger error.
+ *
+ * @param deployment Application deployment.
+ */
+ private void setUnknownJobError(FlinkDeployment deployment) {
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ String err = "Unrecognized Job for Application deployment";
+ logger.error(err);
+ ReconciliationUtils.updateForReconciliationError(deployment, err);
+ eventRecorder.triggerEvent(
+ deployment,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.Missing,
+ EventRecorder.Component.Job,
+ err);
+ }
};
}
@@ -69,6 +100,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
protected void observeFlinkCluster(
FlinkDeployment flinkApp, Context<?> context, Configuration deployedConfig) {
+ logger.debug("Observing application cluster");
boolean jobFound =
jobStatusObserver.observe(
flinkApp,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index f8f0010a..74054ea7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -43,6 +43,7 @@ public class SessionObserver extends AbstractDeploymentObserver {
FlinkDeployment deployment, Context<?> context, Configuration deployedConfig) {
// Check if session cluster can serve rest calls following our practice in JobObserver
try {
+ logger.debug("Observing session cluster");
flinkService.listJobs(deployedConfig);
var rs = deployment.getStatus().getReconciliationStatus();
if (rs.getState() == ReconciliationState.DEPLOYED) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 760bd327..17580dc0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -36,6 +36,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -65,7 +66,8 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
this.eventRecorder = eventRecorder;
}
- private JobStatusObserver<VoidObserverContext> getJobStatusObserver(FlinkService flinkService) {
+ private JobStatusObserver<FlinkSessionJob, VoidObserverContext> getJobStatusObserver(
+ FlinkService flinkService) {
return new JobStatusObserver<>(flinkService, eventRecorder) {
@Override
protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
@@ -87,18 +89,64 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
status.getJobId(), matchedList.size()));
if (matchedList.size() == 0) {
- LOG.info("No job found for JobID: {}", jobId);
+ LOG.warn("No job found for JobID: {}", jobId);
return Optional.empty();
} else {
return Optional.of(matchedList.get(0));
}
}
+
+ @Override
+ protected void onTargetJobNotFound(FlinkSessionJob resource, Configuration config) {
+ ifHaDisabledMarkSessionJobMissing(resource, config);
+ }
+
+ @Override
+ protected void onNoJobsFound(FlinkSessionJob resource, Configuration config) {
+ ifHaDisabledMarkSessionJobMissing(resource, config);
+ }
+
+ /**
+ * When HA is disabled, the session job will not recover on JM restarts. If the JM goes
+ * down or is restarted, the session job should be marked as missing.
+ *
+ * @param sessionJob Flink session job.
+ * @param conf Flink config.
+ */
+ private void ifHaDisabledMarkSessionJobMissing(
+ FlinkSessionJob sessionJob, Configuration conf) {
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(conf)) {
+ return;
+ }
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ LOG.error(MISSING_SESSION_JOB_ERR);
+ ReconciliationUtils.updateForReconciliationError(
+ sessionJob, MISSING_SESSION_JOB_ERR);
+ eventRecorder.triggerEvent(
+ sessionJob,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.Missing,
+ EventRecorder.Component.Job,
+ MISSING_SESSION_JOB_ERR);
+ }
};
}
@Override
public void observe(FlinkSessionJob flinkSessionJob, Context<?> context) {
- if (flinkSessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
+ var status = flinkSessionJob.getStatus();
+ var reconciliationStatus = status.getReconciliationStatus();
+
+ if (reconciliationStatus.isBeforeFirstDeployment()) {
+ LOG.debug("Skipping observe before first deployment");
+ return;
+ }
+
+ if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+ LOG.debug("Skipping observe during rollback operation");
return;
}
@@ -112,10 +160,13 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
var jobStatusObserver = getJobStatusObserver(flinkService);
var deployedConfig =
configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
- var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
+
+ // We are in the middle of, or possibly right after, an upgrade
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ // We must check if the upgrade went through without the status update for some reason
checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig, flinkService);
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+ LOG.debug("Skipping observe before resource is deployed during upgrade");
ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob);
return;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 7a364979..14ba9671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
@@ -28,6 +27,7 @@ import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -46,7 +46,6 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.junit.jupiter.api.Assertions;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.function.BiConsumer;
@@ -123,15 +122,6 @@ public class TestingFlinkDeploymentController
throw new UnsupportedOperationException();
}
- private static class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
- private Queue<Event> events = new LinkedList<>();
-
- @Override
- public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) {
- events.add(event);
- }
- }
-
public Queue<Event> events() {
return eventCollector.events;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 092fe899..17075ed6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -30,9 +31,11 @@ import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -40,6 +43,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,10 +63,12 @@ public class SessionJobObserverTest {
private SessionJobObserver observer;
private SessionJobReconciler reconciler;
+ private EventCollector eventCollector = new EventCollector();
+
@BeforeEach
public void before() {
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
- var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ var eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
var statusRecorder = new TestingStatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>();
flinkService = new TestingFlinkService();
FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
@@ -137,6 +143,32 @@ public class SessionJobObserverTest {
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+ // test error behaviour if job not present
+ flinkService.clear();
+
+ eventCollector.events.clear();
+
+ // With HA enabled no error should be triggered
+ observer.observe(sessionJob2, readyContext);
+ Assertions.assertEquals(
+ JobStatus.RECONCILING.name(), sessionJob2.getStatus().getJobStatus().getState());
+ Assertions.assertTrue(StringUtils.isEmpty(sessionJob2.getStatus().getError()));
+ Assertions.assertTrue(eventCollector.events.isEmpty());
+
+ // With HA disabled we expect an error status and event
+ sessionJob2
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(HighAvailabilityOptions.HA_MODE.key(), "NONE");
+ observer.observe(sessionJob2, readyContext);
+ Assertions.assertEquals(
+ JobStatus.RECONCILING.name(), sessionJob2.getStatus().getJobStatus().getState());
+ Assertions.assertEquals(
+ JobStatusObserver.MISSING_SESSION_JOB_ERR, sessionJob2.getStatus().getError());
+ Assertions.assertEquals(
+ JobStatusObserver.MISSING_SESSION_JOB_ERR,
+ eventCollector.events.peek().getMessage());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
new file mode 100644
index 00000000..b7db8d65
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+
+import io.fabric8.kubernetes.api.model.Event;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+
+/** Simple consumer that collects triggered events for tests. */
+public class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
+
+ public final Queue<Event> events = new LinkedList<>();
+
+ @Override
+ public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) {
+ events.add(event);
+ }
+}