You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 11:32:19 UTC
[linkis] branch dev-1.3.2 updated: [Feature][Entrance] Task Metrics add multiple time indicators (#4324)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 7d2688c08 [Feature][Entrance] Task Metrics add multiple time indicators (#4324)
7d2688c08 is described below
commit 7d2688c08b1d84fccc87d8eeece93bee90f0ebe9
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 19:32:13 2023 +0800
[Feature][Entrance] Task Metrics add multiple time indicators (#4324)
* add the job's metrics including running time, time to request EC, and time to submit the task to EC
* add ec time log
* Fix unit test
* Fix ec time metrics to metrics map
* Fix time print dateformat
* Discard blank logs
* [Feature][Entrance] Task Metrics add multiple time indicators #4323
---
.../linkis/protocol/constants/TaskConstant.java | 25 +++---
.../linkis/entrance/job/EntranceExecutionJob.java | 88 +++++++++++++++-------
.../entrance/restful/EntranceRestfulApi.java | 21 +++---
.../entrance/cli/heartbeat/KillHandler.scala | 4 +-
.../entrance/execute/DefaultEntranceExecutor.scala | 4 +-
.../linkis/entrance/execute/EntranceJob.scala | 10 ++-
.../linkis/entrance/log/HDFSCacheLogWriter.scala | 6 +-
.../entrance/parser/CommonEntranceParser.scala | 7 +-
.../linkis/entrance/utils/JobHistoryHelper.scala | 31 ++++++--
.../physical/CodeLogicalUnitExecTask.scala | 15 +++-
.../jobhistory/conversions/TaskConversions.scala | 29 ++-----
11 files changed, 143 insertions(+), 97 deletions(-)
diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
index 8f5a68008..42661f828 100644
--- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
+++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java
@@ -50,18 +50,21 @@ public interface TaskConstant {
String PARAMS_CONFIGURATION_RUNTIME = "runtime";
String PARAMS_CONFIGURATION_SPECIAL = "special";
- String ENTRANCEJOB_SUBMIT_TIME = "submitTime";
- String ENTRANCEJOB_SCHEDULE_TIME = "scheduleTime";
- String ENTRANCEJOB_TO_ORCHESTRATOR = "timeToOrchestrator";
- String ENTRANCEJOB_COMPLETE_TIME = "completeTime";
- String ENTRANCEJOB_YARN_METRICS = "yarnMetrics";
- String ENTRANCEJOB_YARNRESOURCE = "yarnResource";
- String ENTRANCEJOB_CORE_PERCENT = "corePercent";
- String ENTRANCEJOB_MEMORY_PERCENT = "memoryPercent";
- String ENTRANCEJOB_CORE_RGB = "coreRGB";
- String ENTRANCEJOB_MEMORY_RGB = "memoryRGB";
+ String JOB_SUBMIT_TIME = "submitTime";
+ String JOB_SCHEDULE_TIME = "scheduleTime";
+ String JOB_RUNNING_TIME = "runningTime";
+ String JOB_TO_ORCHESTRATOR = "jobToOrchestrator";
+ String JOB_REQUEST_EC_TIME = "requestECTime";
+ String JOB_SUBMIT_TO_EC_TIME = "jobToECTIme";
+ String JOB_COMPLETE_TIME = "completeTime";
+ String JOB_YARN_METRICS = "yarnMetrics";
+ String JOB_YARNRESOURCE = "yarnResource";
+ String JOB_CORE_PERCENT = "corePercent";
+ String JOB_MEMORY_PERCENT = "memoryPercent";
+ String JOB_CORE_RGB = "coreRGB";
+ String JOB_MEMORY_RGB = "memoryRGB";
- String ENTRANCEJOB_ENGINECONN_MAP = "engineconnMap";
+ String JOB_ENGINECONN_MAP = "engineconnMap";
String ENGINE_INSTANCE = "engineInstance";
String TICKET_ID = "ticketId";
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
index ad9c698a4..d9b33820f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java
@@ -54,12 +54,17 @@ import org.slf4j.LoggerFactory;
public class EntranceExecutionJob extends EntranceJob implements LogHandler {
+ private static final Logger logger = LoggerFactory.getLogger(EntranceExecutionJob.class);
+
+ private static final ThreadLocal<SimpleDateFormat> dateFormatLocal =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
+
private LogReader logReader;
private LogWriter logWriter;
private Object logWriterLocker = new Object();
private WebSocketCacheLogReader webSocketCacheLogReader;
private WebSocketLogWriter webSocketLogWriter;
- private static final Logger logger = LoggerFactory.getLogger(EntranceExecutionJob.class);
+
private PersistenceManager persistenceManager;
public EntranceExecutionJob(PersistenceManager persistenceManager) {
@@ -167,68 +172,93 @@ public class EntranceExecutionJob extends EntranceJob implements LogHandler {
}
@Override
- public JobInfo getJobInfo() { // TODO You can put this method on LockJob(可以将该方法放到LockJob上去)
+ public JobInfo getJobInfo() {
String execID = getId();
String state = this.getState().toString();
float progress = this.getProgress();
- SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ SimpleDateFormat simpleDateFormat = dateFormatLocal.get();
+
if (getJobRequest().getMetrics() == null) {
getJobRequest().setMetrics(new HashMap<>());
}
+
Map<String, Object> metricsMap = getJobRequest().getMetrics();
String createTime =
- metricsMap.containsKey(TaskConstant.ENTRANCEJOB_SUBMIT_TIME)
- ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME))
+ metricsMap.containsKey(TaskConstant.JOB_SUBMIT_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SUBMIT_TIME))
: "not created";
String scheduleTime =
- metricsMap.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)
- ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME))
+ metricsMap.containsKey(TaskConstant.JOB_SCHEDULE_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SCHEDULE_TIME))
: "not scheduled";
String startTime =
- metricsMap.containsKey(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR)
- ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR))
+ metricsMap.containsKey(TaskConstant.JOB_RUNNING_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_RUNNING_TIME))
: "not submitted to orchestrator";
String endTime =
- metricsMap.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME)
- ? simpleDateFormat.format(metricsMap.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME))
+ metricsMap.containsKey(TaskConstant.JOB_COMPLETE_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_COMPLETE_TIME))
: "on running or not started";
String runTime;
- if (metricsMap.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME)) {
+ if (metricsMap.containsKey(TaskConstant.JOB_COMPLETE_TIME)) {
runTime =
ByteTimeUtils.msDurationToString(
- (((Date) metricsMap.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME))).getTime()
- - (((Date) metricsMap.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME))).getTime());
+ (((Date) metricsMap.get(TaskConstant.JOB_COMPLETE_TIME))).getTime()
+ - (((Date) metricsMap.get(TaskConstant.JOB_SUBMIT_TIME))).getTime());
} else {
runTime =
"The task did not end normally and the usage time could not be counted.(任务并未正常结束,无法统计使用时间)";
}
- String metric =
- "Task creation time(任务创建时间): "
- + createTime
- + ", Task scheduling time(任务调度时间): "
- + scheduleTime
- + ", Task start time(任务开始时间): "
- + startTime
- + ", Mission end time(任务结束时间): "
- + endTime
- + "\n\n\n"
- + LogUtils.generateInfo(
+
+ String jobToOrchestrator =
+ metricsMap.containsKey(TaskConstant.JOB_TO_ORCHESTRATOR)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_TO_ORCHESTRATOR))
+ : "not to orchestrator";
+ String jobRequestEC =
+ metricsMap.containsKey(TaskConstant.JOB_REQUEST_EC_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_REQUEST_EC_TIME))
+ : "not request ec";
+ String jobSubmitToEC =
+ metricsMap.containsKey(TaskConstant.JOB_SUBMIT_TO_EC_TIME)
+ ? simpleDateFormat.format(metricsMap.get(TaskConstant.JOB_SUBMIT_TO_EC_TIME))
+ : "not submit to ec";
+
+ StringBuffer sb = new StringBuffer();
+ sb.append("Task creation time(任务创建时间): ")
+ .append(createTime)
+ .append(", Task scheduling time(任务调度时间): ")
+ .append(scheduleTime)
+ .append(", Task start time(任务开始时间): ")
+ .append(startTime)
+ .append(", Mission end time(任务结束时间): ")
+ .append(endTime)
+ .append("\n")
+ .append(LogUtils.generateInfo(""))
+ .append("Task submit to Orchestrator time:")
+ .append(jobToOrchestrator)
+ .append(", Task request EngineConn time:")
+ .append(jobRequestEC)
+ .append(", Task submit to EngineConn time:")
+ .append(jobSubmitToEC)
+ .append("\n")
+ .append(
+ LogUtils.generateInfo(
"Your mission(您的任务) "
+ this.getJobRequest().getId()
+ " The total time spent is(总耗时时间为): "
- + runTime);
+ + runTime));
+
+ String metric = sb.toString();
+
return new JobInfo(execID, null, state, progress, metric);
}
@Override
public void close() throws IOException {
logger.info("job:" + jobRequest().getId() + " is closing");
-
try {
- // todo Do a lot of aftercare work when close(close时候要做很多的善后工作)
if (this.getLogWriter().isDefined()) {
IOUtils.closeQuietly(this.getLogWriter().get());
- // this.setLogWriter(null);
} else {
logger.info("job:" + jobRequest().getId() + "LogWriter is null");
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index c94bdd79a..a5d3ace35 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -326,17 +326,16 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
JobRequest jobRequest = ((EntranceJob) job.get()).getJobRequest();
Map<String, Object> metrics = jobRequest.getMetrics();
Map<String, Object> metricsVo = new HashMap<>();
- if (metrics.containsKey(TaskConstant.ENTRANCEJOB_YARNRESOURCE)) {
+ if (metrics.containsKey(TaskConstant.JOB_YARNRESOURCE)) {
HashMap<String, ResourceWithStatus> resourceMap =
- (HashMap<String, ResourceWithStatus>)
- metrics.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE);
+ (HashMap<String, ResourceWithStatus>) metrics.get(TaskConstant.JOB_YARNRESOURCE);
ArrayList<YarnResourceWithStatusVo> resoureList = new ArrayList<>(12);
if (null != resourceMap && !resourceMap.isEmpty()) {
resourceMap.forEach(
(applicationId, resource) -> {
resoureList.add(new YarnResourceWithStatusVo(applicationId, resource));
});
- metricsVo.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, resoureList);
+ metricsVo.put(TaskConstant.JOB_YARNRESOURCE, resoureList);
Optional<Integer> cores =
resourceMap.values().stream()
.map(resource -> resource.queueCores())
@@ -360,17 +359,17 @@ public class EntranceRestfulApi implements EntranceRestfulRemote {
}
String coreRGB = RGBUtils.getRGB(corePercent);
String memoryRGB = RGBUtils.getRGB(memoryPercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_PERCENT, corePercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_PERCENT, memoryPercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_RGB, coreRGB);
- metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_RGB, memoryRGB);
+ metricsVo.put(TaskConstant.JOB_CORE_PERCENT, corePercent);
+ metricsVo.put(TaskConstant.JOB_MEMORY_PERCENT, memoryPercent);
+ metricsVo.put(TaskConstant.JOB_CORE_RGB, coreRGB);
+ metricsVo.put(TaskConstant.JOB_MEMORY_RGB, memoryRGB);
- message.data(TaskConstant.ENTRANCEJOB_YARN_METRICS, metricsVo);
+ message.data(TaskConstant.JOB_YARN_METRICS, metricsVo);
} else {
- message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+ message.data(TaskConstant.JOB_YARNRESOURCE, null);
}
} else {
- message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+ message.data(TaskConstant.JOB_YARNRESOURCE, null);
}
message
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
index 9add1fbe7..88453577c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cli/heartbeat/KillHandler.scala
@@ -25,11 +25,11 @@ class KillHandler extends HeartbeatLossHandler with Logging {
override def handle(jobs: List[EntranceJob]): Unit = {
for (job <- jobs) {
if (job != null) {
- logger.info("Killing job: " + job.getJobInfo.getId)
+ logger.info("Killing job: " + job.getId)
Utils.tryCatch(
job.onFailure("Job is killed because of client-server connection lost", null)
) { t =>
- logger.error("failed to kill job: " + job.getJobInfo.getId, t)
+ logger.error("failed to kill job: " + job.getId, t)
}
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index e601b3801..c509f1005 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -285,11 +285,11 @@ class DefaultEntranceExecutor(id: Long)
} else {
if (
!entranceExecuteRequest.getJob.getJobRequest.getMetrics.containsKey(
- TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR
+ TaskConstant.JOB_TO_ORCHESTRATOR
)
) {
entranceExecuteRequest.getJob.getJobRequest.getMetrics
- .put(TaskConstant.ENTRANCEJOB_TO_ORCHESTRATOR, new Date(System.currentTimeMillis()))
+ .put(TaskConstant.JOB_TO_ORCHESTRATOR, new Date(System.currentTimeMillis()))
}
}
// 2. deal log And Response
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index 5246ead4f..b762f5460 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -143,7 +143,7 @@ abstract class EntranceJob extends Job {
_.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not been initialized."))
)
} else {
- if (getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)) {
+ if (getJobRequest.getMetrics.containsKey(TaskConstant.JOB_SCHEDULE_TIME)) {
getLogListener.foreach(
_.onLogUpdate(
this,
@@ -152,7 +152,7 @@ abstract class EntranceJob extends Job {
)
} else {
getJobRequest.getMetrics.put(
- TaskConstant.ENTRANCEJOB_SCHEDULE_TIME,
+ TaskConstant.JOB_SCHEDULE_TIME,
new Date(System.currentTimeMillis)
)
}
@@ -177,9 +177,13 @@ abstract class EntranceJob extends Job {
LogUtils.generateInfo("Your job is Running now. Please wait it to complete.")
)
)
+ getJobRequest.getMetrics.put(
+ TaskConstant.JOB_RUNNING_TIME,
+ new Date(System.currentTimeMillis)
+ )
case _ if SchedulerEventState.isCompleted(toState) =>
getJobRequest.getMetrics.put(
- TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
+ TaskConstant.JOB_COMPLETE_TIME,
new Date(System.currentTimeMillis())
)
if (getJobInfo != null) {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
index 8aa02f949..24633dfbb 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/HDFSCacheLogWriter.scala
@@ -123,12 +123,10 @@ class HDFSCacheLogWriter(logPath: String, charset: String, sharedCache: Cache, u
}
override def write(msg: String): Unit = {
- if (StringUtils.isBlank(msg)) {
- cache("")
- } else {
+ if (StringUtils.isNotBlank(msg)) {
val rows = msg.split("\n")
rows.foreach(row => {
- if (row == null) cache("") else cache(row)
+ if (StringUtils.isNotBlank(row)) cache(row)
})
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
index 5108a7bf4..60164ca58 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala
@@ -123,10 +123,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
jobRequest.setStatus(SchedulerEventState.Inited.toString)
// Entry indicator: task submission time
jobRequest.setMetrics(new util.HashMap[String, AnyRef]())
- jobRequest.getMetrics.put(
- TaskConstant.ENTRANCEJOB_SUBMIT_TIME,
- new Date(System.currentTimeMillis)
- )
+ jobRequest.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
jobRequest.setParams(configMap)
jobRequest
}
@@ -272,7 +269,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager)
// Package labels
jobReq.setLabels(labelList)
jobReq.setMetrics(new util.HashMap[String, AnyRef]())
- jobReq.getMetrics.put(TaskConstant.ENTRANCEJOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
+ jobReq.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, new Date(System.currentTimeMillis))
jobReq
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index cfe14672c..ec2912888 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -165,31 +165,31 @@ object JobHistoryHelper extends Logging {
def updateJobRequestMetrics(
jobRequest: JobRequest,
resourceInfo: util.Map[String, ResourceWithStatus],
- ecInfo: util.Map[String, AnyRef]
+ infoMap: util.Map[String, AnyRef]
): Unit = {
// update resource
if (jobRequest.getMetrics == null) {
jobRequest.setMetrics(new util.HashMap[String, AnyRef]())
}
val metricsMap = jobRequest.getMetrics
- val resourceMap = metricsMap.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE)
+ val resourceMap = metricsMap.get(TaskConstant.JOB_YARNRESOURCE)
val ecResourceMap =
if (resourceInfo == null) new util.HashMap[String, ResourceWithStatus] else resourceInfo
if (resourceMap != null) {
resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap)
} else {
- metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, ecResourceMap)
+ metricsMap.put(TaskConstant.JOB_YARNRESOURCE, ecResourceMap)
}
var engineInstanceMap: util.HashMap[String, AnyRef] = null
- if (metricsMap.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)) {
+ if (metricsMap.containsKey(TaskConstant.JOB_ENGINECONN_MAP)) {
engineInstanceMap = metricsMap
- .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
+ .get(TaskConstant.JOB_ENGINECONN_MAP)
.asInstanceOf[util.HashMap[String, AnyRef]]
} else {
engineInstanceMap = new util.HashMap[String, AnyRef]()
- metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
+ metricsMap.put(TaskConstant.JOB_ENGINECONN_MAP, engineInstanceMap)
}
- val infoMap = ecInfo
+
if (null != infoMap && infoMap.containsKey(TaskConstant.TICKET_ID)) {
val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
val engineExtraInfoMap = engineInstanceMap
@@ -200,6 +200,23 @@ object JobHistoryHelper extends Logging {
} else {
logger.warn("Ec info map must contains ticketID")
}
+
+ if (null != infoMap && infoMap.containsKey(TaskConstant.JOB_REQUEST_EC_TIME)) {
+ metricsMap.put(
+ TaskConstant.JOB_REQUEST_EC_TIME,
+ infoMap.get(TaskConstant.JOB_REQUEST_EC_TIME)
+ )
+ }
+
+ if (null != infoMap && infoMap.containsKey(TaskConstant.JOB_SUBMIT_TO_EC_TIME)) {
+ metricsMap.put(
+ TaskConstant.JOB_SUBMIT_TO_EC_TIME,
+ infoMap.get(TaskConstant.JOB_SUBMIT_TO_EC_TIME)
+ )
+ }
+ if (null != infoMap && infoMap.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+ metricsMap.put(TaskConstant.ENGINE_INSTANCE, infoMap.get(TaskConstant.ENGINE_INSTANCE))
+ }
}
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index bd665cd34..82e75c0bd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -29,7 +29,6 @@ import org.apache.linkis.orchestrator.computation.execute.{
CodeExecTaskExecutorManager
}
import org.apache.linkis.orchestrator.ecm.conf.ECMPluginConf
-import org.apache.linkis.orchestrator.ecm.service.impl.ComputationEngineConnExecutor
import org.apache.linkis.orchestrator.exception.{
OrchestratorErrorCodeSummary,
OrchestratorErrorException,
@@ -51,6 +50,7 @@ import org.apache.linkis.scheduler.executer.{ErrorExecuteResponse, SubmitRespons
import org.apache.commons.lang3.StringUtils
import java.util
+import java.util.Date
import java.util.concurrent.TimeUnit
import scala.collection.convert.decorateAsScala._
@@ -85,6 +85,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
logger.info(s"Start to execute CodeLogicalUnitExecTask(${getIDInfo()}).")
var executor: Option[CodeExecTaskExecutor] = None
var retryException: LinkisRetryException = null
+ if (!getPhysicalContext.exists(TaskConstant.JOB_REQUEST_EC_TIME)) {
+ // Here you should use the time when you first applied for EC
+ getPhysicalContext.set(TaskConstant.JOB_REQUEST_EC_TIME, new Date(System.currentTimeMillis))
+ }
executor = Utils.tryCatch(codeExecTaskExecutorManager.askExecutor(this)) {
case retry: LinkisRetryException =>
retryException = retry
@@ -132,7 +136,14 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
}
)
infoMap.put(TaskConstant.ENGINE_CONN_TASK_ID, engineConnExecId)
- infoMap.put(TaskConstant.ENGINE_CONN_SUBMIT_TIME, System.currentTimeMillis.toString)
+ infoMap.put(TaskConstant.JOB_SUBMIT_TO_EC_TIME, new Date(System.currentTimeMillis))
+ if (getPhysicalContext.exists(TaskConstant.JOB_REQUEST_EC_TIME)) {
+ infoMap.put(
+ TaskConstant.JOB_REQUEST_EC_TIME,
+ getPhysicalContext.get(TaskConstant.JOB_REQUEST_EC_TIME).asInstanceOf[Object]
+ )
+ }
+
val event = TaskRunningInfoEvent(
this,
0f,
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
index 3ebd8ff41..15e6953ae 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala
@@ -262,17 +262,17 @@ object TaskConversions extends Logging {
BDPJettyServerHelper.gson.fromJson((job.getMetrics), classOf[util.Map[String, Object]])
var completeTime: Date = null
if (
- null != metrics && metrics.containsKey(TaskConstant.ENTRANCEJOB_COMPLETE_TIME) && metrics
- .get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME) != null
+ null != metrics && metrics.containsKey(TaskConstant.JOB_COMPLETE_TIME) && metrics
+ .get(TaskConstant.JOB_COMPLETE_TIME) != null
) {
- completeTime = dealString2Date(metrics.get(TaskConstant.ENTRANCEJOB_COMPLETE_TIME).toString)
+ completeTime = dealString2Date(metrics.get(TaskConstant.JOB_COMPLETE_TIME).toString)
}
var createTime: Date = null
if (
- null != metrics && metrics.containsKey(TaskConstant.ENTRANCEJOB_SUBMIT_TIME) && metrics
- .get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME) != null
+ null != metrics && metrics.containsKey(TaskConstant.JOB_SUBMIT_TIME) && metrics
+ .get(TaskConstant.JOB_SUBMIT_TIME) != null
) {
- createTime = dealString2Date(metrics.get(TaskConstant.ENTRANCEJOB_SUBMIT_TIME).toString)
+ createTime = dealString2Date(metrics.get(TaskConstant.JOB_SUBMIT_TIME).toString)
}
if (null != createTime) {
if (isJobFinished(job.getStatus)) {
@@ -287,21 +287,8 @@ object TaskConversions extends Logging {
taskVO.setCostTime(System.currentTimeMillis() - createTime.getTime)
}
}
- if (metrics.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)) {
- val engineMap = metrics
- .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
- .asInstanceOf[util.Map[String, Object]]
- if (null != engineMap && !engineMap.isEmpty) {
- // the engineInstance in metrics may be repeat, so it needs to be distinct
- val engineInstances =
- engineMap.asScala
- .map(_._2.asInstanceOf[util.Map[String, Object]])
- .map(_.get(TaskConstant.ENGINE_INSTANCE))
- .toList
- .distinct
- .mkString(",")
- taskVO.setEngineInstance(engineInstances)
- }
+ if (metrics.containsKey(TaskConstant.ENGINE_INSTANCE)) {
+ taskVO.setEngineInstance(metrics.get(TaskConstant.ENGINE_INSTANCE).toString)
} else if (TaskStatus.Failed.toString.equals(job.getStatus)) {
taskVO.setCanRetry(true)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org