You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/08/22 05:40:04 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29033] Expose timestamp in resource listener context
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 964bedef [FLINK-29033] Expose timestamp in resource listener context
964bedef is described below
commit 964bedefc1e6011110c5a45ecc007494cd531300
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Aug 16 20:32:44 2022 +0200
[FLINK-29033] Expose timestamp in resource listener context
---
.../kubernetes/operator/listener/FlinkResourceListener.java | 9 +++++++++
.../apache/flink/kubernetes/operator/utils/StatusRecorder.java | 7 +++++++
.../kubernetes/operator/listener/FlinkResourceListenerTest.java | 8 ++++++++
3 files changed, 24 insertions(+)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
index 5cd61433..73e54862 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
@@ -28,6 +28,8 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
+import java.time.Instant;
+
/** Listener interface for Flink resource related events and status changes. */
public interface FlinkResourceListener extends Plugin {
@@ -44,12 +46,19 @@ public interface FlinkResourceListener extends Plugin {
R getFlinkResource();
KubernetesClient getKubernetesClient();
+
+ Instant getTimestamp();
}
/** Context for Resource Event listener methods. */
interface ResourceEventContext<R extends AbstractFlinkResource<?, ?>>
extends ResourceContext<R> {
Event getEvent();
+
+ @Override
+ default Instant getTimestamp() {
+ return Instant.parse(getEvent().getLastTimestamp());
+ }
}
/** Context for Status listener methods. */
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 47e34806..c042e0de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -35,6 +35,7 @@ import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
@@ -163,6 +164,7 @@ public class StatusRecorder<
Collection<FlinkResourceListener> listeners) {
BiConsumer<CR, S> consumer =
(resource, previousStatus) -> {
+ var now = Instant.now();
var ctx =
new FlinkResourceListener.StatusUpdateContext() {
@Override
@@ -179,6 +181,11 @@ public class StatusRecorder<
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
};
listeners.forEach(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
index c3a42c8c..9d18aa6f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -30,6 +30,7 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.Instant;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,6 +74,8 @@ public class FlinkResourceListenerTest {
assertEquals(1, listener2.updates.size());
assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+ assertEquals(
+ listener1.updates.get(0).getTimestamp(), listener2.updates.get(0).getTimestamp());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
statusRecorder.patchAndCacheStatus(deployment);
@@ -109,6 +112,11 @@ public class FlinkResourceListenerTest {
for (int i = 0; i < listener1.events.size(); i++) {
assertEquals(listener1.events.get(i).getEvent(), listener2.events.get(i).getEvent());
+ assertEquals(
+ listener1.events.get(i).getTimestamp(),
+ Instant.parse(listener1.events.get(i).getEvent().getLastTimestamp()));
+ assertEquals(
+ listener1.events.get(i).getTimestamp(), listener2.events.get(i).getTimestamp());
}
}
}