You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/20 07:38:37 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27688] Add pluggable FlinkResourceListener interface

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 869c029  [FLINK-27688] Add pluggable FlinkResourceListener interface
869c029 is described below

commit 869c029dc0662299ce34d1570510e65f4fe3794d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu May 19 12:24:21 2022 +0200

    [FLINK-27688] Add pluggable FlinkResourceListener interface
---
 docs/content/docs/operations/listeners.md          |  41 ++++++
 docs/content/docs/operations/validator.md          |  18 +--
 flink-kubernetes-operator/pom.xml                  |   6 +-
 .../flink/kubernetes/operator/FlinkOperator.java   |  43 ++++---
 .../controller/FlinkDeploymentController.java      |  27 ++--
 .../controller/FlinkSessionJobController.java      |  18 +--
 .../operator/listener/FlinkResourceListener.java   |  65 ++++++++++
 .../operator/listener/ListenerUtils.java           | 137 +++++++++++++++++++++
 .../operator/observer/JobStatusObserver.java       |   8 +-
 .../operator/observer/SavepointObserver.java       |  17 +--
 .../deployment/AbstractDeploymentObserver.java     |  15 ++-
 .../observer/deployment/ApplicationObserver.java   |  15 +--
 .../observer/deployment/ObserverFactory.java       |  22 ++--
 .../observer/deployment/SessionObserver.java       |   8 +-
 .../observer/sessionjob/SessionJobObserver.java    |  18 +--
 .../operator/reconciler/ReconciliationUtils.java   |  10 +-
 .../deployment/AbstractDeploymentReconciler.java   |   6 +-
 .../deployment/ApplicationReconciler.java          |   6 +-
 .../reconciler/deployment/ReconcilerFactory.java   |  10 +-
 .../reconciler/deployment/SessionReconciler.java   |   9 +-
 .../kubernetes/operator/utils/EventRecorder.java   |  97 +++++++++++++++
 .../kubernetes/operator/utils/EventUtils.java      |   7 +-
 .../kubernetes/operator/utils/SavepointUtils.java  |   6 +-
 .../{StatusHelper.java => StatusRecorder.java}     |  74 +++++++++--
 ...ator-assembly.xml => test-plugins-assembly.xml} |   1 +
 .../flink/kubernetes/operator/TestUtils.java       |  35 +++++-
 ...tatusHelper.java => TestingStatusRecorder.java} |  14 +--
 .../listener/FlinkResourceListenerTest.java        | 114 +++++++++++++++++
 .../operator/listener/ListenerUtilsTest.java       |  73 +++++++++++
 .../operator/listener/TestingListener.java         |  71 +++++++++++
 .../operator/observer/SavepointObserverTest.java   |  27 ++--
 .../deployment/ApplicationObserverTest.java        |  34 ++---
 .../observer/deployment/SessionObserverTest.java   |   4 +-
 .../sessionjob/SessionJobObserverTest.java         |  30 +++--
 .../deployment/ApplicationReconcilerTest.java      |  37 ++----
 .../deployment/SessionReconcilerTest.java          |   5 +-
 .../kubernetes/operator/utils/EventUtilsTest.java  |   6 +-
 ...atusHelperTest.java => StatusRecorderTest.java} |   6 +-
 .../operator/utils/ValidatorUtilsTest.java         |  27 ++--
 ...ernetes.operator.listener.FlinkResourceListener |  16 +++
 40 files changed, 945 insertions(+), 238 deletions(-)

diff --git a/docs/content/docs/operations/listeners.md b/docs/content/docs/operations/listeners.md
new file mode 100644
index 0000000..755a725
--- /dev/null
+++ b/docs/content/docs/operations/listeners.md
@@ -0,0 +1,41 @@
+---
+title: "Custom Listeners"
+weight: 5
+type: docs
+aliases:
+- /operations/listeners.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Custom Flink Resource Listeners
+
+The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink Resources managed by the operator.
+This feature enables tighter integration with the user's own data platform.
+
+By implementing the `FlinkResourceListener` interface users can listen to both events and status updates per resource type (`FlinkDeployment` / `FlinkSessionJob`). These methods will be called after the respective events have been triggered by the system.
+Using the context provided on each listener method users can also get access to the related Flink resource and the `KubernetesClient` itself in order to trigger any further events etc on demand.
+
+Similar to [custom validator implementations]({{< ref "docs/operations/validator" >}}) resource listeners are loaded via the Flink [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism.
+
+In order to enable your custom `FlinkResourceListener` you need to:
+
+ 1. Implement the interface
+ 2. Add your listener class to `org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` in `META-INF/services`
+ 3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`)
diff --git a/docs/content/docs/operations/validator.md b/docs/content/docs/operations/validator.md
index 587cc63..316965e 100644
--- a/docs/content/docs/operations/validator.md
+++ b/docs/content/docs/operations/validator.md
@@ -1,5 +1,5 @@
 ---
-title: "Validator"
+title: "Custom Resource Validators"
 weight: 5
 type: docs
 aliases:
@@ -26,21 +26,21 @@ under the License.
 
 # Custom `FlinkResourceValidator` implementation
 
-`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`,  is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can customize the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`. 
+`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`,  is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can customize the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`.
 The following steps demonstrate how to develop and use a custom validator.
 
 1. Implement `FlinkResourceValidator` interface:
     ```java
     package org.apache.flink.kubernetes.operator.validation;
-   
+
     import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
     import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-   
+
     import java.util.Optional;
-   
+
     /** Custom validator implementation of {@link FlinkResourceValidator}. */
     public class CustomValidator implements FlinkResourceValidator {
-   
+
         @Override
         public Optional<String> validateDeployment(FlinkDeployment deployment) {
             if (deployment.getSpec().getFlinkVersion() == null) {
@@ -48,7 +48,7 @@ The following steps demonstrate how to develop and use a custom validator.
             }
             return Optional.empty();
         }
-   
+
         @Override
         public Optional<String> validateSessionJob(
                  FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
@@ -67,7 +67,7 @@ The following steps demonstrate how to develop and use a custom validator.
 
 3. Use the Maven tool to package the project and generate the custom validator JAR.
 
-4. Create Dockerfile to build a custom image from the `apache/flink-kubernetes-operator` official image and copy the generated JAR to custom validator plugin directory. 
+4. Create Dockerfile to build a custom image from the `apache/flink-kubernetes-operator` official image and copy the generated JAR to custom validator plugin directory.
     `/opt/flink/plugins` is the value of `FLINK_PLUGINS_DIR` environment variable in the flink-kubernetes-operator helm chart. The structure of custom validator directory under `/opt/flink/plugins` is as follows:
     ```text
     /opt/flink/plugins
@@ -75,7 +75,7 @@ The following steps demonstrate how to develop and use a custom validator.
         │   ├── custom-validator.jar
         └── ...
     ```
-    
+
     With the custom validator directory location, the Dockerfile is defined as follows:
     ```shell script
     FROM apache/flink-kubernetes-operator
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index e6f60b9..2dd3eb7 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -277,16 +277,16 @@ under the License.
                 <artifactId>maven-assembly-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>create-test-validator-jar</id>
+                        <id>create-test-plugin-jar</id>
                         <phase>process-test-classes</phase>
                         <goals>
                             <goal>single</goal>
                         </goals>
                         <configuration>
-                            <finalName>test-validator</finalName>
+                            <finalName>test-plugins</finalName>
                             <attach>false</attach>
                             <descriptors>
-                                <descriptor>src/test/assembly/test-validator-assembly.xml</descriptor>
+                                <descriptor>src/test/assembly/test-plugins-assembly.xml</descriptor>
                             </descriptors>
                         </configuration>
                     </execution>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 507ab62..d5313d4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -24,20 +24,20 @@ import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
-import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 import org.apache.flink.metrics.MetricGroup;
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -70,6 +71,7 @@ public class FlinkOperator {
     private final Set<FlinkResourceValidator> validators;
     private final Set<RegisteredController> registeredControllers = new HashSet<>();
     private final MetricGroup metricGroup;
+    private final Collection<FlinkResourceListener> listeners;
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
@@ -80,6 +82,7 @@ public class FlinkOperator {
         this.operator = new Operator(client, this::overrideOperatorConfigs);
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
+        this.listeners = ListenerUtils.discoverListeners(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
         PluginManager pluginManager =
@@ -109,13 +112,14 @@ public class FlinkOperator {
     }
 
     private void registerDeploymentController() {
-        StatusHelper<FlinkDeploymentStatus> statusHelper = new StatusHelper<>(client);
-        ReconcilerFactory reconcilerFactory =
-                new ReconcilerFactory(client, flinkService, configManager);
-        ObserverFactory observerFactory =
-                new ObserverFactory(client, flinkService, configManager, statusHelper);
-
-        FlinkDeploymentController controller =
+        var statusRecorder = StatusRecorder.<FlinkDeploymentStatus>create(client, listeners);
+        var eventRecorder = EventRecorder.create(client, listeners);
+        var reconcilerFactory =
+                new ReconcilerFactory(client, flinkService, configManager, eventRecorder);
+        var observerFactory =
+                new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder);
+
+        var controller =
                 new FlinkDeploymentController(
                         configManager,
                         client,
@@ -123,24 +127,25 @@ public class FlinkOperator {
                         reconcilerFactory,
                         observerFactory,
                         new MetricManager<>(metricGroup),
-                        statusHelper);
+                        statusRecorder,
+                        eventRecorder);
         registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
     }
 
     private void registerSessionJobController() {
-        Reconciler<FlinkSessionJob> reconciler =
-                new FlinkSessionJobReconciler(client, flinkService, configManager);
-        StatusHelper<FlinkSessionJobStatus> statusHelper = new StatusHelper<>(client);
-        Observer<FlinkSessionJob> observer =
-                new SessionJobObserver(flinkService, configManager, statusHelper);
-        FlinkSessionJobController controller =
+        var reconciler = new FlinkSessionJobReconciler(client, flinkService, configManager);
+        var eventRecorder = EventRecorder.create(client, listeners);
+        var statusRecorder = StatusRecorder.<FlinkSessionJobStatus>create(client, listeners);
+        var observer =
+                new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+        var controller =
                 new FlinkSessionJobController(
                         configManager,
                         validators,
                         reconciler,
                         observer,
                         new MetricManager<>(metricGroup),
-                        statusHelper);
+                        statusRecorder);
 
         registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 439024c..df69871 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -29,9 +29,10 @@ import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +71,8 @@ public class FlinkDeploymentController
     private final ReconcilerFactory reconcilerFactory;
     private final ObserverFactory observerFactory;
     private final MetricManager<FlinkDeployment> metricManager;
-    private final StatusHelper<FlinkDeploymentStatus> statusHelper;
+    private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+    private final EventRecorder eventRecorder;
     private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache =
             new ConcurrentHashMap<>();
 
@@ -81,27 +83,29 @@ public class FlinkDeploymentController
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
-            StatusHelper<FlinkDeploymentStatus> statusHelper) {
+            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            EventRecorder eventRecorder) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
-        this.statusHelper = statusHelper;
+        this.statusRecorder = statusRecorder;
+        this.eventRecorder = eventRecorder;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Deleting FlinkDeployment");
-        statusHelper.updateStatusFromCache(flinkApp);
+        statusRecorder.updateStatusFromCache(flinkApp);
         try {
             observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
         } catch (DeploymentFailedException dfe) {
             // ignore during cleanup
         }
         metricManager.onRemove(flinkApp);
-        statusHelper.removeCachedStatus(flinkApp);
+        statusRecorder.removeCachedStatus(flinkApp);
         return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, context);
     }
 
@@ -109,13 +113,13 @@ public class FlinkDeploymentController
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)
             throws Exception {
         LOG.info("Starting reconciliation");
-        statusHelper.updateStatusFromCache(flinkApp);
+        statusRecorder.updateStatusFromCache(flinkApp);
         FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
         try {
             observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
             if (!validateDeployment(flinkApp)) {
                 metricManager.onUpdate(flinkApp);
-                statusHelper.patchAndCacheStatus(flinkApp);
+                statusRecorder.patchAndCacheStatus(flinkApp);
                 return ReconciliationUtils.toUpdateControl(
                         configManager.getOperatorConfiguration(),
                         flinkApp,
@@ -131,7 +135,7 @@ public class FlinkDeploymentController
 
         LOG.info("End of reconciliation");
         metricManager.onUpdate(flinkApp);
-        statusHelper.patchAndCacheStatus(flinkApp);
+        statusRecorder.patchAndCacheStatus(flinkApp);
         return ReconciliationUtils.toUpdateControl(
                 configManager.getOperatorConfiguration(), flinkApp, previousDeployment, true);
     }
@@ -141,8 +145,7 @@ public class FlinkDeploymentController
         flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
         flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
         ReconciliationUtils.updateForReconciliationError(flinkApp, dfe.getMessage());
-        EventUtils.createOrUpdateEvent(
-                kubernetesClient,
+        eventRecorder.triggerEvent(
                 flinkApp,
                 EventUtils.Type.Warning,
                 dfe.getReason(),
@@ -162,7 +165,7 @@ public class FlinkDeploymentController
     public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
             FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
         return ReconciliationUtils.toErrorStatusUpdateControl(
-                flinkDeployment, context.getRetryInfo(), e, metricManager, statusHelper);
+                flinkDeployment, context.getRetryInfo(), e, metricManager, statusRecorder);
     }
 
     private boolean validateDeployment(FlinkDeployment deployment) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index a4b21f4..30ff1ea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -62,7 +62,7 @@ public class FlinkSessionJobController
     private final Reconciler<FlinkSessionJob> reconciler;
     private final Observer<FlinkSessionJob> observer;
     private final MetricManager<FlinkSessionJob> metricManager;
-    private final StatusHelper<FlinkSessionJobStatus> statusHelper;
+    private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
 
     public FlinkSessionJobController(
             FlinkConfigManager configManager,
@@ -70,13 +70,13 @@ public class FlinkSessionJobController
             Reconciler<FlinkSessionJob> reconciler,
             Observer<FlinkSessionJob> observer,
             MetricManager<FlinkSessionJob> metricManager,
-            StatusHelper<FlinkSessionJobStatus> statusHelper) {
+            StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
         this.configManager = configManager;
         this.validators = validators;
         this.reconciler = reconciler;
         this.observer = observer;
         this.metricManager = metricManager;
-        this.statusHelper = statusHelper;
+        this.statusRecorder = statusRecorder;
     }
 
     @Override
@@ -84,13 +84,13 @@ public class FlinkSessionJobController
             FlinkSessionJob flinkSessionJob, Context context) {
         LOG.info("Starting reconciliation");
 
-        statusHelper.updateStatusFromCache(flinkSessionJob);
+        statusRecorder.updateStatusFromCache(flinkSessionJob);
         FlinkSessionJob previousJob = ReconciliationUtils.clone(flinkSessionJob);
 
         observer.observe(flinkSessionJob, context);
         if (!validateSessionJob(flinkSessionJob, context)) {
             metricManager.onUpdate(flinkSessionJob);
-            statusHelper.patchAndCacheStatus(flinkSessionJob);
+            statusRecorder.patchAndCacheStatus(flinkSessionJob);
             return ReconciliationUtils.toUpdateControl(
                     configManager.getOperatorConfiguration(), flinkSessionJob, previousJob, false);
         }
@@ -101,7 +101,7 @@ public class FlinkSessionJobController
             throw new ReconciliationException(e);
         }
         metricManager.onUpdate(flinkSessionJob);
-        statusHelper.patchAndCacheStatus(flinkSessionJob);
+        statusRecorder.patchAndCacheStatus(flinkSessionJob);
         return ReconciliationUtils.toUpdateControl(
                 configManager.getOperatorConfiguration(), flinkSessionJob, previousJob, true);
     }
@@ -110,7 +110,7 @@ public class FlinkSessionJobController
     public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
         LOG.info("Deleting FlinkSessionJob");
         metricManager.onRemove(sessionJob);
-        statusHelper.removeCachedStatus(sessionJob);
+        statusRecorder.removeCachedStatus(sessionJob);
         return reconciler.cleanup(sessionJob, context);
     }
 
@@ -118,7 +118,7 @@ public class FlinkSessionJobController
     public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
             FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, Exception e) {
         return ReconciliationUtils.toErrorStatusUpdateControl(
-                sessionJob, context.getRetryInfo(), e, metricManager, statusHelper);
+                sessionJob, context.getRetryInfo(), e, metricManager, statusRecorder);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
new file mode 100644
index 0000000..5cd6143
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** Listener interface for Flink resource related events and status changes. */
+public interface FlinkResourceListener extends Plugin {
+
+    void onDeploymentStatusUpdate(StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus> ctx);
+
+    void onDeploymentEvent(ResourceEventContext<FlinkDeployment> ctx);
+
+    void onSessionJobStatusUpdate(StatusUpdateContext<FlinkSessionJob, FlinkSessionJobStatus> ctx);
+
+    void onSessionJobEvent(ResourceEventContext<FlinkSessionJob> ctx);
+
+    /** Base for Resource Event and StatusUpdate contexts. */
+    interface ResourceContext<R extends AbstractFlinkResource<?, ?>> {
+        R getFlinkResource();
+
+        KubernetesClient getKubernetesClient();
+    }
+
+    /** Context for Resource Event listener methods. */
+    interface ResourceEventContext<R extends AbstractFlinkResource<?, ?>>
+            extends ResourceContext<R> {
+        Event getEvent();
+    }
+
+    /** Context for Status listener methods. */
+    interface StatusUpdateContext<R extends AbstractFlinkResource<?, S>, S extends CommonStatus<?>>
+            extends ResourceContext<R> {
+
+        default S getNewStatus() {
+            return getFlinkResource().getStatus();
+        }
+
+        S getPreviousStatus();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
new file mode 100644
index 0000000..f5ce71d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Flink resource listener utilities. */
+public class ListenerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+
+    private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
+    private static final String SUFFIX = ".class";
+    private static final Pattern PTN =
+            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    /**
+     * Load {@link FlinkResourceListener} implementations from the plugin directory. Only listeners
+     * that are explicitly named and configured will be enabled.
+     *
+     * <p>Config format: kubernetes.operator.plugins.listeners.test.class: com.myorg.MyListener
+     * kubernetes.operator.plugins.listeners.test.k1: v1
+     *
+     * @param configManager {@link FlinkConfigManager} to access plugin configurations.
+     * @return Enabled listeners.
+     */
+    public static Collection<FlinkResourceListener> discoverListeners(
+            FlinkConfigManager configManager) {
+        var listeners = new ArrayList<FlinkResourceListener>();
+        var conf = getListenerBaseConf(configManager);
+
+        Map<String, Configuration> listenerConfigs = loadListenerConfigs(conf);
+        PluginUtils.createPluginManagerFromRootFolder(conf)
+                .load(FlinkResourceListener.class)
+                .forEachRemaining(
+                        listener -> {
+                            String clazz = listener.getClass().getName();
+                            LOG.info(
+                                    "Discovered resource listener from plugin directory[{}]: {}.",
+                                    System.getenv()
+                                            .getOrDefault(
+                                                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                                                    ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS),
+                                    clazz);
+
+                            if (listenerConfigs.containsKey(clazz)) {
+                                LOG.info("Initializing {}", clazz);
+                                listener.configure(listenerConfigs.get(clazz));
+                                listeners.add(listener);
+                            } else {
+                                LOG.warn("No configuration found for {}", clazz);
+                            }
+                        });
+        return listeners;
+    }
+
+    private static Configuration getListenerBaseConf(FlinkConfigManager configManager) {
+        var conf = new Configuration(configManager.getDefaultConfig());
+        List<String> additionalPatterns =
+                new ArrayList<>(
+                        conf.getOptional(
+                                        CoreOptions
+                                                .PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
+                                .orElse(Collections.emptyList()));
+        additionalPatterns.addAll(EXTRA_PARENT_FIRST_PATTERNS);
+        conf.set(
+                CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+                additionalPatterns);
+        return conf;
+    }
+
+    @VisibleForTesting
+    protected static Map<String, Configuration> loadListenerConfigs(Configuration configuration) {
+        Map<String, Configuration> listenerConfigs = new HashMap<>();
+        for (var enableListeners : findEnabledListeners(configuration)) {
+            DelegatingConfiguration delegatingConfiguration =
+                    new DelegatingConfiguration(configuration, PREFIX + enableListeners.f0 + '.');
+            listenerConfigs.put(enableListeners.f1, delegatingConfiguration);
+        }
+        return listenerConfigs;
+    }
+
+    private static Set<Tuple2<String, String>> findEnabledListeners(Configuration configuration) {
+        Set<Tuple2<String, String>> result = new HashSet<>();
+        for (String key : configuration.keySet()) {
+            Matcher matcher = PTN.matcher(key);
+            if (matcher.matches()) {
+                result.add(
+                        Tuple2.of(
+                                matcher.group(1),
+                                configuration.get(
+                                        ConfigOptions.key(key).stringType().noDefaultValue())));
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index afc9582..5c31d88 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -38,9 +39,11 @@ public abstract class JobStatusObserver<CTX> {
     private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
     private static final int MAX_ERROR_STRING_LENGTH = 512;
     private final FlinkService flinkService;
+    private final EventRecorder eventRecorder;
 
-    public JobStatusObserver(FlinkService flinkService) {
+    public JobStatusObserver(FlinkService flinkService, EventRecorder eventRecorder) {
         this.flinkService = flinkService;
+        this.eventRecorder = eventRecorder;
     }
 
     /**
@@ -135,8 +138,7 @@ public abstract class JobStatusObserver<CTX> {
             LOG.info(message);
 
             setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
-            EventUtils.createOrUpdateEvent(
-                    flinkService.getKubernetesClient(),
+            eventRecorder.triggerEvent(
                     resource,
                     EventUtils.Type.Normal,
                     "Status Changed",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index a2a624a..202f7b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -30,9 +30,10 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,15 +50,18 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
 
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
-    private final StatusHelper<STATUS> statusHelper;
+    private final StatusRecorder<STATUS> statusRecorder;
+    private final EventRecorder eventRecorder;
 
     public SavepointObserver(
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusHelper<STATUS> statusHelper) {
+            StatusRecorder<STATUS> statusRecorder,
+            EventRecorder eventRecorder) {
         this.flinkService = flinkService;
         this.configManager = configManager;
-        this.statusHelper = statusHelper;
+        this.statusRecorder = statusRecorder;
+        this.eventRecorder = eventRecorder;
     }
 
     public void observeSavepointStatus(
@@ -114,8 +118,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
                                 + err);
                 ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                         savepointInfo, resource);
-                EventUtils.createOrUpdateEvent(
-                        flinkService.getKubernetesClient(),
+                eventRecorder.triggerEvent(
                         resource,
                         EventUtils.Type.Warning,
                         "SavepointError",
@@ -219,7 +222,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
             LOG.info(
                     "Updating resource status after observing new last savepoint {}",
                     currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+            statusRecorder.patchAndCacheStatus(resource);
         }
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 7711c4c..d3217f3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
@@ -43,7 +44,6 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,17 +58,17 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
 
     protected final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
     protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
 
     public AbstractDeploymentObserver(
-            KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
-        this.kubernetesClient = kubernetesClient;
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
         this.flinkService = flinkService;
         this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
     }
 
     @Override
@@ -101,7 +101,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
             observeClusterInfo(flinkApp, observeConfig);
         }
 
-        SavepointUtils.resetTriggerIfJobNotRunning(flinkService.getKubernetesClient(), flinkApp);
+        SavepointUtils.resetTriggerIfJobNotRunning(flinkApp, eventRecorder);
         clearErrorsIfDeploymentIsHealthy(flinkApp);
     }
 
@@ -244,8 +244,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         String err = "Missing JobManager deployment";
         logger.error(err);
         ReconciliationUtils.updateForReconciliationError(deployment, err);
-        EventUtils.createOrUpdateEvent(
-                kubernetesClient,
+        eventRecorder.triggerEvent(
                 deployment,
                 EventUtils.Type.Warning,
                 "Missing",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index dac96a5..27a3150 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
 import org.apache.flink.kubernetes.operator.observer.context.ApplicationObserverContext;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
 import java.util.List;
@@ -46,14 +46,15 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
     private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
 
     public ApplicationObserver(
-            KubernetesClient kubernetesClient,
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusHelper<FlinkDeploymentStatus> statusHelper) {
-        super(kubernetesClient, flinkService, configManager);
-        this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
+            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            EventRecorder eventRecorder) {
+        super(flinkService, configManager, eventRecorder);
+        this.savepointObserver =
+                new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
         this.jobStatusObserver =
-                new JobStatusObserver<>(flinkService) {
+                new JobStatusObserver<>(flinkService, eventRecorder) {
                     @Override
                     public void onTimeout(ApplicationObserverContext ctx) {
                         observeJmDeployment(ctx.flinkApp, ctx.context, ctx.deployedConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index e307022..605c276 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,21 +32,21 @@ import java.util.concurrent.ConcurrentHashMap;
 /** The factory to create the observer based ob the {@link FlinkDeployment} mode. */
 public class ObserverFactory {
 
-    private final KubernetesClient kubernetesClient;
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
-    private final StatusHelper<FlinkDeploymentStatus> statusHelper;
+    private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+    private final EventRecorder eventRecorder;
     private final Map<Mode, Observer<FlinkDeployment>> observerMap;
 
     public ObserverFactory(
-            KubernetesClient kubernetesClient,
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusHelper<FlinkDeploymentStatus> statusHelper) {
-        this.kubernetesClient = kubernetesClient;
+            StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+            EventRecorder eventRecorder) {
         this.flinkService = flinkService;
         this.configManager = configManager;
-        this.statusHelper = statusHelper;
+        this.statusRecorder = statusRecorder;
+        this.eventRecorder = eventRecorder;
         this.observerMap = new ConcurrentHashMap<>();
     }
 
@@ -57,11 +56,10 @@ public class ObserverFactory {
                 mode -> {
                     switch (mode) {
                         case SESSION:
-                            return new SessionObserver(
-                                    kubernetesClient, flinkService, configManager);
+                            return new SessionObserver(flinkService, configManager, eventRecorder);
                         case APPLICATION:
                             return new ApplicationObserver(
-                                    kubernetesClient, flinkService, configManager, statusHelper);
+                                    flinkService, configManager, statusRecorder, eventRecorder);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 4612fb3..fc6f4ed 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 
 import java.util.concurrent.TimeoutException;
@@ -31,10 +31,10 @@ import java.util.concurrent.TimeoutException;
 public class SessionObserver extends AbstractDeploymentObserver {
 
     public SessionObserver(
-            KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
-        super(kubernetesClient, flinkService, configManager);
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(flinkService, configManager, eventRecorder);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 7d16a6a..6169a4d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -28,8 +28,9 @@ import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
 import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.util.Preconditions;
 
@@ -45,20 +46,22 @@ import java.util.stream.Collectors;
 public class SessionJobObserver implements Observer<FlinkSessionJob> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
-    private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
+    private final EventRecorder eventRecorder;
     private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
     private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
 
     public SessionJobObserver(
             FlinkService flinkService,
             FlinkConfigManager configManager,
-            StatusHelper<FlinkSessionJobStatus> statusHelper) {
-        this.flinkService = flinkService;
+            StatusRecorder<FlinkSessionJobStatus> statusRecorder,
+            EventRecorder eventRecorder) {
         this.configManager = configManager;
-        this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
+        this.eventRecorder = eventRecorder;
+        this.savepointObserver =
+                new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
         this.jobStatusObserver =
-                new JobStatusObserver<>(flinkService) {
+                new JobStatusObserver<>(flinkService, eventRecorder) {
                     @Override
                     protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
 
@@ -114,7 +117,6 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
         if (jobFound) {
             savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
         }
-        SavepointUtils.resetTriggerIfJobNotRunning(
-                flinkService.getKubernetesClient(), flinkSessionJob);
+        SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index bd2c5bb..85bdf52 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -40,7 +40,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -372,7 +372,7 @@ public class ReconciliationUtils {
      * @param retryInfo Current RetryInformation
      * @param e Exception that caused the retry
      * @param metricManager Metric manager to be updated
-     * @param statusHelper StatusHelper object for patching status
+     * @param statusRecorder statusRecorder object for patching status
      * @return This always returns Empty optional currently, due to the status update logic
      */
     public static <STATUS extends CommonStatus<?>, R extends AbstractFlinkResource<?, STATUS>>
@@ -381,7 +381,7 @@ public class ReconciliationUtils {
                     Optional<RetryInfo> retryInfo,
                     Exception e,
                     MetricManager<R> metricManager,
-                    StatusHelper<STATUS> statusHelper) {
+                    StatusRecorder<STATUS> statusRecorder) {
 
         retryInfo.ifPresent(
                 r -> {
@@ -391,12 +391,12 @@ public class ReconciliationUtils {
                             r.isLastAttempt());
                 });
 
-        statusHelper.updateStatusFromCache(resource);
+        statusRecorder.updateStatusFromCache(resource);
         ReconciliationUtils.updateForReconciliationError(
                 resource,
                 (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
         metricManager.onUpdate(resource);
-        statusHelper.patchAndCacheStatus(resource);
+        statusRecorder.patchAndCacheStatus(resource);
 
         // Status was updated already, no need to return anything
         return ErrorStatusUpdateControl.noStatusUpdate();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 6c77657..350ff2e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -40,16 +41,19 @@ public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDe
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class);
 
     protected final FlinkConfigManager configManager;
+    protected final EventRecorder eventRecorder;
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
 
     public AbstractDeploymentReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
         this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 01bfe69..595b7ef 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -62,8 +63,9 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler {
     public ApplicationReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
-        super(kubernetesClient, flinkService, configManager);
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 7603469..468526e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 
@@ -34,15 +35,18 @@ public class ReconcilerFactory {
     private final KubernetesClient kubernetesClient;
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
+    private final EventRecorder eventRecorder;
     private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
 
     public ReconcilerFactory(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
         this.configManager = configManager;
+        this.eventRecorder = eventRecorder;
         this.reconcilerMap = new ConcurrentHashMap<>();
     }
 
@@ -53,10 +57,10 @@ public class ReconcilerFactory {
                     switch (mode) {
                         case SESSION:
                             return new SessionReconciler(
-                                    kubernetesClient, flinkService, configManager);
+                                    kubernetesClient, flinkService, configManager, eventRecorder);
                         case APPLICATION:
                             return new ApplicationReconciler(
-                                    kubernetesClient, flinkService, configManager);
+                                    kubernetesClient, flinkService, configManager, eventRecorder);
                         default:
                             throw new UnsupportedOperationException(
                                     String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 6057178..ccf74f5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -52,8 +53,9 @@ public class SessionReconciler extends AbstractDeploymentReconciler {
     public SessionReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkConfigManager configManager) {
-        super(kubernetesClient, flinkService, configManager);
+            FlinkConfigManager configManager,
+            EventRecorder eventRecorder) {
+        super(kubernetesClient, flinkService, configManager, eventRecorder);
     }
 
     @Override
@@ -155,8 +157,7 @@ public class SessionReconciler extends AbstractDeploymentReconciler {
                             sessionJobs.stream()
                                     .map(job -> job.getMetadata().getName())
                                     .collect(Collectors.toList()));
-            if (EventUtils.createOrUpdateEvent(
-                    kubernetesClient,
+            if (eventRecorder.triggerEvent(
                     flinkApp,
                     EventUtils.Type.Warning,
                     "Cleanup",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
new file mode 100644
index 0000000..ff87e8f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+/** Helper class for creating Kubernetes events for Flink resources. */
+public class EventRecorder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EventRecorder.class);
+
+    private final KubernetesClient client;
+    private final BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener;
+
+    public EventRecorder(
+            KubernetesClient client, BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener) {
+        this.client = client;
+        this.eventListener = eventListener;
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            EventUtils.Type type,
+            String reason,
+            String message,
+            EventUtils.Component component) {
+        return EventUtils.createOrUpdateEvent(
+                client,
+                resource,
+                type,
+                reason,
+                message,
+                component,
+                e -> eventListener.accept(resource, e));
+    }
+
+    public static EventRecorder create(
+            KubernetesClient client, Collection<FlinkResourceListener> listeners) {
+
+        BiConsumer<AbstractFlinkResource<?, ?>, Event> biConsumer =
+                (resource, event) ->
+                        listeners.forEach(
+                                listener -> {
+                                    var ctx =
+                                            new FlinkResourceListener.ResourceEventContext() {
+                                                @Override
+                                                public Event getEvent() {
+                                                    return event;
+                                                }
+
+                                                @Override
+                                                public AbstractFlinkResource<?, ?>
+                                                        getFlinkResource() {
+                                                    return resource;
+                                                }
+
+                                                @Override
+                                                public KubernetesClient getKubernetesClient() {
+                                                    return client;
+                                                }
+                                            };
+
+                                    if (resource instanceof FlinkDeployment) {
+                                        listener.onDeploymentEvent(ctx);
+                                    } else {
+                                        listener.onSessionJobEvent(ctx);
+                                    }
+                                });
+        return new EventRecorder(client, biConsumer);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index f710378..100b857 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
 import java.time.Instant;
+import java.util.function.Consumer;
 
 /**
  * The util to generate an event for the target resource. It is copied from
@@ -63,7 +65,8 @@ public class EventUtils {
             Type type,
             String reason,
             String message,
-            Component component) {
+            Component component,
+            Consumer<Event> eventListener) {
         var eventName = generateEventName(target, type, reason, message, component);
 
         var existing =
@@ -86,6 +89,7 @@ public class EventUtils {
                     .events()
                     .inNamespace(target.getMetadata().getNamespace())
                     .createOrReplace(existing);
+            eventListener.accept(existing);
             return false;
         } else {
             var event =
@@ -113,6 +117,7 @@ public class EventUtils {
                             .endMetadata()
                             .build();
             client.v1().events().inNamespace(target.getMetadata().getNamespace()).create(event);
+            eventListener.accept(event);
             return true;
         }
     }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 6858281..23e5871 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,7 +139,7 @@ public class SavepointUtils {
     }
 
     public static void resetTriggerIfJobNotRunning(
-            KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+            AbstractFlinkResource<?, ?> resource, EventRecorder eventRecorder) {
         var status = resource.getStatus();
         var jobStatus = status.getJobStatus();
         if (!ReconciliationUtils.isJobRunning(status)
@@ -149,8 +148,7 @@ public class SavepointUtils {
             ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
             savepointInfo.resetTrigger();
             LOG.error("Job is not running, cancelling savepoint operation");
-            EventUtils.createOrUpdateEvent(
-                    client,
+            eventRecorder.triggerEvent(
                     resource,
                     EventUtils.Type.Warning,
                     "SavepointError",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
similarity index 57%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java
rename to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index ca96244..ae32f92 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -19,7 +19,12 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,12 +35,14 @@ import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 
 /** Helper class for status management and updates. */
-public class StatusHelper<STATUS extends CommonStatus<?>> {
+public class StatusRecorder<STATUS extends CommonStatus<?>> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(StatusHelper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
 
     protected final ObjectMapper objectMapper = new ObjectMapper();
 
@@ -43,9 +50,13 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
             new ConcurrentHashMap<>();
 
     private final KubernetesClient client;
+    private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> statusUpdateListener;
 
-    public StatusHelper(KubernetesClient client) {
+    public StatusRecorder(
+            KubernetesClient client,
+            BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> statusUpdateListener) {
         this.client = client;
+        this.statusUpdateListener = statusUpdateListener;
     }
 
     /**
@@ -57,8 +68,7 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
      * @param resource Resource for which status update should be performed
      */
     @SneakyThrows
-    public <T extends CustomResource<?, STATUS>> void patchAndCacheStatus(T resource) {
-
+    public <T extends AbstractFlinkResource<?, STATUS>> void patchAndCacheStatus(T resource) {
         Class<T> resourceClass = (Class<T>) resource.getClass();
         String namespace = resource.getMetadata().getNamespace();
         String name = resource.getMetadata().getName();
@@ -67,14 +77,21 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
         // in the meantime
         resource.getMetadata().setResourceVersion(null);
 
-        ObjectNode newStatus = objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
-        ObjectNode previousStatus = statusCache.put(getKey(resource), newStatus);
+        ObjectNode newStatusNode =
+                objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
+        ObjectNode previousStatusNode = statusCache.put(getKey(resource), newStatusNode);
 
-        if (newStatus.equals(previousStatus)) {
+        if (newStatusNode.equals(previousStatusNode)) {
             LOG.debug("No status change.");
             return;
         }
 
+        var statusClass =
+                (resource instanceof FlinkDeployment)
+                        ? FlinkDeploymentStatus.class
+                        : FlinkSessionJobStatus.class;
+        var prevStatus = (STATUS) objectMapper.convertValue(previousStatusNode, statusClass);
+
         Exception err = null;
         for (int i = 0; i < 3; i++) {
             // In any case we retry the status update 3 times to avoid some intermittent
@@ -84,6 +101,7 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
                         .inNamespace(namespace)
                         .withName(name)
                         .patchStatus(resource);
+                statusUpdateListener.accept(resource, prevStatus);
                 return;
             } catch (Exception e) {
                 LOG.error("Error while patching status, retrying {}/3...", (i + 1), e);
@@ -105,12 +123,16 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
      * @param resource Resource for which the status should be updated from the cache
      */
     public <T extends CustomResource<?, STATUS>> void updateStatusFromCache(T resource) {
-        var cachedStatus = statusCache.get(getKey(resource));
+        var key = getKey(resource);
+        var cachedStatus = statusCache.get(key);
         if (cachedStatus != null) {
             resource.setStatus(
                     (STATUS)
                             objectMapper.convertValue(
                                     cachedStatus, resource.getStatus().getClass()));
+        } else {
+            // Initialize cache with current status copy
+            statusCache.put(key, objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
         }
     }
 
@@ -122,4 +144,38 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
     protected static Tuple2<String, String> getKey(HasMetadata resource) {
         return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
     }
+
+    public static <S extends CommonStatus<?>> StatusRecorder<S> create(
+            KubernetesClient kubernetesClient, Collection<FlinkResourceListener> listeners) {
+        BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+                (resource, previousStatus) -> {
+                    var ctx =
+                            new FlinkResourceListener.StatusUpdateContext() {
+                                @Override
+                                public S getPreviousStatus() {
+                                    return previousStatus;
+                                }
+
+                                @Override
+                                public AbstractFlinkResource<?, S> getFlinkResource() {
+                                    return resource;
+                                }
+
+                                @Override
+                                public KubernetesClient getKubernetesClient() {
+                                    return kubernetesClient;
+                                }
+                            };
+
+                    listeners.forEach(
+                            listener -> {
+                                if (resource instanceof FlinkDeployment) {
+                                    listener.onDeploymentStatusUpdate(ctx);
+                                } else {
+                                    listener.onSessionJobStatusUpdate(ctx);
+                                }
+                            });
+                };
+        return new StatusRecorder<>(kubernetesClient, consumer);
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
similarity index 93%
rename from flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml
rename to flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
index b4abc44..93d7807 100644
--- a/flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml
+++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
@@ -29,6 +29,7 @@ under the License.
             <!-- the service impl -->
             <includes>
                 <include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
+                <include>org/apache/flink/kubernetes/operator/listener/TestingListener.class</include>
             </includes>
         </fileSet>
         <fileSet>
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 2ad7c1e..c619633 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -41,7 +41,8 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.metrics.testutils.MetricListener;
 
@@ -68,8 +69,13 @@ import okhttp3.Headers;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.jupiter.api.Assertions;
 
+import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,6 +86,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
+import static org.junit.Assert.assertTrue;
+
 /** Testing utilities. */
 public class TestUtils {
 
@@ -92,6 +100,9 @@ public class TestUtils {
     public static final String IMAGE_POLICY = "IfNotPresent";
     public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
 
+    private static final String TEST_PLUGINS = "test-plugins";
+    private static final String PlUGINS_JAR = TEST_PLUGINS + "-test-jar.jar";
+
     public static FlinkDeployment buildSessionCluster() {
         return buildSessionCluster(FlinkVersion.v1_15);
     }
@@ -420,6 +431,17 @@ public class TestUtils {
         };
     }
 
+    public static String getTestPluginsRootDir(Path temporaryFolder) throws IOException {
+        File testValidatorFolder = new File(temporaryFolder.toFile(), TEST_PLUGINS);
+        assertTrue(testValidatorFolder.mkdirs());
+        File testValidatorJar = new File("target", PlUGINS_JAR);
+        assertTrue(testValidatorJar.exists());
+        Files.copy(
+                testValidatorJar.toPath(), Paths.get(testValidatorFolder.toString(), PlUGINS_JAR));
+
+        return temporaryFolder.toAbsolutePath().toString();
+    }
+
     // This code is taken slightly modified from: http://stackoverflow.com/a/7201825/568695
     // it changes the environment variables of this JVM. Use only for testing purposes!
     @SuppressWarnings("unchecked")
@@ -455,15 +477,18 @@ public class TestUtils {
             KubernetesClient kubernetesClient,
             TestingFlinkService flinkService) {
 
-        var statusHelper = new StatusHelper<FlinkDeploymentStatus>(kubernetesClient);
+        var statusRecorder =
+                new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (r, s) -> {});
+        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         return new FlinkDeploymentController(
                 configManager,
                 kubernetesClient,
                 ValidatorUtils.discoverValidators(configManager),
-                new ReconcilerFactory(kubernetesClient, flinkService, configManager),
-                new ObserverFactory(kubernetesClient, flinkService, configManager, statusHelper),
+                new ReconcilerFactory(kubernetesClient, flinkService, configManager, eventRecorder),
+                new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder),
                 new MetricManager<>(new MetricListener().getMetricGroup()),
-                statusHelper);
+                statusRecorder,
+                eventRecorder);
     }
 
     /** Testing ResponseProvider. */
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
similarity index 72%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index ed29596..0243fd5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,21 +18,21 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.fabric8.kubernetes.client.CustomResource;
 
-/** Testing StatusHelper. */
-public class TestingStatusHelper<STATUS extends CommonStatus<?>> extends StatusHelper<STATUS> {
+/** Testing statusRecorder. */
+public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends StatusRecorder<STATUS> {
 
-    public TestingStatusHelper() {
-        super(null);
+    public TestingStatusRecorder() {
+        super(null, (r, s) -> {});
     }
 
     @Override
-    public <T extends CustomResource<?, STATUS>> void patchAndCacheStatus(T resource) {
+    public <T extends AbstractFlinkResource<?, STATUS>> void patchAndCacheStatus(T resource) {
         statusCache.put(
                 getKey(resource),
                 objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
new file mode 100644
index 0000000..e624b6c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkResourceListener}. */
+@EnableKubernetesMockClient(crud = true)
+public class FlinkResourceListenerTest {
+
+    private KubernetesClient kubernetesClient;
+
+    @BeforeEach
+    public void before() {
+        kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
+    }
+
+    @Test
+    public void testListeners() {
+        var listener1 = new TestingListener();
+        var listener2 = new TestingListener();
+        var listeners = List.<FlinkResourceListener>of(listener1, listener2);
+
+        var statusRecorder =
+                StatusRecorder.<FlinkDeploymentStatus>create(kubernetesClient, listeners);
+        var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
+
+        var deployment = TestUtils.buildApplicationCluster();
+        statusRecorder.updateStatusFromCache(deployment);
+
+        statusRecorder.patchAndCacheStatus(deployment);
+        assertTrue(listener1.updates.isEmpty());
+        assertTrue(listener2.updates.isEmpty());
+        assertTrue(listener1.events.isEmpty());
+        assertTrue(listener2.events.isEmpty());
+
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+
+        statusRecorder.patchAndCacheStatus(deployment);
+        assertEquals(1, listener1.updates.size());
+        assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
+
+        assertEquals(1, listener2.updates.size());
+        assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+        statusRecorder.patchAndCacheStatus(deployment);
+
+        assertEquals(2, listener1.updates.size());
+        assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
+        assertEquals(2, listener2.updates.size());
+        assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+
+        var updateContext =
+                (FlinkResourceListener.StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus>)
+                        listener1.updates.get(1);
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                updateContext.getPreviousStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                updateContext.getNewStatus().getJobManagerDeploymentStatus());
+
+        eventRecorder.triggerEvent(
+                deployment,
+                EventUtils.Type.Warning,
+                "SavepointError",
+                "err",
+                EventUtils.Component.Operator);
+        assertEquals(1, listener1.events.size());
+        eventRecorder.triggerEvent(
+                deployment,
+                EventUtils.Type.Warning,
+                "SavepointError",
+                "err",
+                EventUtils.Component.Operator);
+        assertEquals(2, listener1.events.size());
+
+        for (int i = 0; i < listener1.events.size(); i++) {
+            assertEquals(listener1.events.get(i).getEvent(), listener2.events.get(i).getEvent());
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
new file mode 100644
index 0000000..ba5ddc3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for {@link ListenerUtils}. */
+public class ListenerUtilsTest {
+
+    @TempDir Path temporaryFolder;
+
+    @Test
+    public void testListenerConfiguration() throws IOException {
+        Map<String, String> testConfig = new HashMap<>();
+
+        testConfig.put(
+                "kubernetes.operator.plugins.listeners.test1.class",
+                TestingListener.class.getName());
+        testConfig.put("kubernetes.operator.plugins.listeners.test1.k1", "v1");
+        testConfig.put("kubernetes.operator.plugins.listeners.test1.k2", "v2");
+        testConfig.put("k3", "v3");
+
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                    TestUtils.getTestPluginsRootDir(temporaryFolder));
+            TestUtils.setEnv(systemEnv);
+            var listeners =
+                    new ArrayList<>(
+                            ListenerUtils.discoverListeners(
+                                    new FlinkConfigManager(Configuration.fromMap(testConfig))));
+            assertEquals(1, listeners.size());
+
+            var testingListener = (TestingListener) listeners.get(0);
+            assertEquals(
+                    Map.of("k1", "v1", "k2", "v2", "class", TestingListener.class.getName()),
+                    testingListener.config.toMap());
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java
new file mode 100644
index 0000000..bdb3917
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Listener implementation for testing. */
+public class TestingListener implements FlinkResourceListener {
+
+    public List<StatusUpdateContext<?, ?>> updates = new ArrayList<>();
+    public List<ResourceEventContext<?>> events = new ArrayList<>();
+    public Configuration config;
+
+    public void onStatusUpdate(StatusUpdateContext<?, ?> ctx) {
+        updates.add(ctx);
+    }
+
+    public void onEvent(ResourceEventContext<?> ctx) {
+        events.add(ctx);
+    }
+
+    @Override
+    public void onDeploymentStatusUpdate(
+            StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus> ctx) {
+        onStatusUpdate(ctx);
+    }
+
+    @Override
+    public void onDeploymentEvent(ResourceEventContext<FlinkDeployment> ctx) {
+        onEvent(ctx);
+    }
+
+    @Override
+    public void onSessionJobStatusUpdate(
+            StatusUpdateContext<FlinkSessionJob, FlinkSessionJobStatus> ctx) {
+        onStatusUpdate(ctx);
+    }
+
+    @Override
+    public void onSessionJobEvent(ResourceEventContext<FlinkSessionJob> ctx) {
+        onEvent(ctx);
+    }
+
+    @Override
+    public void configure(Configuration config) {
+        this.config = config;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index 44aa3bf..946f7b5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.observer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -39,15 +43,25 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link SavepointObserver}. */
+@EnableKubernetesMockClient(crud = true)
 public class SavepointObserverTest {
 
+    private KubernetesClient kubernetesClient;
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
     private final TestingFlinkService flinkService = new TestingFlinkService();
+    private SavepointObserver observer;
+    private final EventRecorder eventRecorder = new EventRecorder(null, (r, e) -> {});
+
+    @BeforeEach
+    public void before() {
+        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        observer =
+                new SavepointObserver(
+                        flinkService, configManager, new TestingStatusRecorder<>(), eventRecorder);
+    }
 
     @Test
     public void testBasicObserve() {
-        SavepointObserver observer =
-                new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
         SavepointInfo spInfo = new SavepointInfo();
         Assertions.assertTrue(spInfo.getSavepointHistory().isEmpty());
 
@@ -67,8 +81,6 @@ public class SavepointObserverTest {
                 KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
                 Duration.ofMillis(5));
 
-        SavepointObserver observer =
-                new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
         SavepointInfo spInfo = new SavepointInfo();
 
         Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
@@ -98,9 +110,6 @@ public class SavepointObserverTest {
                 KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
                 Duration.ofMillis(5));
         configManager.updateDefaultConfig(conf);
-
-        SavepointObserver observer =
-                new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
         SavepointInfo spInfo = new SavepointInfo();
 
         Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
@@ -135,8 +144,6 @@ public class SavepointObserverTest {
         jobStatus.setState("RUNNING");
 
         var savepointInfo = jobStatus.getSavepointInfo();
-        var observer =
-                new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
         flinkService.triggerSavepoint(null, SavepointTriggerType.PERIODIC, savepointInfo, conf);
 
         var triggerTs = savepointInfo.getTriggerTimestamp();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 53353db..b9e30f8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -39,6 +40,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -57,13 +59,19 @@ public class ApplicationObserverTest {
     private KubernetesClient kubernetesClient;
     private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+    private final TestingFlinkService flinkService = new TestingFlinkService();
+    private ApplicationObserver observer;
+
+    @BeforeEach
+    public void before() {
+        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        observer =
+                new ApplicationObserver(
+                        flinkService, configManager, new TestingStatusRecorder<>(), eventRecorder);
+    }
 
     @Test
     public void observeApplicationCluster() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
-        ApplicationObserver observer =
-                new ApplicationObserver(
-                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         deployment.setStatus(new FlinkDeploymentStatus());
 
@@ -167,13 +175,9 @@ public class ApplicationObserverTest {
 
     @Test
     public void testEventGeneratedWhenStatusChanged() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf =
                 configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
-        ApplicationObserver observer =
-                new ApplicationObserver(
-                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
 
         deployment.setStatus(new FlinkDeploymentStatus());
@@ -207,13 +211,9 @@ public class ApplicationObserverTest {
 
     @Test
     public void testErrorForwardToStatusWhenJobFailed() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf =
                 configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
-        ApplicationObserver observer =
-                new ApplicationObserver(
-                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
 
         deployment.setStatus(new FlinkDeploymentStatus());
@@ -235,10 +235,6 @@ public class ApplicationObserverTest {
 
     @Test
     public void observeSavepoint() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
-        ApplicationObserver observer =
-                new ApplicationObserver(
-                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf =
                 configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
@@ -436,10 +432,6 @@ public class ApplicationObserverTest {
 
     @Test
     public void observeListJobsError() {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        ApplicationObserver observer =
-                new ApplicationObserver(
-                        kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         bringToReadyStatus(deployment);
         observer.observe(deployment, readyContext);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index 3d96fc2..515cdb8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Test;
@@ -39,7 +40,8 @@ public class SessionObserverTest {
     public void observeSessionCluster() {
         TestingFlinkService flinkService = new TestingFlinkService();
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
-        SessionObserver observer = new SessionObserver(null, flinkService, configManager);
+        SessionObserver observer =
+                new SessionObserver(flinkService, configManager, new EventRecorder(null, null));
         deployment
                 .getStatus()
                 .getReconciliationStatus()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 766c35c..9f4a315 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -23,15 +23,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Map;
@@ -39,16 +42,25 @@ import java.util.Map;
 /** Tests for {@link SessionJobObserver}. */
 @EnableKubernetesMockClient(crud = true)
 public class SessionJobObserverTest {
+
     private KubernetesClient kubernetesClient;
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+    private final TestingFlinkService flinkService = new TestingFlinkService();
+    private SessionJobObserver observer;
+    private FlinkSessionJobReconciler reconciler;
+
+    @BeforeEach
+    public void before() {
+        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        TestingStatusRecorder<FlinkSessionJobStatus> statusRecorder = new TestingStatusRecorder<>();
+        observer =
+                new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+        reconciler = new FlinkSessionJobReconciler(kubernetesClient, flinkService, configManager);
+    }
 
     @Test
     public void testBasicObserve() throws Exception {
         final var sessionJob = TestUtils.buildSessionJob();
-        final var flinkService = new TestingFlinkService(kubernetesClient);
-        final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
-        final var observer =
-                new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
         final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
 
         // observe the brand new job, nothing to do.
@@ -106,10 +118,6 @@ public class SessionJobObserverTest {
     @Test
     public void testObserveWithEffectiveConfig() throws Exception {
         final var sessionJob = TestUtils.buildSessionJob();
-        final var flinkService = new TestingFlinkService(kubernetesClient);
-        final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
-        final var observer =
-                new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
         final var readyContext =
                 TestUtils.createContextWithReadyFlinkDeployment(
                         Map.of(RestOptions.PORT.key(), "8088"));
@@ -132,10 +140,6 @@ public class SessionJobObserverTest {
     @Test
     public void testObserveSavepoint() throws Exception {
         final var sessionJob = TestUtils.buildSessionJob();
-        final var flinkService = new TestingFlinkService(kubernetesClient);
-        final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
-        final var observer =
-                new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
         final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
 
         // submit job
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 81978fb..ca9d819 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
@@ -46,6 +47,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -67,15 +69,22 @@ public class ApplicationReconcilerTest {
 
     private KubernetesClient kubernetesClient;
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+    private TestingFlinkService flinkService = new TestingFlinkService();
+    private ApplicationReconciler reconciler;
+
+    @BeforeEach
+    public void before() {
+        var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+        reconciler =
+                new ApplicationReconciler(
+                        kubernetesClient, flinkService, configManager, eventRecorder);
+    }
 
     @ParameterizedTest
     @EnumSource(FlinkVersion.class)
     public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
 
-        ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
 
         reconciler.reconcile(deployment, context);
@@ -168,11 +177,8 @@ public class ApplicationReconcilerTest {
     @Test
     public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
         final String expectedSavepointPath = "savepoint_0";
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
 
-        final ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
 
         reconciler.reconcile(deployment, context);
@@ -211,10 +217,7 @@ public class ApplicationReconcilerTest {
 
     @Test
     public void triggerSavepoint() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
-        ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
 
         reconciler.reconcile(deployment, context);
@@ -303,12 +306,9 @@ public class ApplicationReconcilerTest {
     @Test
     public void testUpgradeModeChangedToLastStateShouldTriggerSavepointWhileHADisabled()
             throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         flinkService.setHaDataAvailable(false);
         Context context = flinkService.getContext();
 
-        final ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
 
@@ -352,11 +352,8 @@ public class ApplicationReconcilerTest {
     @Test
     public void testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEnabled()
             throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
 
-        final ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
 
         reconciler.reconcile(deployment, context);
@@ -389,11 +386,8 @@ public class ApplicationReconcilerTest {
 
     @Test
     public void triggerRestart() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
 
-        ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
 
         reconciler.reconcile(deployment, context);
@@ -451,10 +445,7 @@ public class ApplicationReconcilerTest {
 
     @Test
     public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
         Context context = flinkService.getContext();
-        ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         reconciler.reconcile(deployment, context);
         List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
@@ -489,10 +480,6 @@ public class ApplicationReconcilerTest {
 
     @Test
     public void testRandomJobResultStorePath() throws Exception {
-
-        TestingFlinkService flinkService = new TestingFlinkService();
-        ApplicationReconciler reconciler =
-                new ApplicationReconciler(kubernetesClient, flinkService, configManager);
         FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
         final String haStoragePath = "file:///flink-data/ha";
         flinkApp.getSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 0f7cfbf..e62cc0a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Test;
@@ -36,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 public class SessionReconcilerTest {
 
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+    private final EventRecorder eventRecorder = new EventRecorder(null, (r, e) -> {});
 
     @Test
     public void testStartSession() throws Exception {
@@ -50,7 +52,8 @@ public class SessionReconcilerTest {
                     }
                 };
 
-        SessionReconciler reconciler = new SessionReconciler(null, flinkService, configManager);
+        SessionReconciler reconciler =
+                new SessionReconciler(null, flinkService, configManager, eventRecorder);
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         reconciler.reconcile(deployment, context);
         assertEquals(1, count.get());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index db2dfec..29ee2c5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -51,7 +51,8 @@ public class EventUtilsTest {
                         EventUtils.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator));
+                        EventUtils.Component.Operator,
+                        e -> {}));
         var event =
                 kubernetesClient
                         .v1()
@@ -70,7 +71,8 @@ public class EventUtilsTest {
                         EventUtils.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator));
+                        EventUtils.Component.Operator,
+                        e -> {}));
         event =
                 kubernetesClient
                         .v1()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
similarity index 93%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 8527f93..209c1d2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -28,16 +28,16 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Test for {@link StatusHelper}. */
+/** Test for {@link StatusRecorder}. */
 @EnableKubernetesMockClient(crud = true)
-public class StatusHelperTest {
+public class StatusRecorderTest {
 
     private KubernetesClient kubernetesClient;
     private KubernetesMockServer mockServer;
 
     @Test
     public void testPatchOnlyWhenChanged() throws InterruptedException {
-        var helper = new StatusHelper<FlinkDeploymentStatus>(kubernetesClient);
+        var helper = new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (e, s) -> {});
         var deployment = TestUtils.buildApplicationCluster();
         kubernetesClient.resource(deployment).createOrReplace();
         var lastRequest = mockServer.getLastRequest();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
index 2417047..cee4071 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
@@ -25,14 +25,11 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
 import org.apache.flink.kubernetes.operator.validation.TestValidator;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,30 +37,20 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /** Test class for {@link ValidatorUtils}. */
 public class ValidatorUtilsTest {
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    private static final String VALIDATOR_NAME = "test-validator";
-    private static final String VALIDATOR_JAR = VALIDATOR_NAME + "-test-jar.jar";
+    @TempDir public Path temporaryFolder;
 
     @Test
     public void testDiscoverValidators() throws IOException {
-        File validatorRootFolder = temporaryFolder.newFolder();
-        File testValidatorFolder = new File(validatorRootFolder, VALIDATOR_NAME);
-        assertTrue(testValidatorFolder.mkdirs());
-        File testValidatorJar = new File("target", VALIDATOR_JAR);
-        assertTrue(testValidatorJar.exists());
-        Files.copy(
-                testValidatorJar.toPath(),
-                Paths.get(testValidatorFolder.toString(), VALIDATOR_JAR));
         Map<String, String> originalEnv = System.getenv();
         try {
             Map<String, String> systemEnv = new HashMap<>(originalEnv);
-            systemEnv.put(ConfigConstants.ENV_FLINK_PLUGINS_DIR, validatorRootFolder.getPath());
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                    TestUtils.getTestPluginsRootDir(temporaryFolder));
             TestUtils.setEnv(systemEnv);
             assertEquals(
                     new HashSet<>(
diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener
new file mode 100644
index 0000000..8e89fa4
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.listener.TestingListener
\ No newline at end of file