You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/29 15:51:42 UTC
[incubator-linkis] 05/11: am prints entrance taskID log
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
commit 8e0c0ac9bbf8c61774e46a4900f3c3344b0ad6b7
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:35:29 2022 +0800
am prints entrance taskID log
---
.../service/engine/DefaultEngineAskEngineService.scala | 18 ++++++++++--------
.../am/service/engine/DefaultEngineCreateService.scala | 10 ++++++----
2 files changed, 16 insertions(+), 12 deletions(-)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index 65e12d5fb..ef0dcbb49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -21,6 +21,7 @@ import feign.RetryableException
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.linkis.common.exception.LinkisRetryException
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.am.conf.AMConfiguration
import org.apache.linkis.manager.common.constant.AMConstant
import org.apache.linkis.manager.common.protocol.engine._
@@ -55,7 +56,8 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
@Receiver
override def askEngine(engineAskRequest: EngineAskRequest, sender: Sender): Any = {
- logger.info(s"received engineAskRequest $engineAskRequest")
+ val taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties)
+ logger.info(s"received task: $taskId, engineAskRequest $engineAskRequest")
if(! engineAskRequest.getLabels.containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
val engineReuseRequest = new EngineReuseRequest()
engineReuseRequest.setLabels(engineAskRequest.getLabels)
@@ -66,21 +68,21 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
t: Throwable =>
t match {
case retryException: LinkisRetryException =>
- logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
+ logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
case _ =>
- logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed", t)
+ logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed", t)
}
null
}
if (null != reuseNode) {
- logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode")
+ logger.info(s"Finished to ask engine for task: $taskId user ${engineAskRequest.getUser} by reuse node $reuseNode")
return reuseNode
}
}
val engineAskAsyncId = getAsyncId
val createNodeThread = Future {
- logger.info(s"Start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
+ logger.info(s"Task: $taskId start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
//如果原来的labels含engineInstance ,先去掉
engineAskRequest.getLabels.remove("engineInstance")
val engineCreateRequest = new EngineCreateRequest
@@ -96,14 +98,14 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
if (null == createEngineNode) {
throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"create engine${createNode.getServiceInstance} success, but to use engine failed")
}
- logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
+ logger.info(s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
createEngineNode
}
createNodeThread.onComplete {
case Success(engineNode) =>
- logger.info(s"Success to async($engineAskAsyncId) createEngine $engineNode")
+ logger.info(s"Task: $taskId Success to async($engineAskAsyncId) createEngine $engineNode")
sender.send(EngineCreateSuccess(engineAskAsyncId, engineNode))
case Failure(exception) =>
val retryFlag = exception match {
@@ -118,7 +120,7 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
}
}
}
- logger.info(s"Failed to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
+ logger.info(s"Task: $taskId Failed to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
sender.send(EngineCreateError(engineAskAsyncId, ExceptionUtils.getRootCauseMessage(exception), retryFlag))
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 083524e36..dd24367b7 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.common.exception.LinkisRetryException
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.conf.GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME
+import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.am.conf.{AMConfiguration, EngineConnConfigurationService}
import org.apache.linkis.manager.am.exception.{AMErrorCode, AMErrorException}
import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
@@ -139,7 +140,8 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
@throws[LinkisRetryException]
override def createEngine(engineCreateRequest: EngineCreateRequest, sender: Sender): EngineNode = {
val startTime = System.currentTimeMillis
- info(s"Start to create Engine for request: $engineCreateRequest.")
+ val taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties)
+ logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")
val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
val timeout = if (engineCreateRequest.getTimeOut <= 0) AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong else engineCreateRequest.getTimeOut
@@ -207,7 +209,7 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
throw t
}
- info(s"Finished to create engineConn $engineNode. ticketId is $resourceTicketId")
+ info(s"Task: $taskId finished to create engineConn $engineNode. ticketId is $resourceTicketId")
engineNode.setTicketId(resourceTicketId)
//7. 更新持久化信息:包括插入engine/metrics
@@ -225,9 +227,9 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
val leftWaitTime = timeout - (System.currentTimeMillis - startTime)
if (ECAvailableRule.getInstance.isNeedAvailable(labelList)) {
ensureECAvailable(engineNode, resourceTicketId, leftWaitTime)
- logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
+ logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
} else {
- logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
+ logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
}
engineNode
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org