You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/04/29 15:51:37 UTC

[incubator-linkis] branch dev-1.1.2 updated (9e177f2a4 -> e1ec0127d)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a change to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


    from 9e177f2a4 Fix: ECM page 'instance' display mistake (#2061)
     new 682f0fc18 Add taskID to job idinfo
     new c669bcd5a ecm prints entrance taskID log
     new 8dc378439 add getJobIdFromStringMap method
     new a0e96bed6 prints entrance taskID log
     new 8e0c0ac9b am prints entrance taskID log
     new deced99af static init method
     new 7af91ca6c optimize log
     new 2b8503f63 optimize code add locker
     new b6fcd5c4e add junit test
     new 61b7acc28 add junit test
     new e1ec0127d add comment

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../linkis/governance/common/utils/JobUtils.scala  | 11 ++++++-
 .../governance/common/utils/JobUtilsTest.scala}    | 30 +++++++++++-------
 .../impl/AbstractEngineConnLaunchService.scala     | 11 ++++---
 .../apache/linkis/entrance/EntranceServer.scala    |  2 +-
 .../label/utils/EngineTypeLabelCreator.java        |  4 +++
 .../engine/DefaultEngineAskEngineService.scala     | 18 ++++++-----
 .../engine/DefaultEngineCreateService.scala        | 10 +++---
 .../service/ComputationTaskExecutionReceiver.scala |  2 +-
 .../orchestrator/converter/AbstractConverter.scala |  2 +-
 .../orchestrator/listener/task/TaskInfoEvent.scala |  2 +-
 .../apache/linkis/orchestrator/plans/ast/Job.scala | 36 ++++++++++++++++++++--
 .../orchestrator/plans/physical/ExecTask.scala     |  4 +--
 12 files changed, 95 insertions(+), 37 deletions(-)
 copy linkis-computation-governance/linkis-computation-governance-common/src/{main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala => test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala} (54%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 06/11: static init method

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit deced99af6b8ee427bf01c76fbbe0bdf32e5d590
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:37:51 2022 +0800

    static init method
---
 .../org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
index 566e2bbd9..6c1783190 100644
--- a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
+++ b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
@@ -35,6 +35,10 @@ public class EngineTypeLabelCreator {
 
     private static Map<String, String> defaultVersion = null;
 
+    static {
+        init();
+    }
+
     private static void init() {
         if (null == defaultVersion) {
             synchronized (EngineTypeLabelCreator.class) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 02/11: ecm prints entrance taskID log

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit c669bcd5af8db0e5642fa0c4ad10fef6def23aa8
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:33:18 2022 +0800

    ecm prints entrance taskID log
---
 .../server/service/impl/AbstractEngineConnLaunchService.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
index 9dcef7ad6..4ec3b6a59 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/AbstractEngineConnLaunchService.scala
@@ -18,7 +18,6 @@
 package org.apache.linkis.ecm.server.service.impl
 
 import java.util.concurrent.TimeUnit
-
 import org.apache.linkis.common.ServiceInstance
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.ecm.core.engineconn.{EngineConn, EngineConnInfo}
@@ -38,6 +37,7 @@ import org.apache.linkis.manager.common.protocol.engine.EngineConnStatusCallback
 import org.apache.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
 import org.apache.linkis.rpc.Sender
 import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.linkis.governance.common.utils.JobUtils
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContextExecutorService, Future}
@@ -64,7 +64,8 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
 
   override def launchEngineConn(request: EngineConnLaunchRequest, duration: Long): EngineNode = {
     //1.创建engineConn和runner,launch 并设置基础属性
-    info(s"Try to launch a new EngineConn with $request.")
+    val taskId = JobUtils.getJobIdFromStringMap(request.creationDesc.properties)
+    logger.info(s"TaskId: $taskId try to launch a new EngineConn with $request.")
     val conn = createEngineConn
     val runner = createEngineConnLaunchRunner
     val launch = createEngineConnLaunch
@@ -102,13 +103,13 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
 
       future onComplete {
         case Failure(t) =>
-          logger.error(s"init ${conn.getServiceInstance} failed.${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}")
+          logger.error(s"TaskId: $taskId init ${conn.getServiceInstance} failed.${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}")
           LinkisECMApplication.getContext.getECMSyncListenerBus.postToAll(EngineConnStatusChangeEvent(conn.getTickedId, Failed))
         case Success(_) =>
-          logger.info(s"init ${conn.getServiceInstance} succeed.${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}")
+          logger.info(s"TaskId: $taskId init ${conn.getServiceInstance} succeed.${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}")
       }
     } { t =>
-      error(s"init ${conn.getServiceInstance} failed, ${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}, now stop and delete it. message: ${t.getMessage}", t)
+      error(s"TaskId: $taskId init ${conn.getServiceInstance} failed, ${conn.getEngineConnLaunchRunner.getEngineConnLaunch.getEngineConnManagerEnv().engineConnWorkDir}, now stop and delete it. message: ${t.getMessage}", t)
       conn.getEngineConnLaunchRunner.stop()
       Sender.getSender(MANAGER_SPRING_NAME).send(EngineConnStatusCallbackToAM(conn.getServiceInstance,
         NodeStatus.ShuttingDown, " wait init failed , reason " + ExceptionUtils.getRootCauseMessage(t)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 09/11: add junit test

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit b6fcd5c4ef431845e04d85a366f7af954bc723a6
Author: peacewong <wp...@gmail.com>
AuthorDate: Fri Apr 29 14:45:23 2022 +0800

    add junit test
---
 .../governance/common/utils/JobUtilsTest.scala     | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala
new file mode 100644
index 000000000..bfeb038ee
--- /dev/null
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.governance.common.utils
+
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
+import org.junit.jupiter.api.{Assertions, DisplayName, Test}
+
+import java.util
+
+class JobUtilsTest {
+
+  @Test
+  @DisplayName("testGetJobIdFromMap")
+  def testGetJobIdFromMap(): Unit = {
+    val map: util.Map[String, Object] = new util.HashMap[String, Object]()
+    Assertions.assertNull(JobUtils.getJobIdFromMap(map))
+    map.put(JobRequestConstants.JOB_ID, 100L)
+    Assertions.assertNotNull(JobUtils.getJobIdFromMap(map))
+  }
+
+  @Test
+  @DisplayName("getJobIdFromStringMap")
+  def testGetJobIdFromStringMap(): Unit = {
+    val map: util.Map[String, String] = new util.HashMap[String, String]()
+    Assertions.assertNull(JobUtils.getJobIdFromStringMap(map))
+    map.put(JobRequestConstants.JOB_ID, "100")
+    Assertions.assertNotNull(JobUtils.getJobIdFromStringMap(map))
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 07/11: optimize log

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 7af91ca6c5344dc427e0137a4298d7d28ae5570d
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 19:58:59 2022 +0800

    optimize log
---
 .../computation/service/ComputationTaskExecutionReceiver.scala          | 2 +-
 .../org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
index 5c74f3855..0f342ecb8 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
@@ -162,7 +162,7 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
     val serviceInstance = RPCUtils.getServiceInstanceFromSender(sender)
     codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, responseTaskError.execId).foreach { codeExecutor =>
       val event = TaskErrorResponseEvent(codeExecutor.getExecTask, responseTaskError.errorMsg)
-      logger.info(s"From engineConn receive responseTaskError  info$responseTaskError, now post to listenerBus event: $event")
+      logger.info(s"From engineConn receive responseTaskError  info${responseTaskError.execId}, now post to listenerBus event: ${event.execTask.getIDInfo()}")
       codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
       codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
       isExist = true
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
index 661750fc0..d53ceed55 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
@@ -55,7 +55,7 @@ case class TaskResultSetSizeEvent(execTask: ExecTask, resultSize: Int) extends T
 
 case class TaskErrorResponseEvent(execTask: ExecTask, errorMsg: String) extends TaskInfoEvent with OrchestratorSyncEvent {
   override def toString: String = {
-    s"task ${execTask.getIDInfo()}, errorMsg ${errorMsg}"
+    s"task ${execTask.getIDInfo()}"
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 11/11: add comment

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit e1ec0127d4ca92aa21dbd6f65f948cda1f1ec50b
Author: peacewong <wp...@gmail.com>
AuthorDate: Fri Apr 29 22:32:44 2022 +0800

    add comment
---
 .../main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala   | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
index cd0ddbb6f..f1b2d3cb1 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
@@ -56,6 +56,12 @@ trait Job extends ASTOrchestration[Job] {
     id
   }
 
+  /**
+   * JonIDINFO generation method:
+   * 1. If taskID exists in startUp, use taskID as prefix
+   * 2. If the taskID does not exist or is empty, use the job id directly
+   * @return
+   */
   def getIDInfo(): String = {
     if (null == idInfo) idInfoLock synchronized {
       if (null == idInfo) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 05/11: am prints entrance taskID log

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 8e0c0ac9bbf8c61774e46a4900f3c3344b0ad6b7
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:35:29 2022 +0800

    am prints entrance taskID log
---
 .../service/engine/DefaultEngineAskEngineService.scala | 18 ++++++++++--------
 .../am/service/engine/DefaultEngineCreateService.scala | 10 ++++++----
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index 65e12d5fb..ef0dcbb49 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -21,6 +21,7 @@ import feign.RetryableException
 import org.apache.commons.lang.exception.ExceptionUtils
 import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.am.conf.AMConfiguration
 import org.apache.linkis.manager.common.constant.AMConstant
 import org.apache.linkis.manager.common.protocol.engine._
@@ -55,7 +56,8 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
 
   @Receiver
   override def askEngine(engineAskRequest: EngineAskRequest, sender: Sender): Any = {
-    logger.info(s"received engineAskRequest $engineAskRequest")
+    val taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties)
+    logger.info(s"received task: $taskId, engineAskRequest $engineAskRequest")
     if(! engineAskRequest.getLabels.containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
       val engineReuseRequest = new EngineReuseRequest()
       engineReuseRequest.setLabels(engineAskRequest.getLabels)
@@ -66,21 +68,21 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
         t: Throwable =>
           t match {
             case retryException: LinkisRetryException =>
-              logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
+              logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}")
             case _ =>
-              logger.warn(s"user ${engineAskRequest.getUser} reuse engine failed", t)
+              logger.warn(s"task: $taskId user ${engineAskRequest.getUser} reuse engine failed", t)
           }
           null
       }
       if (null != reuseNode) {
-        logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode")
+        logger.info(s"Finished to ask engine for task: $taskId user ${engineAskRequest.getUser} by reuse node $reuseNode")
         return reuseNode
       }
     }
 
     val engineAskAsyncId = getAsyncId
     val createNodeThread = Future {
-      logger.info(s"Start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
+      logger.info(s"Task: $taskId start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}")
       //如果原来的labels含engineInstance ,先去掉
       engineAskRequest.getLabels.remove("engineInstance")
       val engineCreateRequest = new EngineCreateRequest
@@ -96,14 +98,14 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
       if (null == createEngineNode) {
         throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"create engine${createNode.getServiceInstance} success, but to use engine failed")
       }
-      logger.info(s"Finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
+      logger.info(s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode")
       createEngineNode
     }
 
 
     createNodeThread.onComplete {
       case Success(engineNode) =>
-        logger.info(s"Success to async($engineAskAsyncId) createEngine $engineNode")
+        logger.info(s"Task: $taskId Success to async($engineAskAsyncId) createEngine $engineNode")
         sender.send(EngineCreateSuccess(engineAskAsyncId, engineNode))
       case Failure(exception) =>
         val retryFlag = exception match {
@@ -118,7 +120,7 @@ class DefaultEngineAskEngineService extends AbstractEngineService with EngineAsk
             }
           }
         }
-        logger.info(s"Failed  to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
+        logger.info(s"Task: $taskId Failed  to async($engineAskAsyncId) createEngine, can Retry $retryFlag", exception)
         sender.send(EngineCreateError(engineAskAsyncId, ExceptionUtils.getRootCauseMessage(exception), retryFlag))
     }
 
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 083524e36..dd24367b7 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.am.conf.{AMConfiguration, EngineConnConfigurationService}
 import org.apache.linkis.manager.am.exception.{AMErrorCode, AMErrorException}
 import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
@@ -139,7 +140,8 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
   @throws[LinkisRetryException]
   override def createEngine(engineCreateRequest: EngineCreateRequest, sender: Sender): EngineNode = {
     val startTime = System.currentTimeMillis
-    info(s"Start to create Engine for request: $engineCreateRequest.")
+    val taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties)
+    logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")
     val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
     val timeout = if (engineCreateRequest.getTimeOut <= 0) AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong else engineCreateRequest.getTimeOut
 
@@ -207,7 +209,7 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
         throw t
     }
 
-    info(s"Finished to create  engineConn $engineNode. ticketId is $resourceTicketId")
+    info(s"Task: $taskId finished to create  engineConn $engineNode. ticketId is $resourceTicketId")
     engineNode.setTicketId(resourceTicketId)
     //7. 更新持久化信息:包括插入engine/metrics
 
@@ -225,9 +227,9 @@ class DefaultEngineCreateService extends AbstractEngineService with EngineCreate
     val leftWaitTime = timeout - (System.currentTimeMillis - startTime)
     if (ECAvailableRule.getInstance.isNeedAvailable(labelList)) {
       ensureECAvailable(engineNode, resourceTicketId, leftWaitTime)
-      logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
+      logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms")
     } else {
-      logger.info(s"Finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
+      logger.info(s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms")
     }
     engineNode
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 01/11: Add taskID to job idinfo

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 682f0fc184ee612c5eacd9dac8f77f33f23140fc
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:32:18 2022 +0800

    Add taskID to job idinfo
---
 .../orchestrator/converter/AbstractConverter.scala |  2 +-
 .../apache/linkis/orchestrator/plans/ast/Job.scala | 25 +++++++++++++++++++++-
 .../orchestrator/plans/physical/ExecTask.scala     |  4 ++--
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/converter/AbstractConverter.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/converter/AbstractConverter.scala
index cf50d5db5..3185964f5 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/converter/AbstractConverter.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/converter/AbstractConverter.scala
@@ -40,7 +40,7 @@ trait AbstractConverter extends Converter with Logging{
     debug(s"Start to convert JobReq(${jobReq.getId}) to AstJob.")
     val job = converterTransforms.collectFirst { case transform => transform(jobReq, context) }
       .getOrElse(throw new OrchestratorErrorException(OrchestratorErrorCodeSummary.CONVERTER_FOR_NOT_SUPPORT_ERROR_CODE, "Cannot convert jobReq " + jobReq))
-    info(s"Finished to convert JobReq(${jobReq.getId}) to AstJob(${job.getId}).")
+    info(s"Finished to convert JobReq(${jobReq.getId}) to AstJob(${job.getIDInfo()}).")
     job
   }
 
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
index 078e7d24e..04fbb6161 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
@@ -17,6 +17,8 @@
  
 package org.apache.linkis.orchestrator.plans.ast
 
+import org.apache.commons.lang.StringUtils
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.orchestrator.utils.OrchestratorIDCreator
 
 /**
@@ -28,6 +30,8 @@ trait Job extends ASTOrchestration[Job] {
 
   private var id: String = _
 
+  private var idInfo: String = _
+
   override def isVisited: Boolean = visited
 
   override def setVisited(): Unit = this.visited = true
@@ -40,7 +44,7 @@ trait Job extends ASTOrchestration[Job] {
 
   def copyWithNewStages(stages: Array[Stage]): Job
 
-  override def getId: String =  {
+  override def getId: String = {
     if (null == id) synchronized {
       if (null == id) {
         id = OrchestratorIDCreator.getAstJobIDCreator.nextID("astJob")
@@ -49,4 +53,23 @@ trait Job extends ASTOrchestration[Job] {
     id
   }
 
+  def getIDInfo(): String = {
+    if (null == idInfo) synchronized {
+      if (null == idInfo) {
+        val context = getASTContext
+        if (null != context && null != context.getParams && null != context.getParams.getRuntimeParams && null != context.getParams.getRuntimeParams.toMap) {
+          val runtimeMap = context.getParams.getRuntimeParams.toMap
+          val taskId = JobUtils.getJobIdFromMap(runtimeMap)
+          if (StringUtils.isNotBlank(taskId)) {
+            idInfo = s"TaskID_${taskId}_otJobId_${getId}"
+          } else {
+            idInfo = getId
+          }
+        } else {
+          idInfo = getId
+        }
+      }
+    }
+    idInfo
+  }
 }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/ExecTask.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/ExecTask.scala
index b81347668..40b4cf0ee 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/ExecTask.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/ExecTask.scala
@@ -45,9 +45,9 @@ trait ExecTask extends PhysicalOrchestration[ExecTask] {
     val desc = getTaskDesc
     val jobID = desc.getOrigin.getASTOrchestration match {
       case job: Job =>
-        job.getId
+        job.getIDInfo()
       case stage: Stage =>
-        stage.getJob.getId
+        stage.getJob.getIDInfo()
       case _ => ""
     }
     jobID + "_" + getId


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 04/11: prints entrance taskID log

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit a0e96bed65c325acef5d91d80cf24edc02ba0989
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:35:12 2022 +0800

    prints entrance taskID log
---
 .../src/main/scala/org/apache/linkis/entrance/EntranceServer.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 33f33d552..d56037c7e 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -108,7 +108,7 @@ abstract class EntranceServer extends Logging {
        * */
       Utils.tryAndWarn(job.getJobListener.foreach(_.onJobInited(job)))
       getEntranceContext.getOrCreateScheduler().submit(job)
-      val msg = s"Job with jobId : ${job.getId} and execID : ${job.getId()} submitted "
+      val msg = s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted "
       logger.info(msg)
 
       job match {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 10/11: add junit test

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 61b7acc285ebb46c246185f99874862583be6717
Author: peacewong <wp...@gmail.com>
AuthorDate: Fri Apr 29 16:56:23 2022 +0800

    add junit test
---
 .../scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala
index bfeb038ee..a26d51a70 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/test/scala/org/apache/linkis/governance/common/utils/JobUtilsTest.scala
@@ -29,7 +29,7 @@ class JobUtilsTest {
   def testGetJobIdFromMap(): Unit = {
     val map: util.Map[String, Object] = new util.HashMap[String, Object]()
     Assertions.assertNull(JobUtils.getJobIdFromMap(map))
-    map.put(JobRequestConstants.JOB_ID, 100L)
+    map.put(JobRequestConstants.JOB_ID, "100")
     Assertions.assertNotNull(JobUtils.getJobIdFromMap(map))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 03/11: add getJobIdFromStringMap method

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 8dc3784395df30bc39dec487c66f2de2b787d052
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 17:34:45 2022 +0800

    add getJobIdFromStringMap method
---
 .../org/apache/linkis/governance/common/utils/JobUtils.scala  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
index 1b0638d69..36cc86293 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/JobUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.linkis.governance.common.utils
 
 import org.apache.linkis.governance.common.constant.job.JobRequestConstants
-import org.apache.linkis.protocol.utils.TaskUtils
 
 import java.util;
 
@@ -34,4 +33,14 @@ object JobUtils {
     null
   }
 
+  def getJobIdFromStringMap(map: util.Map[String, String]): String = {
+    if (null != map && map.containsKey(JobRequestConstants.JOB_ID)) {
+      val value = map.get(JobRequestConstants.JOB_ID)
+      if (null != value) {
+        return value
+      }
+    }
+    null
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org


[incubator-linkis] 08/11: optimize code add locker

Posted by ca...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git

commit 2b8503f638acbb94a601250649b6eb36bd648bb3
Author: peacewong <wp...@gmail.com>
AuthorDate: Thu Apr 28 19:59:26 2022 +0800

    optimize code add locker
---
 .../main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
index 04fbb6161..cd0ddbb6f 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/ast/Job.scala
@@ -32,6 +32,9 @@ trait Job extends ASTOrchestration[Job] {
 
   private var idInfo: String = _
 
+  private val idLock = new Array[Byte](0)
+  private val idInfoLock = new Array[Byte](0)
+
   override def isVisited: Boolean = visited
 
   override def setVisited(): Unit = this.visited = true
@@ -45,7 +48,7 @@ trait Job extends ASTOrchestration[Job] {
   def copyWithNewStages(stages: Array[Stage]): Job
 
   override def getId: String = {
-    if (null == id) synchronized {
+    if (null == id) idLock synchronized {
       if (null == id) {
         id = OrchestratorIDCreator.getAstJobIDCreator.nextID("astJob")
       }
@@ -54,7 +57,7 @@ trait Job extends ASTOrchestration[Job] {
   }
 
   def getIDInfo(): String = {
-    if (null == idInfo) synchronized {
+    if (null == idInfo) idInfoLock synchronized {
       if (null == idInfo) {
         val context = getASTContext
         if (null != context && null != context.getParams && null != context.getParams.getRuntimeParams && null != context.getParams.getRuntimeParams.toMap) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org