You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/09/27 11:57:04 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29392] Trigger error when session job is lost without HA

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new c4445bcc [FLINK-29392] Trigger error when session job is lost without HA
c4445bcc is described below

commit c4445bcc7d00f774489007a3e961dfa0dae3e38d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Sep 27 13:57:00 2022 +0200

    [FLINK-29392] Trigger error when session job is lost without HA
---
 .../operator/observer/JobStatusObserver.java       | 51 +++++++++++++++----
 .../deployment/AbstractDeploymentObserver.java     | 17 +++++--
 .../observer/deployment/ApplicationObserver.java   | 34 ++++++++++++-
 .../observer/deployment/SessionObserver.java       |  1 +
 .../observer/sessionjob/SessionJobObserver.java    | 59 ++++++++++++++++++++--
 .../TestingFlinkDeploymentController.java          | 12 +----
 .../sessionjob/SessionJobObserverTest.java         | 34 ++++++++++++-
 .../kubernetes/operator/utils/EventCollector.java  | 38 ++++++++++++++
 8 files changed, 215 insertions(+), 31 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 51cf72ad..816b82af 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -34,10 +34,13 @@ import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
 /** An observer to observe the job status. */
-public abstract class JobStatusObserver<CTX> {
+public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>, CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+
     private static final int MAX_ERROR_STRING_LENGTH = 512;
+    public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";
+
     private final FlinkService flinkService;
     private final EventRecorder eventRecorder;
 
@@ -54,15 +57,17 @@ public abstract class JobStatusObserver<CTX> {
      * @param ctx Observe context.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(
-            AbstractFlinkResource<?, ?> resource, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(R resource, Configuration deployedConfig, CTX ctx) {
         var jobStatus = resource.getStatus().getJobStatus();
         LOG.info("Observing job status");
         var previousJobStatus = jobStatus.getState();
+
         List<JobStatusMessage> clusterJobStatuses;
         try {
+            // Query job list from the cluster
             clusterJobStatuses = new ArrayList<>(flinkService.listJobs(deployedConfig));
         } catch (Exception e) {
+            // Error while accessing the rest api, will try again later...
             LOG.error("Exception while listing jobs", e);
             ifRunningMoveToReconciling(jobStatus, previousJobStatus);
             if (e instanceof TimeoutException) {
@@ -72,10 +77,14 @@ public abstract class JobStatusObserver<CTX> {
         }
 
         if (!clusterJobStatuses.isEmpty()) {
+            // There are jobs on the cluster, we filter the ones for this resource
             Optional<JobStatusMessage> targetJobStatusMessage =
                     filterTargetJob(jobStatus, clusterJobStatuses);
+
             if (targetJobStatusMessage.isEmpty()) {
-                jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                LOG.warn("No matching jobs found on the cluster");
+                ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+                onTargetJobNotFound(resource, deployedConfig);
                 return false;
             } else {
                 updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
@@ -83,11 +92,37 @@ public abstract class JobStatusObserver<CTX> {
             ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
             return true;
         } else {
+            LOG.debug("No jobs found on the cluster");
+            // No jobs found on the cluster, it is possible that the jobmanager is still starting up
             ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+            onNoJobsFound(resource, deployedConfig);
             return false;
         }
     }
 
+    /**
+     * Invoked when the cluster lists jobs but none of them matches the given resource.
+     *
+     * @param resource The Flink resource whose job could not be found.
+     * @param config Deployed/observe configuration used to query the cluster.
+     */
+    protected abstract void onTargetJobNotFound(R resource, Configuration config);
+
+    /**
+     * Invoked when the cluster lists no jobs at all. The default implementation does nothing.
+     *
+     * @param resource The Flink resource being observed.
+     * @param config Deployed/observe configuration used to query the cluster.
+     */
+    protected void onNoJobsFound(R resource, Configuration config) {}
+
+    /**
+     * If we observed the job previously in RUNNING state we move to RECONCILING instead as we are
+     * not sure anymore.
+     *
+     * @param jobStatus JobStatus object.
+     * @param previousJobStatus Last observed job state.
+     */
     private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobStatus) {
         if (org.apache.flink.api.common.JobStatus.RUNNING.name().equals(previousJobStatus)) {
             jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
@@ -120,9 +155,7 @@ public abstract class JobStatusObserver<CTX> {
      * @param deployedConfig Deployed job config.
      */
     private void updateJobStatus(
-            AbstractFlinkResource<?, ?> resource,
-            JobStatusMessage clusterJobStatus,
-            Configuration deployedConfig) {
+            R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
         var jobStatus = resource.getStatus().getJobStatus();
         var previousJobStatus = jobStatus.getState();
 
@@ -154,9 +187,7 @@ public abstract class JobStatusObserver<CTX> {
     }
 
     private void setErrorIfPresent(
-            AbstractFlinkResource<?, ?> resource,
-            JobStatusMessage clusterJobStatus,
-            Configuration deployedConfig) {
+            R resource, JobStatusMessage clusterJobStatus, Configuration deployedConfig) {
         if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
             try {
                 var result =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index e828a8ab..744ecd64 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -75,26 +75,35 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         var status = flinkApp.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
 
-        // Nothing has been launched so skip observing
-        if (reconciliationStatus.isBeforeFirstDeployment()
-                || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
+            logger.debug("Skipping observe before first deployment");
             return;
         }
 
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+            logger.debug("Skipping observe during rollback operation");
+            return;
+        }
+
+        // We are in the middle or possibly right after an upgrade
         if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+            // We must check if the upgrade went through without the status upgrade for some reason
             checkIfAlreadyUpgraded(flinkApp, context);
             if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
                 ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp);
+                logger.debug("Skipping observe before resource is deployed during upgrade");
                 return;
             }
         }
 
         Configuration observeConfig = configManager.getObserveConfig(flinkApp);
         if (!isJmDeploymentReady(flinkApp)) {
+            // Only observe the JM if we think it's in bad state
             observeJmDeployment(flinkApp, context, observeConfig);
         }
 
         if (isJmDeploymentReady(flinkApp)) {
+            // Only observe session/application if JM is ready
             observeFlinkCluster(flinkApp, context, observeConfig);
         }
 
@@ -126,7 +135,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
                 deploymentStatus.getJobManagerDeploymentStatus();
 
         if (isSuspendedJob(flinkApp)) {
-            logger.debug("Skipping observe step for suspended application deployments.");
+            logger.debug("Skipping observe step for suspended application deployments");
             return;
         }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index dd658b83..a2eac7e3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
 import org.apache.flink.kubernetes.operator.observer.context.ApplicationObserverContext;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -38,7 +39,7 @@ import java.util.Optional;
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
     private final SavepointObserver<FlinkDeployment, FlinkDeploymentStatus> savepointObserver;
-    private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
+    private final JobStatusObserver<FlinkDeployment, ApplicationObserverContext> jobStatusObserver;
 
     public ApplicationObserver(
             FlinkService flinkService,
@@ -62,6 +63,36 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
                         }
                         return Optional.empty();
                     }
+
+                    @Override
+                    protected void onTargetJobNotFound(
+                            FlinkDeployment resource, Configuration config) {
+                        // This should never happen for application clusters, there is something
+                        // wrong
+                        setUnknownJobError(resource);
+                    }
+
+                    /**
+                     * We found a job on an application cluster that doesn't match the expected job.
+                     * Trigger error.
+                     *
+                     * @param deployment Application deployment.
+                     */
+                    private void setUnknownJobError(FlinkDeployment deployment) {
+                        deployment
+                                .getStatus()
+                                .getJobStatus()
+                                .setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                        String err = "Unrecognized Job for Application deployment";
+                        logger.error(err);
+                        ReconciliationUtils.updateForReconciliationError(deployment, err);
+                        eventRecorder.triggerEvent(
+                                deployment,
+                                EventRecorder.Type.Warning,
+                                EventRecorder.Reason.Missing,
+                                EventRecorder.Component.Job,
+                                err);
+                    }
                 };
     }
 
@@ -69,6 +100,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
     protected void observeFlinkCluster(
             FlinkDeployment flinkApp, Context<?> context, Configuration deployedConfig) {
 
+        logger.debug("Observing application cluster");
         boolean jobFound =
                 jobStatusObserver.observe(
                         flinkApp,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index f8f0010a..74054ea7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -43,6 +43,7 @@ public class SessionObserver extends AbstractDeploymentObserver {
             FlinkDeployment deployment, Context<?> context, Configuration deployedConfig) {
         // Check if session cluster can serve rest calls following our practice in JobObserver
         try {
+            logger.debug("Observing session cluster");
             flinkService.listJobs(deployedConfig);
             var rs = deployment.getStatus().getReconciliationStatus();
             if (rs.getState() == ReconciliationState.DEPLOYED) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 760bd327..17580dc0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -36,6 +36,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.Preconditions;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -65,7 +66,8 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         this.eventRecorder = eventRecorder;
     }
 
-    private JobStatusObserver<VoidObserverContext> getJobStatusObserver(FlinkService flinkService) {
+    private JobStatusObserver<FlinkSessionJob, VoidObserverContext> getJobStatusObserver(
+            FlinkService flinkService) {
         return new JobStatusObserver<>(flinkService, eventRecorder) {
             @Override
             protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
@@ -87,18 +89,64 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
                                 status.getJobId(), matchedList.size()));
 
                 if (matchedList.size() == 0) {
-                    LOG.info("No job found for JobID: {}", jobId);
+                    LOG.warn("No job found for JobID: {}", jobId);
                     return Optional.empty();
                 } else {
                     return Optional.of(matchedList.get(0));
                 }
             }
+
+            @Override
+            protected void onTargetJobNotFound(FlinkSessionJob sessionJob, Configuration conf) {
+                ifHaDisabledMarkSessionJobMissing(sessionJob, conf);
+            }
+
+            @Override
+            protected void onNoJobsFound(FlinkSessionJob sessionJob, Configuration conf) {
+                ifHaDisabledMarkSessionJobMissing(sessionJob, conf);
+            }
+
+            /**
+             * Without HA a session job does not survive a JobManager restart, so a job that can
+             * no longer be found on the cluster is marked missing. No-op when HA is activated.
+             *
+             * @param sessionJob Flink session job.
+             * @param conf Flink config used to check whether HA is activated.
+             */
+            private void ifHaDisabledMarkSessionJobMissing(
+                    FlinkSessionJob sessionJob, Configuration conf) {
+                // With HA activated the job may still be recovered, so leave it untouched.
+                boolean haEnabled = HighAvailabilityMode.isHighAvailabilityModeActivated(conf);
+                if (!haEnabled) {
+                    var jobStatus = sessionJob.getStatus().getJobStatus();
+                    jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                    // Report the missing job via the log, the resource status and an event.
+                    LOG.error(MISSING_SESSION_JOB_ERR);
+                    ReconciliationUtils.updateForReconciliationError(
+                            sessionJob, MISSING_SESSION_JOB_ERR);
+                    eventRecorder.triggerEvent(
+                            sessionJob,
+                            EventRecorder.Type.Warning,
+                            EventRecorder.Reason.Missing,
+                            EventRecorder.Component.Job,
+                            MISSING_SESSION_JOB_ERR);
+                }
+            }
         };
     }
 
     @Override
     public void observe(FlinkSessionJob flinkSessionJob, Context<?> context) {
-        if (flinkSessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment()) {
+        var status = flinkSessionJob.getStatus();
+        var reconciliationStatus = status.getReconciliationStatus();
+
+        if (reconciliationStatus.isBeforeFirstDeployment()) {
+            LOG.debug("Skipping observe before first deployment");
+            return;
+        }
+
+        if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
+            LOG.debug("Skipping observe during rollback operation");
             return;
         }
 
@@ -112,10 +160,13 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         var jobStatusObserver = getJobStatusObserver(flinkService);
         var deployedConfig =
                 configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
-        var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
+
+        // We are in the middle or possibly right after an upgrade
         if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+            // We must check if the upgrade went through without the status upgrade for some reason
             checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig, flinkService);
             if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+                LOG.debug("Skipping observe before resource is deployed during upgrade");
                 ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob);
                 return;
             }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 7a364979..14ba9671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.controller;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
@@ -28,6 +27,7 @@ import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -46,7 +46,6 @@ import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import org.junit.jupiter.api.Assertions;
 
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.function.BiConsumer;
@@ -123,15 +122,6 @@ public class TestingFlinkDeploymentController
         throw new UnsupportedOperationException();
     }
 
-    private static class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
-        private Queue<Event> events = new LinkedList<>();
-
-        @Override
-        public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) {
-            events.add(event);
-        }
-    }
-
     public Queue<Event> events() {
         return eventCollector.events;
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 092fe899..17075ed6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer.sessionjob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -30,9 +31,11 @@ import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -40,6 +43,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,10 +63,12 @@ public class SessionJobObserverTest {
     private SessionJobObserver observer;
     private SessionJobReconciler reconciler;
 
+    private EventCollector eventCollector = new EventCollector();
+
     @BeforeEach
     public void before() {
         kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
-        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        var eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
         var statusRecorder = new TestingStatusRecorder<FlinkSessionJob, FlinkSessionJobStatus>();
         flinkService = new TestingFlinkService();
         FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
@@ -137,6 +143,32 @@ public class SessionJobObserverTest {
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
                 JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+
+        // test error behaviour if job not present
+        flinkService.clear();
+
+        eventCollector.events.clear();
+
+        // With HA enabled no error should be triggered
+        observer.observe(sessionJob2, readyContext);
+        Assertions.assertEquals(
+                JobStatus.RECONCILING.name(), sessionJob2.getStatus().getJobStatus().getState());
+        Assertions.assertTrue(StringUtils.isEmpty(sessionJob2.getStatus().getError()));
+        Assertions.assertTrue(eventCollector.events.isEmpty());
+
+        // With HA disabled we expect an error status and event
+        sessionJob2
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(HighAvailabilityOptions.HA_MODE.key(), "NONE");
+        observer.observe(sessionJob2, readyContext);
+        Assertions.assertEquals(
+                JobStatus.RECONCILING.name(), sessionJob2.getStatus().getJobStatus().getState());
+        Assertions.assertEquals(
+                JobStatusObserver.MISSING_SESSION_JOB_ERR, sessionJob2.getStatus().getError());
+        Assertions.assertEquals(
+                JobStatusObserver.MISSING_SESSION_JOB_ERR,
+                eventCollector.events.peek().getMessage());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
new file mode 100644
index 00000000..b7db8d65
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+
+import io.fabric8.kubernetes.api.model.Event;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+
+/** Test helper recording every event it receives, in arrival order; the resource is ignored. */
+public class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> {
+
+    public final Queue<Event> events = new LinkedList<>();
+
+    @Override
+    public void accept(AbstractFlinkResource<?, ?> resource, Event triggeredEvent) {
+        this.events.offer(triggeredEvent);
+    }
+}