You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/09/14 02:24:44 UTC
[incubator-linkis] branch dev-1.3.0 updated: [ISSUE-3297] Fix the problem of database update ec metrics delay (#3298)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 1eb7218b0 [ISSUE-3297] Fix the problem of database update ec metrics delay (#3298)
1eb7218b0 is described below
commit 1eb7218b0c22e25bc507db91d68d07751ad7c3bf
Author: weipengfei <36...@users.noreply.github.com>
AuthorDate: Wed Sep 14 10:24:38 2022 +0800
[ISSUE-3297] Fix the problem of database update ec metrics delay (#3298)
* Fix the problem of database update ec metrics delay
---
.../linkis/entrance/persistence/QueryPersistenceManager.java | 5 ++++-
.../linkis/entrance/execute/DefaultEntranceExecutor.scala | 10 +++++-----
.../scala/org/apache/linkis/entrance/execute/EntranceJob.scala | 6 ++++++
3 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
index a70abca5b..29a0000f0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java
@@ -146,10 +146,13 @@ public class QueryPersistenceManager extends PersistenceManager {
} catch (Exception e) {
logger.warn("Invalid progress : " + entranceJob.getJobRequest().getProgress(), e);
}
- if (job.getProgress() >= 0 && persistedProgress >= updatedProgress) {
+ if (job.getProgress() >= 0
+ && persistedProgress >= updatedProgress
+ && entranceJob.getUpdateMetrisFlag()) {
return;
}
job.setProgress(updatedProgress);
+ entranceJob.setUpdateMetrisFlag(true);
entranceJob.getJobRequest().setProgress(String.valueOf(updatedProgress));
updateJobStatus(job);
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 2d8cb1c07..f2c63e59e 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -92,6 +92,11 @@ class DefaultEntranceExecutor(
orchestratorFuture.operate[ProgressProcessor](DefaultProgressOperation.PROGRESS_NAME)
progressProcessor.doOnObtain(progressInfoEvent => {
if (null != entranceJob) {
+ JobHistoryHelper.updateJobRequestMetrics(
+ entranceJob.getJobRequest,
+ progressInfoEvent.resourceMap,
+ progressInfoEvent.infoMap
+ )
val jobGroups = entranceJob.getJobGroups
if (jobGroups.length > 0) {
val subJobInfo = entranceJob.getRunningSubJob
@@ -120,11 +125,6 @@ class DefaultEntranceExecutor(
_.onProgressUpdate(entranceJob, progressInfoEvent.progress, entranceJob.getProgressInfo)
)
}
- JobHistoryHelper.updateJobRequestMetrics(
- entranceJob.getJobRequest,
- progressInfoEvent.resourceMap,
- progressInfoEvent.infoMap
- )
}
})
progressProcessor
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index eba19091a..d45500eb1 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -68,6 +68,12 @@ abstract class EntranceJob extends Job {
// private var resultSize = -1
private var entranceContext: EntranceContext = _
+ private var updateMetrisFlag: Boolean = false
+
+ def getUpdateMetrisFlag: Boolean = this.updateMetrisFlag
+
+ def setUpdateMetrisFlag(updateDbFlag: Boolean): Unit = this.updateMetrisFlag = updateDbFlag
+
/**
* Record the newest time that a client accessed the status of this job. Can be used to monitor client
* status, e.g. the server can detect if the linkis-cli process has ended abnormally and then kill the job
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org