You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/09/14 02:24:44 UTC

[incubator-linkis] branch dev-1.3.0 updated: [ISSUE-3297] Fix the problem of database update ec metrics delay (#3298)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new 1eb7218b0 [ISSUE-3297]  Fix the problem of database update ec metrics delay (#3298)
1eb7218b0 is described below

commit 1eb7218b0c22e25bc507db91d68d07751ad7c3bf
Author: weipengfei <36...@users.noreply.github.com>
AuthorDate: Wed Sep 14 10:24:38 2022 +0800

    [ISSUE-3297]  Fix the problem of database update ec metrics delay (#3298)
    
    * Fix the problem of database update  ec metrics  delay
---
 .../linkis/entrance/persistence/QueryPersistenceManager.java   |  5 ++++-
 .../linkis/entrance/execute/DefaultEntranceExecutor.scala      | 10 +++++-----
 .../scala/org/apache/linkis/entrance/execute/EntranceJob.scala |  6 ++++++
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index a70abca5b..29a0000f0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -146,10 +146,13 @@ public class QueryPersistenceManager extends PersistenceManager {
     } catch (Exception e) {
       logger.warn("Invalid progress : " + entranceJob.getJobRequest().getProgress(), e);
     }
-    if (job.getProgress() >= 0 && persistedProgress >= updatedProgress) {
+    if (job.getProgress() >= 0
+        && persistedProgress >= updatedProgress
+        && entranceJob.getUpdateMetrisFlag()) {
       return;
     }
     job.setProgress(updatedProgress);
+    entranceJob.setUpdateMetrisFlag(true);
     entranceJob.getJobRequest().setProgress(String.valueOf(updatedProgress));
     updateJobStatus(job);
   }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 2d8cb1c07..f2c63e59e 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -92,6 +92,11 @@ class DefaultEntranceExecutor(
       orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
     progressProcessor.doOnObtain(progressInfoEvent => {
       if (null != entranceJob) {
+        JobHistoryHelper.updateJobRequestMetrics(
+          entranceJob.getJobRequest,
+          progressInfoEvent.resourceMap,
+          progressInfoEvent.infoMap
+        )
         val jobGroups = entranceJob.getJobGroups
         if (jobGroups.length > 0) {
           val subJobInfo = entranceJob.getRunningSubJob
@@ -120,11 +125,6 @@ class DefaultEntranceExecutor(
             _.onProgressUpdate(entranceJob, progressInfoEvent.progress, entranceJob.getProgressInfo)
           )
         }
-        JobHistoryHelper.updateJobRequestMetrics(
-          entranceJob.getJobRequest,
-          progressInfoEvent.resourceMap,
-          progressInfoEvent.infoMap
-        )
       }
     })
     progressProcessor
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index eba19091a..d45500eb1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -68,6 +68,12 @@ abstract class EntranceJob extends Job {
   //  private var resultSize = -1
   private var entranceContext: EntranceContext = _
 
+  private var updateMetrisFlag: Boolean = false
+
+  def getUpdateMetrisFlag: Boolean = this.updateMetrisFlag
+
+  def setUpdateMetrisFlag(updateDbFlag: Boolean): Unit = this.updateMetrisFlag = updateDbFlag
+
   /**
    * Record newest time that a client access status of this job Can be used to monitor client
    * status. e.g. server can detect if linkis-cli process has abnormally ended then kill the job


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org