You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/31 08:23:53 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-30669] update most recent job status in flinkdeployment

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9168c98b [FLINK-30669] update most recent job status in flinkdeployment
9168c98b is described below

commit 9168c98b2e52538648670f79ee37a19e95d9580a
Author: Mohemmad Zaid Khan <za...@gmail.com>
AuthorDate: Tue Jan 31 13:53:48 2023 +0530

    [FLINK-30669] update most recent job status in flinkdeployment
---
 .../flink/kubernetes/operator/observer/JobStatusObserver.java |  4 +++-
 .../operator/observer/deployment/ApplicationObserver.java     |  3 +++
 .../operator/observer/deployment/ApplicationObserverTest.java | 11 ++++-------
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 99501a7e..1062796f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -158,6 +158,7 @@ public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
     private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
         var resource = ctx.getResource();
         var jobStatus = resource.getStatus().getJobStatus();
+        var previousJobId = jobStatus.getJobId();
         var previousJobStatus = jobStatus.getState();
 
         jobStatus.setState(clusterJobStatus.getJobState().name());
@@ -165,7 +166,8 @@ public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
         jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
         jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
 
-        if (jobStatus.getState().equals(previousJobStatus)) {
+        if (jobStatus.getJobId().equals(previousJobId)
+                && jobStatus.getState().equals(previousJobStatus)) {
             LOG.info("Job status ({}) unchanged", previousJobStatus);
         } else {
             jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index d414ceff..a08cccda 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 
@@ -79,6 +80,8 @@ public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
         protected Optional<JobStatusMessage> filterTargetJob(
                 JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
             if (!clusterJobStatuses.isEmpty()) {
+                clusterJobStatuses.sort(
+                        Comparator.comparingLong(JobStatusMessage::getStartTime).reversed());
                 return Optional.of(clusterJobStatuses.get(0));
             }
             return Optional.empty();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 772cf68e..032bac2d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -263,13 +263,10 @@ public class ApplicationObserverTest extends OperatorTestBase {
         assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
         assertEquals(
                 1,
-                kubernetesClient
-                        .v1()
-                        .events()
-                        .inNamespace(deployment.getMetadata().getNamespace())
-                        .list()
-                        .getItems()
-                        .size());
+                kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+                        .list().getItems().stream()
+                        .filter(e -> e.getReason().contains("SavepointError"))
+                        .count());
 
         deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
         deployment