You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 11:32:19 UTC

[linkis] branch dev-1.3.2 updated: [Feature][Entrance] Task Metrics add multiple time indicators (#4324)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 7d2688c08 [Feature][Entrance] Task Metrics add multiple time indicators  (#4324)
7d2688c08 is described below

commit 7d2688c08b1d84fccc87d8eeece93bee90f0ebe9
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 19:32:13 2023 +0800

    [Feature][Entrance] Task Metrics add multiple time indicators  (#4324)
    
    * add the job's metrics including running time, time to request EC, and time to submit the task to EC
    
    * add ec time log
    
    * Fix unit test
    
    * Fix ec time metrics to metrics map
    
    * Fix time print dateformat
    
    * Discard blank logs
    
    * [Feature][Entrance] Task Metrics add multiple time indicators #4323
---
 .../linkis/protocol/constants/TaskConstant.java    | 25 +++---
 .../linkis/entrance/job/EntranceExecutionJob.java  | 88 +++++++++++++++-------
 .../entrance/restful/EntranceRestfulApi.java       | 21 +++---
 .../entrance/cli/heartbeat/KillHandler.scala       |  4 +-
 .../entrance/execute/DefaultEntranceExecutor.scala |  4 +-
 .../linkis/entrance/execute/EntranceJob.scala      | 10 ++-
 .../linkis/entrance/log/HDFSCacheLogWriter.scala   |  6 +-
 .../entrance/parser/CommonEntranceParser.scala     |  7 +-
 .../linkis/entrance/utils/JobHistoryHelper.scala   | 31 ++++++--
 .../physical/CodeLogicalUnitExecTask.scala         | 15 +++-
 .../jobhistory/conversions/TaskConversions.scala   | 29 ++-----
 11 files changed, 143 insertions(+), 97 deletions(-)

diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
index 8f5a68008..42661f828 100644
--- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
+++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
@@ -50,18 +50,21 @@ public interface TaskConstant {
   String PARAMS_CONFIGURATION_RUNTIME = "runtime";
   String PARAMS_CONFIGURATION_SPECIAL = "special";
 
-  String ENTRANCEJOB_SUBMIT_TIME = "submitTime";
-  String ENTRANCEJOB_SCHEDULE_TIME = "scheduleTime";
-  String ENTRANCEJOB_TO_ORCHESTRATOR = "timeToOrchestrator";
-  String ENTRANCEJOB_COMPLETE_TIME = "completeTime";
-  String ENTRANCEJOB_YARN_METRICS = "yarnMetrics";
-  String ENTRANCEJOB_YARNRESOURCE = "yarnResource";
-  String ENTRANCEJOB_CORE_PERCENT = "corePercent";
-  String ENTRANCEJOB_MEMORY_PERCENT = "memoryPercent";
-  String ENTRANCEJOB_CORE_RGB = "coreRGB";
-  String ENTRANCEJOB_MEMORY_RGB = "memoryRGB";
+  String JOB_SUBMIT_TIME = "submitTime";
+  String JOB_SCHEDULE_TIME = "scheduleTime";
+  String JOB_RUNNING_TIME = "runningTime";
+  String JOB_TO_ORCHESTRATOR = "jobToOrchestrator";
+  String JOB_REQUEST_EC_TIME = "requestECTime";
+  String JOB_SUBMIT_TO_EC_TIME = "jobToECTIme";
+  String JOB_COMPLETE_TIME = "completeTime";
+  String JOB_YARN_METRICS = "yarnMetrics";
+  String JOB_YARNRESOURCE = "yarnResource";
+  String JOB_CORE_PERCENT = "corePercent";
+  String JOB_MEMORY_PERCENT = "memoryPercent";
+  String JOB_CORE_RGB = "coreRGB";
+  String JOB_MEMORY_RGB = "memoryRGB";
 
-  String ENTRANCEJOB_ENGINECONN_MAP = "engineconnMap";
+  String JOB_ENGINECONN_MAP = "engineconnMap";
   String ENGINE_INSTANCE = "engineInstance";
   String TICKET_ID = "ticketId";
   String ENGINE_CONN_TASK_ID = "engineConnTaskId";
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
index ad9c698a4..d9b33820f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
@@ -54,12 +54,17 @@ import org.slf4j.LoggerFactory;
 
 public class EntranceExecutionJob extends EntranceJob implements LogHandler {
 
+  private static final Logger logger = LoggerFactory.getLogger(EntranceExecutionJob.class);
+
+  private static final ThreadLocal<SimpleDateFormat> dateFormatLocal =
+      ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
   private LogReader logReader;
   private LogWriter logWriter;
   private Object logWriterLocker = new Object();
   private WebSocketCacheLogReader webSocketCacheLogReader;
   private WebSocketLogWriter webSocketLogWriter;
-  private static final Logger logger = LoggerFactory.getLogger(EntranceExecutionJob.class);
+
   private PersistenceManager persistenceManager;
 
   public EntranceExecutionJob(PersistenceManager persistenceManager) {
@@ -167,68 +172,93 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
   }
 
   @Override
-  public JobInfo getJobInfo() { // TODO You can put this method on LockJob(可以将该方法放到LockJob上去)
+  public JobInfo getJobInfo() {
     String execID = getId();
     String state = this.getState().toString();
     float progress = this.getProgress();
-    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    SimpleDateFormat simpleDateFormat = dateFormatLocal.get();
+
     if (getJobRequest().getMetrics() == null) {
       getJobRequest().setMetrics(new HashMap<>());
     }
+
     Map<String, Object> metricsMap = getJobRequest().getMetrics();
     String createTime =
-        metricsMap.containsKey(TaskConstant.ENTRANCEJOB_SUBMIT_TIME)
-            ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME))
+        metricsMap.containsKey(TaskConstant.JOB_SUBMIT_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SUBMIT_TIME))
             : "not created";
     String scheduleTime =
-        metricsMap.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)
-            ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME))
+        metricsMap.containsKey(TaskConstant.JOB_SCHEDULE_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SCHEDULE_TIME))
             : "not scheduled";
     String startTime =
-        metricsMap.containsKey(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR)
-            ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR))
+        metricsMap.containsKey(TaskConstant.JOB_RUNNING_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_RUNNING_TIME))
             : "not submitted to orchestrator";
     String endTime =
-        metricsMap.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME)
-            ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME))
+        metricsMap.containsKey(TaskConstant.JOB_COMPLETE_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_COMPLETE_TIME))
             : "on running or not started";
     String runTime;
-    if (metricsMap.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME)) {
+    if (metricsMap.containsKey(TaskConstant.JOB_COMPLETE_TIME)) {
       runTime =
           ByteTimeUtils.msDurationToString(
-              (((Date) metricsMap.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME))).getTime()
-                  - (((Date) metricsMap.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME))).getTime());
+              (((Date) metricsMap.get(TaskConstant.JOB_COMPLETE_TIME))).getTime()
+                  - (((Date) metricsMap.get(TaskConstant.JOB_SUBMIT_TIME))).getTime());
     } else {
       runTime =
           "The task did not end normally and the usage time could not be counted.(任务并未正常结束,无法统计使用时间)";
     }
-    String metric =
-        "Task creation time(任务创建时间): "
-            + createTime
-            + ", Task scheduling time(任务调度时间): "
-            + scheduleTime
-            + ", Task start time(任务开始时间): "
-            + startTime
-            + ", Mission end time(任务结束时间): "
-            + endTime
-            + "\n\n\n"
-            + LogUtils.generateInfo(
+
+    String jobToOrchestrator =
+        metricsMap.containsKey(TaskConstant.JOB_TO_ORCHESTRATOR)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_TO_ORCHESTRATOR))
+            : "not to orchestrator";
+    String jobRequestEC =
+        metricsMap.containsKey(TaskConstant.JOB_REQUEST_EC_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_REQUEST_EC_TIME))
+            : "not request ec";
+    String jobSubmitToEC =
+        metricsMap.containsKey(TaskConstant.JOB_SUBMIT_TO_EC_TIME)
+            ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SUBMIT_TO_EC_TIME))
+            : "not submit to  ec";
+
+    StringBuffer sb = new StringBuffer();
+    sb.append("Task creation time(任务创建时间): ")
+        .append(createTime)
+        .append(", Task scheduling time(任务调度时间): ")
+        .append(scheduleTime)
+        .append(", Task start time(任务开始时间): ")
+        .append(startTime)
+        .append(", Mission end time(任务结束时间): ")
+        .append(endTime)
+        .append("\n")
+        .append(LogUtils.generateInfo(""))
+        .append("Task submit to Orchestrator time:")
+        .append(jobToOrchestrator)
+        .append(", Task request EngineConn time:")
+        .append(jobRequestEC)
+        .append(", Task submit to EngineConn time:")
+        .append(jobSubmitToEC)
+        .append("\n")
+        .append(
+            LogUtils.generateInfo(
                 "Your mission(您的任务) "
                     + this.getJobRequest().getId()
                     + " The total time spent is(总耗时时间为): "
-                    + runTime);
+                    + runTime));
+
+    String metric = sb.toString();
+
     return new JobInfo(execID, null, state, progress, metric);
   }
 
   @Override
   public void close() throws IOException {
     logger.info("job:" + jobRequest().getId() + " is closing");
-
     try {
-      // todo  Do a lot of aftercare work when close(close时候要做很多的善后工作)
       if (this.getLogWriter().isDefined()) {
         IOUtils.closeQuietly(this.getLogWriter().get());
-        // this.setLogWriter(null);
       } else {
         logger.info("job:" + jobRequest().getId() + "LogWriter is null");
       }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index c94bdd79a..a5d3ace35 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -326,17 +326,16 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
         JobRequest jobRequest = ((EntranceJob) job.get()).getJobRequest();
         Map<String, Object> metrics = jobRequest.getMetrics();
         Map<String, Object> metricsVo = new HashMap<>();
-        if (metrics.containsKey(TaskConstant.ENTRANCEJOB_YARNRESOURCE)) {
+        if (metrics.containsKey(TaskConstant.JOB_YARNRESOURCE)) {
           HashMap<String, ResourceWithStatus> resourceMap =
-              (HashMap<String, ResourceWithStatus>)
-                  metrics.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE);
+              (HashMap<String, ResourceWithStatus>) metrics.get(TaskConstant.JOB_YARNRESOURCE);
           ArrayList<YarnResourceWithStatusVo> resoureList = new ArrayList<>(12);
           if (null != resourceMap && !resourceMap.isEmpty()) {
             resourceMap.forEach(
                 (applicationId, resource) -> {
                   resoureList.add(new YarnResourceWithStatusVo(applicationId, resource));
                 });
-            metricsVo.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, resoureList);
+            metricsVo.put(TaskConstant.JOB_YARNRESOURCE, resoureList);
             Optional<Integer> cores =
                 resourceMap.values().stream()
                     .map(resource -> resource.queueCores())
@@ -360,17 +359,17 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
             }
             String coreRGB = RGBUtils.getRGB(corePercent);
             String memoryRGB = RGBUtils.getRGB(memoryPercent);
-            metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_PERCENT, corePercent);
-            metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_PERCENT, memoryPercent);
-            metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_RGB, coreRGB);
-            metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_RGB, memoryRGB);
+            metricsVo.put(TaskConstant.JOB_CORE_PERCENT, corePercent);
+            metricsVo.put(TaskConstant.JOB_MEMORY_PERCENT, memoryPercent);
+            metricsVo.put(TaskConstant.JOB_CORE_RGB, coreRGB);
+            metricsVo.put(TaskConstant.JOB_MEMORY_RGB, memoryRGB);
 
-            message.data(TaskConstant.ENTRANCEJOB_YARN_METRICS, metricsVo);
+            message.data(TaskConstant.JOB_YARN_METRICS, metricsVo);
           } else {
-            message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+            message.data(TaskConstant.JOB_YARNRESOURCE, null);
           }
         } else {
-          message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+          message.data(TaskConstant.JOB_YARNRESOURCE, null);
         }
 
         message
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
index 9add1fbe7..88453577c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
@@ -25,11 +25,11 @@ class KillHandler extends HeartbeatLossHandler with Logging {
   override def handle(jobs: List[EntranceJob]): Unit = {
     for (job <- jobs) {
       if (job != null) {
-        logger.info("Killing job: " + job.getJobInfo.getId)
+        logger.info("Killing job: " + job.getId)
         Utils.tryCatch(
           job.onFailure("Job is killed because of client-server connection lost", null)
         ) { t =>
-          logger.error("failed to kill job: " + job.getJobInfo.getId, t)
+          logger.error("failed to kill job: " + job.getId, t)
         }
       }
     }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index e601b3801..c509f1005 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -285,11 +285,11 @@ class DefaultEntranceExecutor(id: Long)
       } else {
         if (
             !entranceExecuteRequest.getJob.getJobRequest.getMetrics.containsKey(
-              TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR
+              TaskConstant.JOB_TO_ORCHESTRATOR
             )
         ) {
           entranceExecuteRequest.getJob.getJobRequest.getMetrics
-            .put(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR, new Date(System.currentTimeMillis()))
+            .put(TaskConstant.JOB_TO_ORCHESTRATOR, new Date(System.currentTimeMillis()))
         }
       }
       // 2. deal log And Response
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index 5246ead4f..b762f5460 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -143,7 +143,7 @@ abstract class EntranceJob extends Job {
               _.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not been initialized."))
             )
           } else {
-            if (getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)) {
+            if (getJobRequest.getMetrics.containsKey(TaskConstant.JOB_SCHEDULE_TIME)) {
               getLogListener.foreach(
                 _.onLogUpdate(
                   this,
@@ -152,7 +152,7 @@ abstract class EntranceJob extends Job {
               )
             } else {
               getJobRequest.getMetrics.put(
-                TaskConstant.ENTRANCEJOB_SCHEDULE_TIME,
+                TaskConstant.JOB_SCHEDULE_TIME,
                 new Date(System.currentTimeMillis)
               )
             }
@@ -177,9 +177,13 @@ abstract class EntranceJob extends Job {
               LogUtils.generateInfo("Your job is Running now. Please wait it to complete.")
             )
           )
+          getJobRequest.getMetrics.put(
+            TaskConstant.JOB_RUNNING_TIME,
+            new Date(System.currentTimeMillis)
+          )
         case _ if SchedulerEventState.isCompleted(toState) =>
           getJobRequest.getMetrics.put(
-            TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
+            TaskConstant.JOB_COMPLETE_TIME,
             new Date(System.currentTimeMillis())
           )
           if (getJobInfo != null) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
index 8aa02f949..24633dfbb 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
@@ -123,12 +123,10 @@ class HDFSCacheLogWriter(logPath: String, charset: String, sharedCache: Cache, u
   }
 
   override def write(msg: String): Unit = {
-    if (StringUtils.isBlank(msg)) {
-      cache("")
-    } else {
+    if (StringUtils.isNotBlank(msg)) {
       val rows = msg.split("\n")
       rows.foreach(row => {
-        if (row == null) cache("") else cache(row)
+        if (StringUtils.isNotBlank(row)) cache(row)
       })
     }
   }
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 5108a7bf4..60164ca58 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -123,10 +123,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
     jobRequest.setStatus(SchedulerEventState.Inited.toString)
     // Entry indicator: task submission time
     jobRequest.setMetrics(new util.HashMap[String, AnyRef]())
-    jobRequest.getMetrics.put(
-      TaskConstant.ENTRANCEJOB_SUBMIT_TIME,
-      new Date(System.currentTimeMillis)
-    )
+    jobRequest.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
     jobRequest.setParams(configMap)
     jobRequest
   }
@@ -272,7 +269,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
     // Package labels
     jobReq.setLabels(labelList)
     jobReq.setMetrics(new util.HashMap[String, AnyRef]())
-    jobReq.getMetrics.put(TaskConstant.ENTRANCEJOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
+    jobReq.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
     jobReq
   }
 
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index cfe14672c..ec2912888 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -165,31 +165,31 @@ object JobHistoryHelper extends Logging {
   def updateJobRequestMetrics(
       jobRequest: JobRequest,
       resourceInfo: util.Map[String, ResourceWithStatus],
-      ecInfo: util.Map[String, AnyRef]
+      infoMap: util.Map[String, AnyRef]
   ): Unit = {
     // update resource
     if (jobRequest.getMetrics == null) {
       jobRequest.setMetrics(new util.HashMap[String, AnyRef]())
     }
     val metricsMap = jobRequest.getMetrics
-    val resourceMap = metricsMap.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE)
+    val resourceMap = metricsMap.get(TaskConstant.JOB_YARNRESOURCE)
     val ecResourceMap =
       if (resourceInfo == null) new util.HashMap[String, ResourceWithStatus] else resourceInfo
     if (resourceMap != null) {
       resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap)
     } else {
-      metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, ecResourceMap)
+      metricsMap.put(TaskConstant.JOB_YARNRESOURCE, ecResourceMap)
     }
     var engineInstanceMap: util.HashMap[String, AnyRef] = null
-    if (metricsMap.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)) {
+    if (metricsMap.containsKey(TaskConstant.JOB_ENGINECONN_MAP)) {
       engineInstanceMap = metricsMap
-        .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
+        .get(TaskConstant.JOB_ENGINECONN_MAP)
         .asInstanceOf[util.HashMap[String, AnyRef]]
     } else {
       engineInstanceMap = new util.HashMap[String, AnyRef]()
-      metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
+      metricsMap.put(TaskConstant.JOB_ENGINECONN_MAP, engineInstanceMap)
     }
-    val infoMap = ecInfo
+
     if (null != infoMap && infoMap.containsKey(TaskConstant.TICKET_ID)) {
       val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
       val engineExtraInfoMap = engineInstanceMap
@@ -200,6 +200,23 @@ object JobHistoryHelper extends Logging {
     } else {
       logger.warn("Ec info map must contains ticketID")
     }
+
+    if (null != infoMap && infoMap.containsKey(TaskConstant.JOB_REQUEST_EC_TIME)) {
+      metricsMap.put(
+        TaskConstant.JOB_REQUEST_EC_TIME,
+        infoMap.get(TaskConstant.JOB_REQUEST_EC_TIME)
+      )
+    }
+
+    if (null != infoMap && infoMap.containsKey(TaskConstant.JOB_SUBMIT_TO_EC_TIME)) {
+      metricsMap.put(
+        TaskConstant.JOB_SUBMIT_TO_EC_TIME,
+        infoMap.get(TaskConstant.JOB_SUBMIT_TO_EC_TIME)
+      )
+    }
+    if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+      metricsMap.put(TaskConstant.ENGINE_INSTANCE, infoMap.get(TaskConstant.ENGINE_INSTANCE))
+    }
   }
 
 }
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index bd665cd34..82e75c0bd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -29,7 +29,6 @@ import org.apache.linkis.orchestrator.computation.execute.{
   CodeExecTaskExecutorManager
 }
 import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
-import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
 import org.apache.linkis.orchestrator.exception.{
   OrchestratorErrorCodeSummary,
   OrchestratorErrorException,
@@ -51,6 +50,7 @@ import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, SubmitRespons
 import org.apache.commons.lang3.StringUtils
 
 import java.util
+import java.util.Date
 import java.util.concurrent.TimeUnit
 
 import scala.collection.convert.decorateAsScala._
@@ -85,6 +85,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
     logger.info(s"Start to execute CodeLogicalUnitExecTask(${getIDInfo()}).")
     var executor: Option[CodeExecTaskExecutor] = None
     var retryException: LinkisRetryException = null
+    if (!getPhysicalContext.exists(TaskConstant.JOB_REQUEST_EC_TIME)) {
+      // Here you should use the time when you first applied for EC
+      getPhysicalContext.set(TaskConstant.JOB_REQUEST_EC_TIME, new Date(System.currentTimeMillis))
+    }
     executor = Utils.tryCatch(codeExecTaskExecutorManager.askExecutor(this)) {
       case retry: LinkisRetryException =>
         retryException = retry
@@ -132,7 +136,14 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
             }
           )
           infoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, engineConnExecId)
-          infoMap.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME, System.currentTimeMillis.toString)
+          infoMap.put(TaskConstant.JOB_SUBMIT_TO_EC_TIME, new Date(System.currentTimeMillis))
+          if (getPhysicalContext.exists(TaskConstant.JOB_REQUEST_EC_TIME)) {
+            infoMap.put(
+              TaskConstant.JOB_REQUEST_EC_TIME,
+              getPhysicalContext.get(TaskConstant.JOB_REQUEST_EC_TIME).asInstanceOf[Object]
+            )
+          }
+
           val event = TaskRunningInfoEvent(
             this,
             0f,
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
index 3ebd8ff41..15e6953ae 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
@@ -262,17 +262,17 @@ object TaskConversions extends Logging {
       BDPJettyServerHelper.gson.fromJson((job.getMetrics), classOf[util.Map[String, Object]])
     var completeTime: Date = null
     if (
-        null != metrics && metrics.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME) && metrics
-          .get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME) != null
+        null != metrics && metrics.containsKey(TaskConstant.JOB_COMPLETE_TIME) && metrics
+          .get(TaskConstant.JOB_COMPLETE_TIME) != null
     ) {
-      completeTime = dealString2Date(metrics.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME).toString)
+      completeTime = dealString2Date(metrics.get(TaskConstant.JOB_COMPLETE_TIME).toString)
     }
     var createTime: Date = null
     if (
-        null != metrics && metrics.containsKey(TaskConstant.ENTRANCEJOB_SUBMIT_TIME) && metrics
-          .get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME) != null
+        null != metrics && metrics.containsKey(TaskConstant.JOB_SUBMIT_TIME) && metrics
+          .get(TaskConstant.JOB_SUBMIT_TIME) != null
     ) {
-      createTime = dealString2Date(metrics.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME).toString)
+      createTime = dealString2Date(metrics.get(TaskConstant.JOB_SUBMIT_TIME).toString)
     }
     if (null != createTime) {
       if (isJobFinished(job.getStatus)) {
@@ -287,21 +287,8 @@ object TaskConversions extends Logging {
         taskVO.setCostTime(System.currentTimeMillis() - createTime.getTime)
       }
     }
-    if (metrics.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)) {
-      val engineMap = metrics
-        .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
-        .asInstanceOf[util.Map[String, Object]]
-      if (null != engineMap && !engineMap.isEmpty) {
-        // the engineInstance in metrics may be repeat, so it needs to be distinct
-        val engineInstances =
-          engineMap.asScala
-            .map(_._2.asInstanceOf[util.Map[String, Object]])
-            .map(_.get(TaskConstant.ENGINE_INSTANCE))
-            .toList
-            .distinct
-            .mkString(",")
-        taskVO.setEngineInstance(engineInstances)
-      }
+    if (metrics.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+      taskVO.setEngineInstance(metrics.get(TaskConstant.ENGINE_INSTANCE).toString)
     } else if (TaskStatus.Failed.toString.equals(job.getStatus)) {
       taskVO.setCanRetry(true)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org