You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/29 15:51:42 UTC

[incubator-linkis] 05/11: am prints entrance taskID log

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 8e0c0ac9bbf8c61774e46a4900f3c3344b0ad6b7
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:35:29 2022 +0800

    am prints entrance taskID log
---
 .../service/engine/DefaultEngineAskEngineService.scala | 18 ++++++++++--------
 .../am/service/engine/DefaultEngineCreateService.scala | 10 ++++++----
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index 65e12d5fb..ef0dcbb49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -21,6 +21,7 @@ import feign.RetryableException
 import org.apache.commons.lang.exception.ExceptionUtils
 import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.am.conf.AMConfiguration
 import org.apache.linkis.manager.common.constant.AMConstant
 import org.apache.linkis.manager.common.protocol.engine._
@@ -55,7 +56,8 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
 
   @Receiver
   override def askEngine(engineAskRequest: EngineAskRequest, sender: Sender): Any = {
-    logger.info(s"received engineAskRequest $engineAskRequest")
+    val taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties)
+    logger.info(s"received task: $taskId, engineAskRequest $engineAskRequest")
     if(! engineAskRequest.getLabels.containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
       val engineReuseRequest = new EngineReuseRequest()
       engineReuseRequest.setLabels(engineAskRequest.getLabels)
@@ -66,21 +68,21 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
         t: Throwable =>
           t match {
             case retryException: LinkisRetryException =>
-              logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
+              logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
             case _ =>
-              logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed", t)
+              logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed", t)
           }
           null
       }
       if (null != reuseNode) {
-        logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode")
+        logger.info(s"Finished to ask engine for task: $taskId user ${engineAskRequest.getUser} by reuse node $reuseNode")
         return reuseNode
       }
     }
 
     val engineAskAsyncId = getAsyncId
     val createNodeThread = Future {
-      logger.info(s"Start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
+      logger.info(s"Task: $taskId start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
       //如果原来的labels含engineInstance ,先去掉
       engineAskRequest.getLabels.remove("engineInstance")
       val engineCreateRequest = new EngineCreateRequest
@@ -96,14 +98,14 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
       if (null == createEngineNode) {
         throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"create engine${createNode.getServiceInstance} success, but to use engine failed")
       }
-      logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
+      logger.info(s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
       createEngineNode
     }
 
 
     createNodeThread.onComplete {
       case Success(engineNode) =>
-        logger.info(s"Success to async($engineAskAsyncId) createEngine $engineNode")
+        logger.info(s"Task: $taskId Success to async($engineAskAsyncId) createEngine $engineNode")
         sender.send(EngineCreateSuccess(engineAskAsyncId, engineNode))
       case Failure(exception) =>
         val retryFlag = exception match {
@@ -118,7 +120,7 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
             }
           }
         }
-        logger.info(s"Failed  to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
+        logger.info(s"Task: $taskId Failed  to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
         sender.send(EngineCreateError(engineAskAsyncId, ExceptionUtils.getRootCauseMessage(exception), retryFlag))
     }
 
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 083524e36..dd24367b7 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.am.conf.{AMConfiguration, EngineConnConfigurationService}
 import org.apache.linkis.manager.am.exception.{AMErrorCode, AMErrorException}
 import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
@@ -139,7 +140,8 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
   @throws[LinkisRetryException]
   override def createEngine(engineCreateRequest: EngineCreateRequest, sender: Sender): EngineNode = {
     val startTime = System.currentTimeMillis
-    info(s"Start to create Engine for request: $engineCreateRequest.")
+    val taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties)
+    logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")
     val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
     val timeout = if (engineCreateRequest.getTimeOut <= 0) AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong else engineCreateRequest.getTimeOut
 
@@ -207,7 +209,7 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
         throw t
     }
 
-    info(s"Finished to create  engineConn $engineNode. ticketId is $resourceTicketId")
+    info(s"Task: $taskId finished to create  engineConn $engineNode. ticketId is $resourceTicketId")
     engineNode.setTicketId(resourceTicketId)
     //7. 更新持久化信息:包括插入engine/metrics
 
@@ -225,9 +227,9 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
     val leftWaitTime = timeout - (System.currentTimeMillis - startTime)
     if (ECAvailableRule.getInstance.isNeedAvailable(labelList)) {
       ensureECAvailable(engineNode, resourceTicketId, leftWaitTime)
-      logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
+      logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
     } else {
-      logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
+      logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
     }
     engineNode
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org