You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/22 05:40:04 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-29033] Expose timestamp in resource listener context

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 964bedef [FLINK-29033] Expose timestamp in resource listener context
964bedef is described below

commit 964bedefc1e6011110c5a45ecc007494cd531300
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Aug 16 20:32:44 2022 +0200

    [FLINK-29033] Expose timestamp in resource listener context
---
 .../kubernetes/operator/listener/FlinkResourceListener.java      | 9 +++++++++
 .../apache/flink/kubernetes/operator/utils/StatusRecorder.java   | 7 +++++++
 .../kubernetes/operator/listener/FlinkResourceListenerTest.java  | 8 ++++++++
 3 files changed, 24 insertions(+)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
index 5cd61433..73e54862 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
@@ -28,6 +28,8 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import java.time.Instant;
+
 /** Listener interface for Flink resource related events and status changes. */
 public interface FlinkResourceListener extends Plugin {
 
@@ -44,12 +46,19 @@ public interface FlinkResourceListener extends Plugin {
         R getFlinkResource();
 
         KubernetesClient getKubernetesClient();
+
+        Instant getTimestamp();
     }
 
     /** Context for Resource Event listener methods. */
     interface ResourceEventContext<R extends AbstractFlinkResource<?, ?>>
             extends ResourceContext<R> {
         Event getEvent();
+
+        @Override
+        default Instant getTimestamp() {
+            return Instant.parse(getEvent().getLastTimestamp());
+        }
     }
 
     /** Context for Status listener methods. */
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 47e34806..c042e0de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -35,6 +35,7 @@ import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
@@ -163,6 +164,7 @@ public class StatusRecorder<
                     Collection<FlinkResourceListener> listeners) {
         BiConsumer<CR, S> consumer =
                 (resource, previousStatus) -> {
+                    var now = Instant.now();
                     var ctx =
                             new FlinkResourceListener.StatusUpdateContext() {
                                 @Override
@@ -179,6 +181,11 @@ public class StatusRecorder<
                                 public KubernetesClient getKubernetesClient() {
                                     return kubernetesClient;
                                 }
+
+                                @Override
+                                public Instant getTimestamp() {
+                                    return now;
+                                }
                             };
 
                     listeners.forEach(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index c3a42c8c..9d18aa6f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,6 +74,8 @@ public class FlinkResourceListenerTest {
 
         assertEquals(1, listener2.updates.size());
         assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+        assertEquals(
+                listener1.updates.get(0).getTimestamp(), listener2.updates.get(0).getTimestamp());
 
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
         statusRecorder.patchAndCacheStatus(deployment);
@@ -109,6 +112,11 @@ public class FlinkResourceListenerTest {
 
         for (int i = 0; i < listener1.events.size(); i++) {
             assertEquals(listener1.events.get(i).getEvent(), listener2.events.get(i).getEvent());
+            assertEquals(
+                    listener1.events.get(i).getTimestamp(),
+                    Instant.parse(listener1.events.get(i).getEvent().getLastTimestamp()));
+            assertEquals(
+                    listener1.events.get(i).getTimestamp(), listener2.events.get(i).getTimestamp());
         }
     }
 }