You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/10 12:05:58 UTC

[incubator-streampark] branch dev updated: [bug] on k8s get metrics bug fixed (#1999)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 80f883b1f [bug] on k8s get metrics bug fixed (#1999)
80f883b1f is described below

commit 80f883b1f3d931d7d639fb179e07bbbc02c8412a
Author: benjobs <be...@apache.org>
AuthorDate: Thu Nov 10 20:05:52 2022 +0800

    [bug] on k8s get metrics bug fixed (#1999)
---
 .../core/task/K8sFlinkChangeEventListener.java     | 59 ++++++++++++----------
 1 file changed, 33 insertions(+), 26 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
index 04fcbe77e..da38e12b7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/K8sFlinkChangeEventListener.java
@@ -96,19 +96,15 @@ public class K8sFlinkChangeEventListener {
             return;
         }
         // update application record
-        updateApplicationWithJobStatusCV(app, jobStatus);
-        // when a flink job status change event can be received, it means
-        // that the operation command sent by streampark has been completed.
-        app.setOptionState(OptionState.NONE.getValue());
-        applicationService.updateById(app);
+        Application newApp = getUpdateAppWithJobStatusCV(app, jobStatus);
+        applicationService.updateTracking(newApp);
 
         // email alerts when necessary
         FlinkAppState state = FlinkAppState.of(app.getState());
         if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state)
             || FlinkAppState.RESTARTING.equals(state) || FlinkAppState.FINISHED.equals(state)) {
             IngressController.deleteIngress(app.getClusterId(), app.getK8sNamespace());
-            Application finalApp = app;
-            executor.execute(() -> alertService.alert(finalApp, state));
+            executor.execute(() -> alertService.alert(app, state));
         }
     }
 
@@ -126,13 +122,21 @@ public class K8sFlinkChangeEventListener {
         if (ExecutionMode.KUBERNETES_NATIVE_SESSION.equals(mode)) {
             return;
         }
-        Application app = applicationService.getById(trackId.appId());
-        app.setJmMemory(metrics.totalJmMemory());
-        app.setTmMemory(metrics.totalTmMemory());
-        app.setTotalTM(metrics.totalTm());
-        app.setTotalSlot(metrics.totalSlot());
-        app.setAvailableSlot(metrics.availableSlot());
-        applicationService.updateById(app);
+
+        Application app =  applicationService.getById(trackId.appId());
+        if (app == null) {
+            return;
+        }
+
+        Application newApp = new Application();
+        newApp.setId(trackId.appId());
+        newApp.setJmMemory(metrics.totalJmMemory());
+        newApp.setTmMemory(metrics.totalTmMemory());
+        newApp.setTotalTM(metrics.totalTm());
+        newApp.setTotalSlot(metrics.totalSlot());
+        newApp.setAvailableSlot(metrics.availableSlot());
+
+        applicationService.updateTracking(newApp);
     }
 
     @SuppressWarnings("UnstableApiUsage")
@@ -154,29 +158,28 @@ public class K8sFlinkChangeEventListener {
         checkpointProcessor.process(event.trackId().appId(), checkPoint);
     }
 
-    private void updateApplicationWithJobStatusCV(Application app, JobStatusCV jobStatus) {
+    private Application getUpdateAppWithJobStatusCV(Application app, JobStatusCV jobStatus) {
+        Application newApp = new Application();
+        newApp.setId(app.getId());
+        newApp.setJobId(jobStatus.jobId());
+        newApp.setTotalTask(jobStatus.taskTotal());
+
         // infer the final flink job state
         Enumeration.Value state = jobStatus.jobState();
         Enumeration.Value preState = toK8sFlinkJobState(FlinkAppState.of(app.getState()));
         state = FlinkJobStatusWatcher.inferFlinkJobStateFromPersist(state, preState);
-        app.setState(fromK8sFlinkJobState(state).getValue());
-
-        // update relevant fields of Application from JobStatusCV
-        app.setJobId(jobStatus.jobId());
-        app.setTotalTask(jobStatus.taskTotal());
+        newApp.setState(fromK8sFlinkJobState(state).getValue());
 
         // corrective start-time / end-time / duration
         long preStartTime = app.getStartTime() != null ? app.getStartTime().getTime() : 0;
         long startTime = Math.max(jobStatus.jobStartTime(), preStartTime);
-
         long preEndTime = app.getEndTime() != null ? app.getEndTime().getTime() : 0;
         long endTime = Math.max(jobStatus.jobEndTime(), preEndTime);
 
         long duration = jobStatus.duration();
-
         if (FlinkJobState.isEndState(state)) {
             IngressController.deleteIngress(app.getJobName(), app.getK8sNamespace());
-            app.setOptionState(OptionState.NONE.getValue());
+            newApp.setOptionState(OptionState.NONE.getValue());
             if (endTime < startTime) {
                 endTime = System.currentTimeMillis();
             }
@@ -184,9 +187,13 @@ public class K8sFlinkChangeEventListener {
                 duration = endTime - startTime;
             }
         }
-        app.setStartTime(new Date(startTime > 0 ? startTime : 0));
-        app.setEndTime(endTime > 0 && endTime >= startTime ? new Date(endTime) : null);
-        app.setDuration(duration > 0 ? duration : 0);
+        newApp.setStartTime(new Date(startTime > 0 ? startTime : 0));
+        newApp.setEndTime(endTime > 0 && endTime >= startTime ? new Date(endTime) : null);
+        newApp.setDuration(duration > 0 ? duration : 0);
+        // when a flink job status change event can be received, it means
+        // that the operation command sent by streampark has been completed.
+        newApp.setOptionState(OptionState.NONE.getValue());
+        return newApp;
     }
 
 }