You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") whose target is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/29 14:41:06 UTC

[flink-kubernetes-operator] branch main updated (97883eb -> 1b4fc47)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from 97883eb  [tooling] Use 7 letter hash in release scripts
     new 4143700  [FLINK-26891] Record important deployment events
     new 1b4fc47  [FLINK-26891] Centralize reason codes in EventRecorder

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../controller/FlinkDeploymentController.java      |  5 ++-
 .../operator/observer/JobStatusObserver.java       |  9 +++--
 .../operator/observer/SavepointObserver.java       |  9 +++--
 .../deployment/AbstractDeploymentObserver.java     |  9 +++--
 .../AbstractFlinkResourceReconciler.java           | 27 ++++++++++++---
 .../deployment/AbstractJobReconciler.java          | 15 ++++++---
 .../deployment/ApplicationReconciler.java          | 20 +++++++-----
 .../reconciler/deployment/SessionReconciler.java   | 15 ++++-----
 .../sessionjob/SessionJobReconciler.java           |  4 +--
 .../kubernetes/operator/utils/EventRecorder.java   | 38 ++++++++++++++++++++--
 .../kubernetes/operator/utils/EventUtils.java      | 23 ++++---------
 .../kubernetes/operator/utils/SavepointUtils.java  |  8 ++---
 .../controller/FlinkDeploymentControllerTest.java  | 35 ++++++++++++++++++++
 .../listener/FlinkResourceListenerTest.java        | 17 +++++-----
 .../deployment/ApplicationReconcilerTest.java      |  4 +--
 .../sessionjob/SessionJobReconcilerTest.java       | 19 ++++++++---
 .../kubernetes/operator/utils/EventUtilsTest.java  | 20 ++++++------
 17 files changed, 183 insertions(+), 94 deletions(-)


[flink-kubernetes-operator] 01/02: [FLINK-26891] Record important deployment events

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 41437000190952587529876b7fefce6f521335f9
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Jun 27 19:11:06 2022 -0400

    [FLINK-26891] Record important deployment events
---
 .../AbstractFlinkResourceReconciler.java           | 32 ++++++++++++++++----
 .../deployment/AbstractJobReconciler.java          | 16 ++++++----
 .../deployment/ApplicationReconciler.java          | 21 +++++++------
 .../reconciler/deployment/SessionReconciler.java   |  6 ++--
 .../sessionjob/SessionJobReconciler.java           |  4 +--
 .../controller/FlinkDeploymentControllerTest.java  | 35 ++++++++++++++++++++++
 .../deployment/ApplicationReconcilerTest.java      |  4 +--
 .../sessionjob/SessionJobReconcilerTest.java       | 20 ++++++++++---
 8 files changed, 108 insertions(+), 30 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 2e0d592..737e9cd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -65,6 +66,15 @@ public abstract class AbstractFlinkResourceReconciler<
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
 
+    public static final String MSG_SUSPENDED = "Suspending existing deployment.";
+    public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
+    public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
+    public static final String MSG_SUBMIT = "Starting deployment";
+    public static final String REASON_SUSPENDED = "Suspended";
+    public static final String REASON_SPEC_CHANGED = "Spec Changed";
+    public static final String REASON_ROLLBACK = "Rollback";
+    public static final String REASON_SUBMIT = "Submit";
+
     public AbstractFlinkResourceReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
@@ -95,7 +105,7 @@ public abstract class AbstractFlinkResourceReconciler<
         if (firstDeployment) {
             LOG.info("Deploying for the first time");
             deploy(
-                    cr.getMetadata(),
+                    cr,
                     spec,
                     status,
                     deployConfig,
@@ -118,14 +128,26 @@ public abstract class AbstractFlinkResourceReconciler<
             if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
                 return;
             }
-            LOG.info("Reconciling spec change");
+            LOG.info(MSG_SPEC_CHANGED);
+            eventRecorder.triggerEvent(
+                    cr,
+                    EventUtils.Type.Normal,
+                    REASON_SPEC_CHANGED,
+                    MSG_SPEC_CHANGED,
+                    EventUtils.Component.JobManagerDeployment);
             reconcileSpecChange(cr, observeConfig, deployConfig);
         } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
             // Rollbacks are executed in two steps, we initiate it first then return
             if (initiateRollBack(status)) {
                 return;
             }
-            LOG.warn("Executing rollback operation");
+            LOG.warn(MSG_ROLLBACK);
+            eventRecorder.triggerEvent(
+                    cr,
+                    EventUtils.Type.Normal,
+                    REASON_ROLLBACK,
+                    MSG_ROLLBACK,
+                    EventUtils.Component.JobManagerDeployment);
             rollback(cr, ctx, observeConfig);
         } else if (!reconcileOtherChanges(cr, observeConfig)) {
             LOG.info("Resource fully reconciled, nothing to do...");
@@ -206,7 +228,7 @@ public abstract class AbstractFlinkResourceReconciler<
     /**
      * Deploys the target resource spec to Kubernetes.
      *
-     * @param meta ObjectMeta of the related resource.
+     * @param relatedResource Related resource.
      * @param spec Spec that should be deployed to Kubernetes.
      * @param status Status object of the resource
      * @param deployConfig Flink conf for the deployment.
@@ -215,7 +237,7 @@ public abstract class AbstractFlinkResourceReconciler<
      * @throws Exception
      */
     protected abstract void deploy(
-            ObjectMeta meta,
+            CR relatedResource,
             SPEC spec,
             STATUS status,
             Configuration deployConfig,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 3ca8735..367d3e4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -30,9 +30,9 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
-import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.slf4j.Logger;
@@ -98,6 +98,12 @@ public abstract class AbstractJobReconciler<
             if (availableUpgradeMode.isEmpty()) {
                 return;
             }
+            eventRecorder.triggerEvent(
+                    resource,
+                    EventUtils.Type.Normal,
+                    REASON_SUSPENDED,
+                    MSG_SUSPENDED,
+                    EventUtils.Component.JobManagerDeployment);
             // We must record the upgrade mode used to the status later
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
             cancelJob(resource, availableUpgradeMode.get(), observeConfig);
@@ -105,7 +111,7 @@ public abstract class AbstractJobReconciler<
         }
         if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
             restoreJob(
-                    deployMeta,
+                    resource,
                     currentDeploySpec,
                     status,
                     deployConfig,
@@ -150,7 +156,7 @@ public abstract class AbstractJobReconciler<
     }
 
     protected void restoreJob(
-            ObjectMeta meta,
+            CR resource,
             SPEC spec,
             STATUS status,
             Configuration deployConfig,
@@ -164,7 +170,7 @@ public abstract class AbstractJobReconciler<
                             .flatMap(s -> Optional.ofNullable(s.getLocation()));
         }
 
-        deploy(meta, spec, status, deployConfig, savepointOpt, requireHaMetadata);
+        deploy(resource, spec, status, deployConfig, savepointOpt, requireHaMetadata);
     }
 
     @Override
@@ -183,7 +189,7 @@ public abstract class AbstractJobReconciler<
                 observeConfig);
 
         restoreJob(
-                resource.getMetadata(),
+                resource,
                 rollbackSpec,
                 resource.getStatus(),
                 getDeployConfig(resource.getMetadata(), rollbackSpec, context),
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index b89a88b..f270571 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
@@ -107,7 +108,7 @@ public class ApplicationReconciler
 
     @Override
     protected void deploy(
-            ObjectMeta meta,
+            FlinkDeployment relatedResource,
             FlinkDeploymentSpec spec,
             FlinkDeploymentStatus status,
             Configuration deployConfig,
@@ -129,7 +130,7 @@ public class ApplicationReconciler
                 throw new RuntimeException("This indicates a bug...");
             }
             LOG.info("Deleting deployment with terminated application before new deployment");
-            flinkService.deleteClusterDeployment(meta, status, true);
+            flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
             FlinkUtils.waitForClusterShutdown(
                     kubernetesClient,
                     deployConfig,
@@ -138,11 +139,18 @@ public class ApplicationReconciler
                             .getFlinkShutdownClusterTimeout()
                             .toSeconds());
         }
+        eventRecorder.triggerEvent(
+                relatedResource,
+                EventUtils.Type.Normal,
+                REASON_SUBMIT,
+                MSG_SUBMIT,
+                EventUtils.Component.JobManagerDeployment);
         flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
 
-        IngressUtils.updateIngressRules(meta, spec, deployConfig, kubernetesClient);
+        IngressUtils.updateIngressRules(
+                relatedResource.getMetadata(), spec, deployConfig, kubernetesClient);
     }
 
     @Override
@@ -188,12 +196,7 @@ public class ApplicationReconciler
             throws Exception {
         LOG.info("Missing Flink Cluster deployment, trying to recover...");
         FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
-        restoreJob(
-                deployment.getMetadata(),
-                specToRecover,
-                deployment.getStatus(),
-                observeConfig,
-                true);
+        restoreJob(deployment, specToRecover, deployment.getStatus(), observeConfig, true);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index ba8d390..21803c1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -103,7 +103,7 @@ public class SessionReconciler
                         .getFlinkShutdownClusterTimeout()
                         .toSeconds());
         deploy(
-                deployment.getMetadata(),
+                deployment,
                 deploySpec,
                 deployment.getStatus(),
                 effectiveConfig,
@@ -113,7 +113,7 @@ public class SessionReconciler
 
     @Override
     protected void deploy(
-            ObjectMeta meta,
+            FlinkDeployment cr,
             FlinkDeploymentSpec spec,
             FlinkDeploymentStatus status,
             Configuration deployConfig,
@@ -122,7 +122,7 @@ public class SessionReconciler
             throws Exception {
         flinkService.submitSessionCluster(deployConfig);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
-        IngressUtils.updateIngressRules(meta, spec, deployConfig, kubernetesClient);
+        IngressUtils.updateIngressRules(cr.getMetadata(), spec, deployConfig, kubernetesClient);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 7757500..5869b91 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -79,7 +79,7 @@ public class SessionJobReconciler
 
     @Override
     protected void deploy(
-            ObjectMeta meta,
+            FlinkSessionJob cr,
             FlinkSessionJobSpec sessionJobSpec,
             FlinkSessionJobStatus status,
             Configuration deployConfig,
@@ -88,7 +88,7 @@ public class SessionJobReconciler
             throws Exception {
         var jobID =
                 flinkService.submitJobToSessionCluster(
-                        meta, sessionJobSpec, deployConfig, savepoint.orElse(null));
+                        cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
         status.setJobStatus(
                 new JobStatus()
                         .toBuilder()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 482ba76..b7c1f1c 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -164,6 +165,22 @@ public class FlinkDeploymentControllerTest {
 
     @Test
     public void verifyFailedDeployment() throws Exception {
+        var submittedEventValidatingResponseProvider =
+                new TestUtils.ValidatingResponseProvider<>(
+                        new EventBuilder().withNewMetadata().endMetadata().build(),
+                        r -> {
+                            assertTrue(
+                                    r.getBody()
+                                            .readUtf8()
+                                            .contains(AbstractFlinkResourceReconciler.MSG_SUBMIT));
+                        });
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReply(submittedEventValidatingResponseProvider)
+                .once();
+
         var validatingResponseProvider =
                 new TestUtils.ValidatingResponseProvider<>(
                         new EventBuilder().withNewMetadata().endMetadata().build(),
@@ -184,6 +201,7 @@ public class FlinkDeploymentControllerTest {
         updateControl =
                 testController.reconcile(
                         appCluster, TestUtils.createContextWithFailedJobManagerDeployment());
+        submittedEventValidatingResponseProvider.assertValidated();
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -218,6 +236,22 @@ public class FlinkDeploymentControllerTest {
     public void verifyInProgressDeploymentWithCrashLoopBackoff() throws Exception {
         String crashLoopMessage = "container fails";
 
+        var submittedEventValidatingResponseProvider =
+                new TestUtils.ValidatingResponseProvider<>(
+                        new EventBuilder().withNewMetadata().endMetadata().build(),
+                        r -> {
+                            assertTrue(
+                                    r.getBody()
+                                            .readUtf8()
+                                            .contains(AbstractFlinkResourceReconciler.MSG_SUBMIT));
+                        });
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReply(submittedEventValidatingResponseProvider)
+                .once();
+
         var validatingResponseProvider =
                 new TestUtils.ValidatingResponseProvider<>(
                         new EventBuilder().withNewMetadata().endMetadata().build(),
@@ -244,6 +278,7 @@ public class FlinkDeploymentControllerTest {
         updateControl =
                 testController.reconcile(
                         appCluster, TestUtils.createContextWithInProgressDeployment());
+        submittedEventValidatingResponseProvider.assertValidated();
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 18c847c..0dcc41b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -492,14 +492,14 @@ public class ApplicationReconcilerTest {
 
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deploy(deployMeta, spec, status, deployConfig, Optional.empty(), false);
+        reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
 
         String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path1.startsWith(haStoragePath));
 
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        reconciler.deploy(deployMeta, spec, status, deployConfig, Optional.empty(), false);
+        reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
         String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path2.startsWith(haStoragePath));
         assertNotEquals(path1, path2);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 6d1c414..59908d8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -31,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -55,11 +57,23 @@ public class SessionJobReconcilerTest {
 
     private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
     private TestingFlinkService flinkService = new TestingFlinkService();
+    private EventRecorder eventRecorder;
     private SessionJobReconciler reconciler;
 
     @BeforeEach
     public void before() {
-        var eventRecorder = new EventRecorder(null, (r, e) -> {});
+        eventRecorder =
+                new EventRecorder(null, (r, e) -> {}) {
+                    @Override
+                    public boolean triggerEvent(
+                            AbstractFlinkResource<?, ?> resource,
+                            EventUtils.Type type,
+                            String reason,
+                            String message,
+                            EventUtils.Component component) {
+                        return false;
+                    }
+                };
         reconciler = new SessionJobReconciler(null, flinkService, configManager, eventRecorder);
     }
 
@@ -430,9 +444,7 @@ public class SessionJobReconcilerTest {
                                         .key(),
                                 "true")));
         // Force upgrade when savepoint is in progress.
-        reconciler =
-                new SessionJobReconciler(
-                        null, flinkService, configManager, new EventRecorder(null, null));
+        reconciler = new SessionJobReconciler(null, flinkService, configManager, eventRecorder);
         spSessionJob.getSpec().getJob().setParallelism(100);
         reconciler.reconcile(spSessionJob, readyContext);
         assertEquals(


[flink-kubernetes-operator] 02/02: [FLINK-26891] Centralize reason codes in EventRecorder

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 1b4fc472e0e31818157d4940b2b06e902299c851
Author: Thomas Weise <th...@apache.org>
AuthorDate: Tue Jun 28 20:43:03 2022 -0400

    [FLINK-26891] Centralize reason codes in EventRecorder
---
 .../controller/FlinkDeploymentController.java      |  5 ++-
 .../operator/observer/JobStatusObserver.java       |  9 +++--
 .../operator/observer/SavepointObserver.java       |  9 +++--
 .../deployment/AbstractDeploymentObserver.java     |  9 +++--
 .../AbstractFlinkResourceReconciler.java           | 21 +++++-------
 .../deployment/AbstractJobReconciler.java          |  9 +++--
 .../deployment/ApplicationReconciler.java          |  9 +++--
 .../reconciler/deployment/SessionReconciler.java   |  9 +++--
 .../kubernetes/operator/utils/EventRecorder.java   | 38 ++++++++++++++++++++--
 .../kubernetes/operator/utils/EventUtils.java      | 23 ++++---------
 .../kubernetes/operator/utils/SavepointUtils.java  |  8 ++---
 .../listener/FlinkResourceListenerTest.java        | 17 +++++-----
 .../sessionjob/SessionJobReconcilerTest.java       |  5 ++-
 .../kubernetes/operator/utils/EventUtilsTest.java  | 20 ++++++------
 14 files changed, 101 insertions(+), 90 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 346bbe3..d36f96b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -30,7 +30,6 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
@@ -139,10 +138,10 @@ public class FlinkDeploymentController
         ReconciliationUtils.updateForReconciliationError(flinkApp, dfe.getMessage());
         eventRecorder.triggerEvent(
                 flinkApp,
-                EventUtils.Type.Warning,
+                EventRecorder.Type.Warning,
                 dfe.getReason(),
                 dfe.getMessage(),
-                EventUtils.Component.JobManagerDeployment);
+                EventRecorder.Component.JobManagerDeployment);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 4836d71..cfe87fd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
 import org.slf4j.Logger;
@@ -142,10 +141,10 @@ public abstract class JobStatusObserver<CTX> {
             setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
             eventRecorder.triggerEvent(
                     resource,
-                    EventUtils.Type.Normal,
-                    "Status Changed",
-                    message,
-                    EventUtils.Component.Job);
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.StatusChanged,
+                    EventRecorder.Component.Job,
+                    message);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index 202f7b3..74f3d35 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
@@ -120,12 +119,12 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
                         savepointInfo, resource);
                 eventRecorder.triggerEvent(
                         resource,
-                        EventUtils.Type.Warning,
-                        "SavepointError",
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.SavepointError,
+                        EventRecorder.Component.Operator,
                         SavepointUtils.createSavepointError(
                                 savepointInfo,
-                                resource.getSpec().getJob().getSavepointTriggerNonce()),
-                        EventUtils.Component.Operator);
+                                resource.getSpec().getJob().getSavepointTriggerNonce()));
             } else {
                 LOG.warn("Savepoint failed within grace period, retrying: " + err);
             }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index f67bc53..1bbc6ca 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
@@ -247,10 +246,10 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         ReconciliationUtils.updateForReconciliationError(deployment, err);
         eventRecorder.triggerEvent(
                 deployment,
-                EventUtils.Type.Warning,
-                "Missing",
-                err,
-                EventUtils.Component.JobManagerDeployment);
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.Missing,
+                EventRecorder.Component.JobManagerDeployment,
+                err);
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 737e9cd..06cce7c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -70,10 +69,6 @@ public abstract class AbstractFlinkResourceReconciler<
     public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
     public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
     public static final String MSG_SUBMIT = "Starting deployment";
-    public static final String REASON_SUSPENDED = "Suspended";
-    public static final String REASON_SPEC_CHANGED = "Spec Changed";
-    public static final String REASON_ROLLBACK = "Rollback";
-    public static final String REASON_SUBMIT = "Submit";
 
     public AbstractFlinkResourceReconciler(
             KubernetesClient kubernetesClient,
@@ -131,10 +126,10 @@ public abstract class AbstractFlinkResourceReconciler<
             LOG.info(MSG_SPEC_CHANGED);
             eventRecorder.triggerEvent(
                     cr,
-                    EventUtils.Type.Normal,
-                    REASON_SPEC_CHANGED,
-                    MSG_SPEC_CHANGED,
-                    EventUtils.Component.JobManagerDeployment);
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.SpecChanged,
+                    EventRecorder.Component.JobManagerDeployment,
+                    MSG_SPEC_CHANGED);
             reconcileSpecChange(cr, observeConfig, deployConfig);
         } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
             // Rollbacks are executed in two steps, we initiate it first then return
@@ -144,10 +139,10 @@ public abstract class AbstractFlinkResourceReconciler<
             LOG.warn(MSG_ROLLBACK);
             eventRecorder.triggerEvent(
                     cr,
-                    EventUtils.Type.Normal,
-                    REASON_ROLLBACK,
-                    MSG_ROLLBACK,
-                    EventUtils.Component.JobManagerDeployment);
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.Rollback,
+                    EventRecorder.Component.JobManagerDeployment,
+                    MSG_ROLLBACK);
             rollback(cr, ctx, observeConfig);
         } else if (!reconcileOtherChanges(cr, observeConfig)) {
             LOG.info("Resource fully reconciled, nothing to do...");
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 367d3e4..0ef1948 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -30,7 +30,6 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -100,10 +99,10 @@ public abstract class AbstractJobReconciler<
             }
             eventRecorder.triggerEvent(
                     resource,
-                    EventUtils.Type.Normal,
-                    REASON_SUSPENDED,
-                    MSG_SUSPENDED,
-                    EventUtils.Component.JobManagerDeployment);
+                    EventRecorder.Type.Normal,
+                    EventRecorder.Reason.Suspended,
+                    EventRecorder.Component.JobManagerDeployment,
+                    MSG_SUSPENDED);
             // We must record the upgrade mode used to the status later
             currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
             cancelJob(resource, availableUpgradeMode.get(), observeConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index f270571..1508b9c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -30,7 +30,6 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
@@ -141,10 +140,10 @@ public class ApplicationReconciler
         }
         eventRecorder.triggerEvent(
                 relatedResource,
-                EventUtils.Type.Normal,
-                REASON_SUBMIT,
-                MSG_SUBMIT,
-                EventUtils.Component.JobManagerDeployment);
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.Submit,
+                EventRecorder.Component.JobManagerDeployment,
+                MSG_SUBMIT);
         flinkService.submitApplicationCluster(spec.getJob(), deployConfig, requireHaMetadata);
         status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 21803c1..e816582 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
@@ -166,10 +165,10 @@ public class SessionReconciler
                                     .collect(Collectors.toList()));
             if (eventRecorder.triggerEvent(
                     deployment,
-                    EventUtils.Type.Warning,
-                    "Cleanup",
-                    error,
-                    EventUtils.Component.Operator)) {
+                    EventRecorder.Type.Warning,
+                    EventRecorder.Reason.Cleanup,
+                    EventRecorder.Component.Operator,
+                    error)) {
                 LOG.warn(error);
             }
             return DeleteControl.noFinalizerRemoval()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index ff87e8f..c96fec0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -46,10 +46,19 @@ public class EventRecorder {
 
     public boolean triggerEvent(
             AbstractFlinkResource<?, ?> resource,
-            EventUtils.Type type,
+            Type type,
+            Reason reason,
+            Component component,
+            String message) {
+        return triggerEvent(resource, type, reason.toString(), message, component);
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            Type type,
             String reason,
             String message,
-            EventUtils.Component component) {
+            Component component) {
         return EventUtils.createOrUpdateEvent(
                 client,
                 resource,
@@ -94,4 +103,29 @@ public class EventRecorder {
                                 });
         return new EventRecorder(client, biConsumer);
     }
+
+    /** The type of the events. */
+    public enum Type {
+        Normal,
+        Warning
+    }
+
+    /** The component of events. */
+    public enum Component {
+        Operator,
+        JobManagerDeployment,
+        Job
+    }
+
+    /** The reason codes of events. */
+    public enum Reason {
+        Suspended,
+        SpecChanged,
+        Rollback,
+        Submit,
+        StatusChanged,
+        SavepointError,
+        Cleanup,
+        Missing
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index 100b857..5c26d1f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -32,21 +32,12 @@ import java.util.function.Consumer;
  */
 public class EventUtils {
 
-    /** The type of the events. */
-    public enum Type {
-        Normal,
-        Warning
-    }
-
-    /** The component of events. */
-    public enum Component {
-        Operator,
-        JobManagerDeployment,
-        Job
-    }
-
     public static String generateEventName(
-            HasMetadata target, Type type, String reason, String message, Component component) {
+            HasMetadata target,
+            EventRecorder.Type type,
+            String reason,
+            String message,
+            EventRecorder.Component component) {
         return component
                 + "."
                 + ((reason
@@ -62,10 +53,10 @@ public class EventUtils {
     public static boolean createOrUpdateEvent(
             KubernetesClient client,
             HasMetadata target,
-            Type type,
+            EventRecorder.Type type,
             String reason,
             String message,
-            Component component,
+            EventRecorder.Component component,
             Consumer<Event> eventListener) {
         var eventName = generateEventName(target, type, reason, message, component);
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 23e5871..5a47add 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -150,11 +150,11 @@ public class SavepointUtils {
             LOG.error("Job is not running, cancelling savepoint operation");
             eventRecorder.triggerEvent(
                     resource,
-                    EventUtils.Type.Warning,
-                    "SavepointError",
+                    EventRecorder.Type.Warning,
+                    EventRecorder.Reason.SavepointError,
+                    EventRecorder.Component.Operator,
                     createSavepointError(
-                            savepointInfo, resource.getSpec().getJob().getSavepointTriggerNonce()),
-                    EventUtils.Component.Operator);
+                            savepointInfo, resource.getSpec().getJob().getSavepointTriggerNonce()));
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index e624b6c..86fe742 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -94,17 +93,17 @@ public class FlinkResourceListenerTest {
 
         eventRecorder.triggerEvent(
                 deployment,
-                EventUtils.Type.Warning,
-                "SavepointError",
-                "err",
-                EventUtils.Component.Operator);
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.SavepointError,
+                EventRecorder.Component.Operator,
+                "err");
         assertEquals(1, listener1.events.size());
         eventRecorder.triggerEvent(
                 deployment,
-                EventUtils.Type.Warning,
-                "SavepointError",
-                "err",
-                EventUtils.Component.Operator);
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.SavepointError,
+                EventRecorder.Component.Operator,
+                "err");
         assertEquals(2, listener1.events.size());
 
         for (int i = 0; i < listener1.events.size(); i++) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 59908d8..b47c111 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -67,10 +66,10 @@ public class SessionJobReconcilerTest {
                     @Override
                     public boolean triggerEvent(
                             AbstractFlinkResource<?, ?> resource,
-                            EventUtils.Type type,
+                            Type type,
                             String reason,
                             String message,
-                            EventUtils.Component component) {
+                            Component component) {
                         return false;
                     }
                 };
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index 29ee2c5..9e9404e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -40,18 +40,18 @@ public class EventUtilsTest {
         var eventName =
                 EventUtils.generateEventName(
                         flinkApp,
-                        EventUtils.Type.Warning,
+                        EventRecorder.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator);
+                        EventRecorder.Component.Operator);
         Assertions.assertTrue(
                 EventUtils.createOrUpdateEvent(
                         kubernetesClient,
                         flinkApp,
-                        EventUtils.Type.Warning,
+                        EventRecorder.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator,
+                        EventRecorder.Component.Operator,
                         e -> {}));
         var event =
                 kubernetesClient
@@ -68,10 +68,10 @@ public class EventUtilsTest {
                 EventUtils.createOrUpdateEvent(
                         kubernetesClient,
                         flinkApp,
-                        EventUtils.Type.Warning,
+                        EventRecorder.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator,
+                        EventRecorder.Component.Operator,
                         e -> {}));
         event =
                 kubernetesClient
@@ -93,18 +93,18 @@ public class EventUtilsTest {
         var name1 =
                 EventUtils.generateEventName(
                         flinkApp,
-                        EventUtils.Type.Warning,
+                        EventRecorder.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator);
+                        EventRecorder.Component.Operator);
         flinkApp.getMetadata().setUid("uid2");
         var name2 =
                 EventUtils.generateEventName(
                         flinkApp,
-                        EventUtils.Type.Warning,
+                        EventRecorder.Type.Warning,
                         reason,
                         message,
-                        EventUtils.Component.Operator);
+                        EventRecorder.Component.Operator);
         Assertions.assertNotEquals(name1, name2);
     }
 }