You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/09 10:51:49 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27497] Track terminal job states in the observer

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 18fdd7e  [FLINK-27497] Track terminal job states in the observer
18fdd7e is described below

commit 18fdd7e8cd870bac7763a79d0af99c64d4e181a7
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Tue Jun 7 16:36:36 2022 +0800

    [FLINK-27497] Track terminal job states in the observer
---
 .../operator/observer/JobStatusObserver.java       | 114 +++++++++++++++---
 .../observer/context/VoidObserverContext.java      |   2 +-
 .../deployment/AbstractDeploymentObserver.java     |   1 +
 .../observer/deployment/ApplicationObserver.java   |  24 ++--
 .../observer/sessionjob/SessionJobObserver.java    |  23 ++--
 .../kubernetes/operator/service/FlinkService.java  |  14 +++
 .../kubernetes/operator/utils/EventUtils.java      |   3 +-
 .../kubernetes/operator/TestingClusterClient.java  |  12 +-
 .../kubernetes/operator/TestingFlinkService.java   |  29 +++++
 .../controller/DeploymentRecoveryTest.java         |   2 +-
 .../controller/FlinkDeploymentControllerTest.java  |   2 +-
 .../operator/controller/RollbackTest.java          |   2 +-
 .../deployment/ApplicationObserverTest.java        | 134 +++++++++++++++------
 .../sessionjob/SessionJobObserverTest.java         |   4 +-
 14 files changed, 273 insertions(+), 93 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 30df728..afc9582 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -18,8 +18,10 @@
 package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import org.slf4j.Logger;
@@ -34,6 +36,7 @@ import java.util.concurrent.TimeoutException;
 public abstract class JobStatusObserver<CTX> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+    private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
 
     public JobStatusObserver(FlinkService flinkService) {
@@ -43,11 +46,13 @@ public abstract class JobStatusObserver<CTX> {
     /**
      * Observe the status of the flink job.
      *
-     * @param jobStatus The job status to be observed.
+     * @param resource The custom resource to be observed.
      * @param deployedConfig Deployed job config.
      * @return If job found return true, otherwise return false.
      */
-    public boolean observe(JobStatus jobStatus, Configuration deployedConfig, CTX ctx) {
+    public boolean observe(
+            AbstractFlinkResource<?, ?> resource, Configuration deployedConfig, CTX ctx) {
+        var jobStatus = resource.getStatus().getJobStatus();
         LOG.info("Observing job status");
         var previousJobStatus = jobStatus.getState();
         List<JobStatusMessage> clusterJobStatuses;
@@ -63,19 +68,13 @@ public abstract class JobStatusObserver<CTX> {
         }
 
         if (!clusterJobStatuses.isEmpty()) {
-            Optional<String> targetJobStatus = updateJobStatus(jobStatus, clusterJobStatuses);
-            if (targetJobStatus.isEmpty()) {
+            Optional<JobStatusMessage> targetJobStatusMessage =
+                    filterTargetJob(jobStatus, clusterJobStatuses);
+            if (targetJobStatusMessage.isEmpty()) {
                 jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
                 return false;
             } else {
-                if (targetJobStatus.get().equals(previousJobStatus)) {
-                    LOG.info("Job status ({}) unchanged", previousJobStatus);
-                } else {
-                    LOG.info(
-                            "Job status successfully updated from {} to {}",
-                            previousJobStatus,
-                            targetJobStatus.get());
-                }
+                updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
             }
             return true;
         } else {
@@ -94,14 +93,95 @@ public abstract class JobStatusObserver<CTX> {
     protected abstract void onTimeout(CTX ctx);
 
     /**
-     * Find and update previous job status based on the job list from the cluster and return the
-     * target status.
+     * Select the target job's status message from the job list reported by the cluster.
      *
-     * @param status the target job status to be updated.
+     * @param status the target job status.
      * @param clusterJobStatuses the candidate cluster jobs.
-     * @return The target status of the job. If no matched job found, {@code Optional.empty()} will
+     * @return The target job status message. If no matched job found, {@code Optional.empty()} will
      *     be returned.
      */
-    protected abstract Optional<String> updateJobStatus(
+    protected abstract Optional<JobStatusMessage> filterTargetJob(
             JobStatus status, List<JobStatusMessage> clusterJobStatuses);
+
+    /**
+     * Update the status in CR according to the cluster job status.
+     *
+     * @param resource the target custom resource.
+     * @param clusterJobStatus the status fetched from the cluster.
+     * @param deployedConfig Deployed job config.
+     */
+    private void updateJobStatus(
+            AbstractFlinkResource<?, ?> resource,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        var jobStatus = resource.getStatus().getJobStatus();
+        var previousJobStatus = jobStatus.getState();
+
+        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setJobName(clusterJobStatus.getJobName());
+        jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
+        jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
+
+        if (jobStatus.getState().equals(previousJobStatus)) {
+            LOG.info("Job status ({}) unchanged", previousJobStatus);
+        } else {
+            jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
+            var message =
+                    previousJobStatus == null
+                            ? String.format("Job status changed to %s", jobStatus.getState())
+                            : String.format(
+                                    "Job status changed from %s to %s",
+                                    previousJobStatus, jobStatus.getState());
+            LOG.info(message);
+
+            setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
+            EventUtils.createOrUpdateEvent(
+                    flinkService.getKubernetesClient(),
+                    resource,
+                    EventUtils.Type.Normal,
+                    "Status Changed",
+                    message,
+                    EventUtils.Component.Job);
+        }
+    }
+
+    private void setErrorIfPresent(
+            AbstractFlinkResource<?, ?> resource,
+            JobStatusMessage clusterJobStatus,
+            Configuration deployedConfig) {
+        if (clusterJobStatus.getJobState() == org.apache.flink.api.common.JobStatus.FAILED) {
+            try {
+                var result =
+                        flinkService.requestJobResult(deployedConfig, clusterJobStatus.getJobId());
+                result.getSerializedThrowable()
+                        .ifPresent(
+                                t -> {
+                                    var error = t.getFullStringifiedStackTrace();
+                                    var trimmedError = getErrorWithMaxLength(error);
+                                    trimmedError.ifPresent(
+                                            value -> {
+                                                if (!value.equals(
+                                                        resource.getStatus().getError())) {
+                                                    resource.getStatus().setError(value);
+                                                    LOG.error(
+                                                            "Job {} failed with error: {}",
+                                                            clusterJobStatus.getJobId(),
+                                                            error);
+                                                }
+                                            });
+                                });
+            } catch (Exception e) {
+                LOG.warn("Failed to request the job result", e);
+            }
+        }
+    }
+
+    private Optional<String> getErrorWithMaxLength(String error) {
+        if (error == null) {
+            return Optional.empty();
+        } else {
+            return Optional.of(
+                    error.substring(0, Math.min(error.length(), MAX_ERROR_STRING_LENGTH)));
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
index 085638e..3ed3940 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/context/VoidObserverContext.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.observer.context;
 
-/** A empty observer context. */
+/** An empty observer context. */
 public class VoidObserverContext {
     public static final VoidObserverContext INSTANCE = new VoidObserverContext();
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index c17647f..7711c4c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -217,6 +217,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         FlinkDeploymentStatus status = dep.getStatus();
         ReconciliationStatus reconciliationStatus = status.getReconciliationStatus();
         if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
+                && !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
                 && reconciliationStatus.isLastReconciledSpecStable()) {
             status.setError(null);
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index f74263b..dac96a5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -42,7 +42,7 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
-    private final SavepointObserver savepointObserver;
+    private final SavepointObserver<FlinkDeploymentStatus> savepointObserver;
     private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
 
     public ApplicationObserver(
@@ -51,7 +51,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
             FlinkConfigManager configManager,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         super(kubernetesClient, flinkService, configManager);
-        this.savepointObserver = new SavepointObserver(flinkService, configManager, statusHelper);
+        this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
         this.jobStatusObserver =
                 new JobStatusObserver<>(flinkService) {
                     @Override
@@ -60,18 +60,12 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
                     }
 
                     @Override
-                    protected Optional<String> updateJobStatus(
+                    protected Optional<JobStatusMessage> filterTargetJob(
                             JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
-                        clusterJobStatuses.sort(
-                                (j1, j2) -> Long.compare(j2.getStartTime(), j1.getStartTime()));
-                        JobStatusMessage newJob = clusterJobStatuses.get(0);
-
-                        status.setState(newJob.getJobState().name());
-                        status.setJobName(newJob.getJobName());
-                        status.setJobId(newJob.getJobId().toHexString());
-                        status.setStartTime(String.valueOf(newJob.getStartTime()));
-                        status.setUpdateTime(String.valueOf(System.currentTimeMillis()));
-                        return Optional.of(status.getState());
+                        if (!clusterJobStatuses.isEmpty()) {
+                            return Optional.of(clusterJobStatuses.get(0));
+                        }
+                        return Optional.empty();
                     }
                 };
     }
@@ -80,11 +74,9 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
     protected boolean observeFlinkCluster(
             FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
 
-        var jobStatus = flinkApp.getStatus().getJobStatus();
-
         boolean jobFound =
                 jobStatusObserver.observe(
-                        jobStatus,
+                        flinkApp,
                         deployedConfig,
                         new ApplicationObserverContext(flinkApp, context, deployedConfig));
         if (jobFound) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index a0f6c73..7d16a6a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -47,7 +47,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
     private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
-    private final SavepointObserver savepointObserver;
+    private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
     private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
 
     public SessionJobObserver(
@@ -56,14 +56,14 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
             StatusHelper<FlinkSessionJobStatus> statusHelper) {
         this.flinkService = flinkService;
         this.configManager = configManager;
-        this.savepointObserver = new SavepointObserver(flinkService, configManager, statusHelper);
+        this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
         this.jobStatusObserver =
                 new JobStatusObserver<>(flinkService) {
                     @Override
-                    protected void onTimeout(VoidObserverContext voidObserverContext) {}
+                    protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
 
                     @Override
-                    protected Optional<String> updateJobStatus(
+                    protected Optional<JobStatusMessage> filterTargetJob(
                             JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
                         var jobId =
                                 Preconditions.checkNotNull(
@@ -82,15 +82,9 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
                         if (matchedList.size() == 0) {
                             LOG.info("No job found for JobID: {}", jobId);
                             return Optional.empty();
+                        } else {
+                            return Optional.of(matchedList.get(0));
                         }
-
-                        JobStatusMessage newJob = matchedList.get(0);
-
-                        status.setState(newJob.getJobState().name());
-                        status.setJobName(newJob.getJobName());
-                        status.setStartTime(String.valueOf(newJob.getStartTime()));
-                        status.setUpdateTime(String.valueOf(System.currentTimeMillis()));
-                        return Optional.of(status.getState());
                     }
                 };
     }
@@ -115,9 +109,8 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         var deployedConfig = configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob);
         var jobFound =
                 jobStatusObserver.observe(
-                        flinkSessionJob.getStatus().getJobStatus(),
-                        deployedConfig,
-                        VoidObserverContext.INSTANCE);
+                        flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
+
         if (jobFound) {
             savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
         }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index c6731c0..2ba73e0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -53,6 +53,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
@@ -340,6 +341,19 @@ public class FlinkService {
         }
     }
 
+    public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            return clusterClient
+                    .requestJobResult(jobID)
+                    .get(
+                            configManager
+                                    .getOperatorConfiguration()
+                                    .getFlinkClientTimeout()
+                                    .getSeconds(),
+                            TimeUnit.SECONDS);
+        }
+    }
+
     @VisibleForTesting
     protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
         final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index aad4375..f710378 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -39,7 +39,8 @@ public class EventUtils {
     /** The component of events. */
     public enum Component {
         Operator,
-        JobManagerDeployment
+        JobManagerDeployment,
+        Job
     }
 
     public static String generateEventName(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
index 730d99b..1175461 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingClusterClient.java
@@ -71,6 +71,11 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
                 throw new UnsupportedOperationException();
             };
 
+    private Function<JobID, CompletableFuture<JobResult>> requestResultFunction =
+            jobID ->
+                    CompletableFuture.completedFuture(
+                            new JobResult.Builder().jobId(jobID).netRuntime(1).build());
+
     private final Configuration configuration;
     private final T clusterId;
 
@@ -109,6 +114,11 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
         this.listJobsFunction = listJobsFunction;
     }
 
+    public void setRequestResultFunction(
+            Function<JobID, CompletableFuture<JobResult>> requestResultFunction) {
+        this.requestResultFunction = requestResultFunction;
+    }
+
     @Override
     public T getClusterId() {
         return clusterId;
@@ -151,7 +161,7 @@ public class TestingClusterClient<T> extends RestClusterClient<T> {
 
     @Override
     public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
-        throw new UnsupportedOperationException();
+        return requestResultFunction.apply(jobId);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index ac21037..c1b4e8e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -42,8 +42,10 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
+import org.apache.flink.util.SerializedThrowable;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -82,6 +84,7 @@ public class TestingFlinkService extends FlinkService {
     private int triggerCounter = 0;
 
     private final List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
+    private final Map<JobID, String> jobErrors = new HashMap<>();
     private final Map<JobID, SubmittedJobInfo> sessionJobs = new HashMap<>();
     private final Set<String> sessions = new HashSet<>();
     private boolean isPortReady = true;
@@ -299,6 +302,17 @@ public class TestingFlinkService extends FlinkService {
                     }
                     return CompletableFuture.completedFuture(Acknowledge.get());
                 });
+
+        clusterClient.setRequestResultFunction(
+                jobID -> {
+                    var builder = new JobResult.Builder().jobId(jobID).netRuntime(1);
+                    if (jobErrors.containsKey(jobID)) {
+                        builder.serializedThrowable(
+                                new SerializedThrowable(
+                                        new RuntimeException(jobErrors.get(jobID))));
+                    }
+                    return CompletableFuture.completedFuture(builder.build());
+                });
         return clusterClient;
     }
 
@@ -389,6 +403,21 @@ public class TestingFlinkService extends FlinkService {
         this.podList = podList;
     }
 
+    public void markApplicationJobFailedWithError(JobID jobID, String error) throws Exception {
+        var job = jobs.stream().filter(tuple -> tuple.f1.getJobId().equals(jobID)).findFirst();
+        if (!job.isPresent()) {
+            throw new Exception("The target job missed");
+        }
+        var oldStatus = job.get().f1;
+        job.get().f1 =
+                new JobStatusMessage(
+                        oldStatus.getJobId(),
+                        oldStatus.getJobName(),
+                        JobStatus.FAILED,
+                        oldStatus.getStartTime());
+        jobErrors.put(jobID, error);
+    }
+
     /** The information collector of a submitted job. */
     public static class SubmittedJobInfo {
         public final String savepointPath;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 372aa75..a9d59fe 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -57,7 +57,7 @@ public class DeploymentRecoveryTest {
 
     @BeforeEach
     public void setup() {
-        flinkService = new TestingFlinkService();
+        flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
                 TestUtils.createTestController(configManager, kubernetesClient, flinkService);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 8a76aa1..7b0c0c7 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -78,7 +78,7 @@ public class FlinkDeploymentControllerTest {
 
     @BeforeEach
     public void setup() {
-        flinkService = new TestingFlinkService();
+        flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
                 TestUtils.createTestController(configManager, kubernetesClient, flinkService);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 4774e3b..fa216e4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -64,7 +64,7 @@ public class RollbackTest {
 
     @BeforeEach
     public void setup() {
-        flinkService = new TestingFlinkService();
+        flinkService = new TestingFlinkService(kubernetesClient);
         context = flinkService.getContext();
         testController =
                 TestUtils.createTestController(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 104801c..53353db 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.observer.deployment;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -37,10 +38,12 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,10 +60,10 @@ public class ApplicationObserverTest {
 
     @Test
     public void observeApplicationCluster() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
+        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
         ApplicationObserver observer =
                 new ApplicationObserver(
-                        null, flinkService, configManager, new TestingStatusHelper<>());
+                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         deployment.setStatus(new FlinkDeploymentStatus());
 
@@ -162,12 +165,80 @@ public class ApplicationObserverTest {
         assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
     }
 
+    @Test
+    public void testEventGeneratedWhenStatusChanged() throws Exception {
+        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration conf =
+                configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+        ApplicationObserver observer =
+                new ApplicationObserver(
+                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
+        flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
+
+        deployment.setStatus(new FlinkDeploymentStatus());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
+        observer.observe(deployment, readyContext);
+        var eventList =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(deployment.getMetadata().getNamespace())
+                        .list()
+                        .getItems();
+        Assertions.assertEquals(1, eventList.size());
+        Assertions.assertEquals("Job status changed to RUNNING", eventList.get(0).getMessage());
+        observer.observe(deployment, readyContext);
+        Assertions.assertEquals(
+                1,
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(deployment.getMetadata().getNamespace())
+                        .list()
+                        .getItems()
+                        .size());
+    }
+
+    @Test
+    public void testErrorForwardToStatusWhenJobFailed() throws Exception {
+        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration conf =
+                configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
+        ApplicationObserver observer =
+                new ApplicationObserver(
+                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
+        flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
+
+        deployment.setStatus(new FlinkDeploymentStatus());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
+        observer.observe(deployment, readyContext);
+        Assertions.assertEquals(1, flinkService.getRunningCount());
+        flinkService.markApplicationJobFailedWithError(
+                JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId()),
+                "Job failed");
+        observer.observe(deployment, readyContext);
+        Assertions.assertEquals(0, flinkService.getRunningCount());
+        Assertions.assertTrue(deployment.getStatus().getError().contains("Job failed"));
+    }
+
     @Test
     public void observeSavepoint() throws Exception {
         TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
         ApplicationObserver observer =
                 new ApplicationObserver(
-                        null, flinkService, configManager, new TestingStatusHelper<>());
+                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf =
                 configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
@@ -191,13 +262,12 @@ public class ApplicationObserverTest {
         // savepoint error within grace period
         assertEquals(
                 0,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
-                        .size());
+                (int)
+                        kubernetesClient.v1().events()
+                                .inNamespace(deployment.getMetadata().getNamespace()).list()
+                                .getItems().stream()
+                                .filter(e -> e.getReason().contains("SavepointError"))
+                                .count());
         observer.observe(deployment, readyContext);
         assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
         assertEquals(
@@ -226,21 +296,16 @@ public class ApplicationObserverTest {
         assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
         assertEquals(
                 1,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
-                        .size());
+                kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+                        .list().getItems().stream()
+                        .filter(e -> e.getReason().contains("SavepointError"))
+                        .count());
         assertEquals(
                 1,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
+                kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+                        .list().getItems().stream()
+                        .filter(e -> e.getReason().contains("SavepointError"))
+                        .collect(Collectors.toList())
                         .get(0)
                         .getCount());
 
@@ -314,21 +379,16 @@ public class ApplicationObserverTest {
 
         assertEquals(
                 1,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
-                        .size());
+                kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+                        .list().getItems().stream()
+                        .filter(e -> e.getReason().contains("SavepointError"))
+                        .count());
         assertEquals(
                 2,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
+                kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+                        .list().getItems().stream()
+                        .filter(e -> e.getReason().contains("SavepointError"))
+                        .collect(Collectors.toList())
                         .get(0)
                         .getCount());
 
@@ -379,7 +439,7 @@ public class ApplicationObserverTest {
         TestingFlinkService flinkService = new TestingFlinkService();
         ApplicationObserver observer =
                 new ApplicationObserver(
-                        null, flinkService, configManager, new TestingStatusHelper<>());
+                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         bringToReadyStatus(deployment);
         observer.observe(deployment, readyContext);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 68a0c36..766c35c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -45,7 +45,7 @@ public class SessionJobObserverTest {
     @Test
     public void testBasicObserve() throws Exception {
         final var sessionJob = TestUtils.buildSessionJob();
-        final var flinkService = new TestingFlinkService();
+        final var flinkService = new TestingFlinkService(kubernetesClient);
         final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
         final var observer =
                 new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
@@ -106,7 +106,7 @@ public class SessionJobObserverTest {
     @Test
     public void testObserveWithEffectiveConfig() throws Exception {
         final var sessionJob = TestUtils.buildSessionJob();
-        final var flinkService = new TestingFlinkService();
+        final var flinkService = new TestingFlinkService(kubernetesClient);
         final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
         final var observer =
                 new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());