You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/31 08:23:53 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-30669] update most recent job status in flinkdeployment
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 9168c98b [FLINK-30669] update most recent job status in flinkdeployment
9168c98b is described below
commit 9168c98b2e52538648670f79ee37a19e95d9580a
Author: Mohemmad Zaid Khan <za...@gmail.com>
AuthorDate: Tue Jan 31 13:53:48 2023 +0530
[FLINK-30669] update most recent job status in flinkdeployment
---
.../flink/kubernetes/operator/observer/JobStatusObserver.java | 4 +++-
.../operator/observer/deployment/ApplicationObserver.java | 3 +++
.../operator/observer/deployment/ApplicationObserverTest.java | 11 ++++-------
3 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 99501a7e..1062796f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -158,6 +158,7 @@ public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
var resource = ctx.getResource();
var jobStatus = resource.getStatus().getJobStatus();
+ var previousJobId = jobStatus.getJobId();
var previousJobStatus = jobStatus.getState();
jobStatus.setState(clusterJobStatus.getJobState().name());
@@ -165,7 +166,8 @@ public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
- if (jobStatus.getState().equals(previousJobStatus)) {
+ if (jobStatus.getJobId().equals(previousJobId)
+ && jobStatus.getState().equals(previousJobStatus)) {
LOG.info("Job status ({}) unchanged", previousJobStatus);
} else {
jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index d414ceff..a08cccda 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
+import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -79,6 +80,8 @@ public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
protected Optional<JobStatusMessage> filterTargetJob(
JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
if (!clusterJobStatuses.isEmpty()) {
+ clusterJobStatuses.sort(
+ Comparator.comparingLong(JobStatusMessage::getStartTime).reversed());
return Optional.of(clusterJobStatuses.get(0));
}
return Optional.empty();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 772cf68e..032bac2d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -263,13 +263,10 @@ public class ApplicationObserverTest extends OperatorTestBase {
assertFalse(SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
assertEquals(
1,
- kubernetesClient
- .v1()
- .events()
- .inNamespace(deployment.getMetadata().getNamespace())
- .list()
- .getItems()
- .size());
+ kubernetesClient.v1().events().inNamespace(deployment.getMetadata().getNamespace())
+ .list().getItems().stream()
+ .filter(e -> e.getReason().contains("SavepointError"))
+ .count());
deployment.getStatus().getJobStatus().getSavepointInfo().setTriggerId("unknown");
deployment