You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/10 12:05:58 UTC
[incubator-streampark] branch dev updated: [bug] on k8s get metrics bug fixed (#1999)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 80f883b1f [bug] on k8s get metrics bug fixed (#1999)
80f883b1f is described below
commit 80f883b1f3d931d7d639fb179e07bbbc02c8412a
Author: benjobs <be...@apache.org>
AuthorDate: Thu Nov 10 20:05:52 2022 +0800
[bug] on k8s get metrics bug fixed (#1999)
---
.../core/task/K8sFlinkChangeEventListener.java | 59 ++++++++++++----------
1 file changed, 33 insertions(+), 26 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index 04fcbe77e..da38e12b7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -96,19 +96,15 @@ public class K8sFlinkChangeEventListener {
return;
}
// update application record
- updateApplicationWithJobStatusCV(app, jobStatus);
- // when a flink job status change event can be received, it means
- // that the operation command sent by streampark has been completed.
- app.setOptionState(OptionState.NONE.getValue());
- applicationService.updateById(app);
+ Application newApp = getUpdateAppWithJobStatusCV(app, jobStatus);
+ applicationService.updateTracking(newApp);
// email alerts when necessary
FlinkAppState state = FlinkAppState.of(app.getState());
if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state) || FlinkAppState.FINISHED.equals(state)) {
IngressController.deleteIngress(app.getClusterId(), app.getK8sNamespace());
- Application finalApp = app;
- executor.execute(() -> alertService.alert(finalApp, state));
+ executor.execute(() -> alertService.alert(app, state));
}
}
@@ -126,13 +122,21 @@ public class K8sFlinkChangeEventListener {
if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(mode)) {
return;
}
- Application app = applicationService.getById(trackId.appId());
- app.setJmMemory(metrics.totalJmMemory());
- app.setTmMemory(metrics.totalTmMemory());
- app.setTotalTM(metrics.totalTm());
- app.setTotalSlot(metrics.totalSlot());
- app.setAvailableSlot(metrics.availableSlot());
- applicationService.updateById(app);
+
+ Application app = applicationService.getById(trackId.appId());
+ if (app == null) {
+ return;
+ }
+
+ Application newApp = new Application();
+ newApp.setId(trackId.appId());
+ newApp.setJmMemory(metrics.totalJmMemory());
+ newApp.setTmMemory(metrics.totalTmMemory());
+ newApp.setTotalTM(metrics.totalTm());
+ newApp.setTotalSlot(metrics.totalSlot());
+ newApp.setAvailableSlot(metrics.availableSlot());
+
+ applicationService.updateTracking(newApp);
}
@SuppressWarnings("UnstableApiUsage")
@@ -154,29 +158,28 @@ public class K8sFlinkChangeEventListener {
checkpointProcessor.process(event.trackId().appId(), checkPoint);
}
- private void updateApplicationWithJobStatusCV(Application app, JobStatusCV jobStatus) {
+ private Application getUpdateAppWithJobStatusCV(Application app, JobStatusCV jobStatus) {
+ Application newApp = new Application();
+ newApp.setId(app.getId());
+ newApp.setJobId(jobStatus.jobId());
+ newApp.setTotalTask(jobStatus.taskTotal());
+
// infer the final flink job state
Enumeration.Value state = jobStatus.jobState();
Enumeration.Value preState = toK8sFlinkJobState(FlinkAppState.of(app.getState()));
state = FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(state, preState);
- app.setState(fromK8sFlinkJobState(state).getValue());
-
- // update relevant fields of Application from JobStatusCV
- app.setJobId(jobStatus.jobId());
- app.setTotalTask(jobStatus.taskTotal());
+ newApp.setState(fromK8sFlinkJobState(state).getValue());
// corrective start-time / end-time / duration
long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0;
long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
-
long preEndTime = app.getEndTime() != null ? app.getEndTime().getTime() : 0;
long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
long duration = jobStatus.duration();
-
if (FlinkJobState.isEndState(state)) {
IngressController.deleteIngress(app.getJobName(), app.getK8sNamespace());
- app.setOptionState(OptionState.NONE.getValue());
+ newApp.setOptionState(OptionState.NONE.getValue());
if (endTime < startTime) {
endTime = System.currentTimeMillis();
}
@@ -184,9 +187,13 @@ public class K8sFlinkChangeEventListener {
duration = endTime - startTime;
}
}
- app.setStartTime(new Date(startTime > 0 ? startTime : 0));
- app.setEndTime(endTime > 0 && endTime >= startTime ? new Date(endTime) : null);
- app.setDuration(duration > 0 ? duration : 0);
+ newApp.setStartTime(new Date(startTime > 0 ? startTime : 0));
+ newApp.setEndTime(endTime > 0 && endTime >= startTime ? new Date(endTime) : null);
+ newApp.setDuration(duration > 0 ? duration : 0);
+ // when a flink job status change event can be received, it means
+ // that the operation command sent by streampark has been completed.
+ newApp.setOptionState(OptionState.NONE.getValue());
+ return newApp;
}
}