Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 13:04:16 UTC

[linkis] branch dev-1.3.2 updated: [Feature][LinkisManager] Add task fixed engine label (#4326)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 29fb49662 [Feature][LinkisManager] Add task fixed engine label  (#4326)
29fb49662 is described below

commit 29fb4966299a8ed9ce9189c6a80aa16ee37b6133
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 21:04:08 2023 +0800

    [Feature][LinkisManager] Add task fixed engine label  (#4326)
    
    * Annotation optimization
    
    * add fixed engine conn label
    
    * Fix FixedEngineConnLabel to get and set its value under the sessionId key
    
    * Fix issue where kill -9 of a hive EC did not kill its yarn application id

    * Fix issue where killing a hive task did not kill its yarn application id
    
    * If the code has already been compiled successfully, run it without recompiling
    
    * Update the default number of tasks executed by the serial engine to 50
    
    * [Feature][LinkisManager] Add task fixed engine label #4325
    
    * remove close invoke kill task
---
 .../common/conf/GovernaceCommonConf.scala          |  2 +
 .../service/impl/DefaultEngineConnKillService.java |  2 +-
 .../impl/DefaultEngineConnListService.scala        |  4 +-
 .../executor/conf/ComputationExecutorConf.scala    | 90 +++++++++++++++++-----
 .../engine/DefaultEngineAskEngineService.scala     |  2 +-
 .../engine/DefaultEngineCreateService.scala        | 40 ++++++----
 .../service/engine/DefaultEngineReuseService.scala | 29 +++++--
 .../manager/label/constant/LabelKeyConstant.java   |  2 +
 .../label/entity/engine/FixedEngineConnLabel.java  | 70 +++++++++++++++++
 .../label/entity/entrance/ExecuteOnceLabel.java    |  3 +-
 .../errorcode/LabelCommonErrorCodeSummary.java     | 17 ++--
 .../common/protocol/engine/EngineReuseRequest.java | 10 +++
 .../main/resources/linkis-engineconn.properties    |  8 +-
 .../hive/src/main/resources/log4j2.xml             |  2 +-
 .../hive/conf/HiveEngineConfiguration.scala        |  6 +-
 .../hive/executor/HiveEngineConnExecutor.scala     | 27 ++++---
 16 files changed, 238 insertions(+), 76 deletions(-)
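
The headline change pins tasks to a single engine conn through the new fixedEngineConn label: tasks sharing the same sessionId can be routed to one EC. A hypothetical, simplified sketch of a submit-side label map that could carry it follows; the constant name and the sessionId value key come from this patch, while the map shape itself is an assumption for illustration only.

    import java.util
    import org.apache.linkis.manager.label.constant.LabelKeyConstant

    // Hypothetical illustration: labels are assumed to travel as a key -> value map,
    // as engineAskRequest.getLabels does further down in this patch.
    val labels = new util.HashMap[String, AnyRef]()
    val fixedEc = new util.HashMap[String, String]()
    fixedEc.put("sessionId", "session-20230307-01") // value key used by FixedEngineConnLabel
    labels.put(LabelKeyConstant.FIXED_EC_KEY, fixedEc) // "fixedEngineConn"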

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
index 5fdc9cf7f..48f906bb8 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
@@ -23,6 +23,8 @@ object GovernanceCommonConf {
 
   val CONF_FILTER_RM = "wds.linkis.rm"
 
+  val WILDCARD_CONSTANT = "*"
+
   val SPARK_ENGINE_VERSION = CommonVars("wds.linkis.spark.engine.version", "2.4.3")
 
   val HIVE_ENGINE_VERSION = CommonVars("wds.linkis.hive.engine.version", "1.2.1")
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 8c9e829ce..4f593736d 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -94,7 +94,7 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
         return response;
     }
 
-    private  void killYarnAppIdOfOneEc(EngineConn engineConn) {
+    public void killYarnAppIdOfOneEc(EngineConn engineConn) {
         String engineConnInstance = engineConn.getServiceInstance().toString();
         logger.info("try to kill yarn app ids in the engine of ({}).", engineConnInstance);
         String engineLogDir = engineConn.getEngineConnManagerEnv().engineConnLogDirs();
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
index b7fbfd50d..4b9a59b4d 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
@@ -77,6 +77,8 @@ class DefaultEngineConnListService
         Utils.tryAndWarn {
           if (NodeStatus.Failed == conn.getStatus && StringUtils.isNotBlank(conn.getPid)) {
             killECByEngineConnKillService(conn)
+          } else {
+            getEngineConnKillService().killYarnAppIdOfOneEc(conn)
           }
           conn.close()
         }
@@ -184,7 +186,7 @@ class DefaultEngineConnListService
     logger.info(s"start to kill ec by engineConnKillService ${engineconn.getServiceInstance}")
     val engineStopRequest = new EngineStopRequest()
     engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
-    getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+    getEngineConnKillService().dealEngineConnStop(engineStopRequest)
   }
 
 }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
index 7765cfc96..fec2fe5e7 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
@@ -22,50 +22,100 @@ import org.apache.linkis.common.conf.{ByteType, CommonVars}
 object ComputationExecutorConf {
 
   val ENGINE_RESULT_SET_MAX_CACHE =
-    CommonVars("wds.linkis.engine.resultSet.cache.max", new ByteType("0k"))
+    CommonVars("wds.linkis.engine.resultSet.cache.max", new ByteType("0k"), "Result set cache size")
 
   val ENGINE_LOCK_DEFAULT_EXPIRE_TIME =
-    CommonVars("wds.linkis.engine.lock.expire.time", 2 * 60 * 1000)
+    CommonVars("wds.linkis.engine.lock.expire.time", 2 * 60 * 1000, "lock expiration time")
 
-  val ENGINE_MAX_TASK_EXECUTE_NUM = CommonVars("wds.linkis.engineconn.max.task.execute.num", 0)
+  val ENGINE_MAX_TASK_EXECUTE_NUM = CommonVars(
+    "wds.linkis.engineconn.max.task.execute.num",
+    50,
+    "Maximum number of tasks executed by the synchronization EC"
+  )
 
   val ENGINE_PROGRESS_FETCH_INTERVAL =
-    CommonVars("wds.linkis.engineconn.progresss.fetch.interval-in-seconds", 5)
+    CommonVars(
+      "wds.linkis.engineconn.progresss.fetch.interval-in-seconds",
+      5,
+      "Progress information push interval"
+    )
 
-  val UDF_LOAD_FAILED_IGNORE = CommonVars("wds.linkis.engineconn.udf.load.ignore", true)
+  val UDF_LOAD_FAILED_IGNORE =
+    CommonVars("wds.linkis.engineconn.udf.load.ignore", true, "UDF load failed ignore")
 
-  val FUNCTION_LOAD_FAILED_IGNORE = CommonVars("wds.linkis.engineconn.function.load.ignore", true)
+  val FUNCTION_LOAD_FAILED_IGNORE =
+    CommonVars("wds.linkis.engineconn.function.load.ignore", true, "Function load failed ignore")
 
   val TASK_IGNORE_UNCOMPLETED_STATUS =
-    CommonVars("wds.linkis.engineconn.task.ignore.uncompleted.status", true).getValue
-
-  val ENGINE_CONCURRENT_THREAD_NUM = CommonVars("wds.linkis.engineconn.concurrent.thread.num", 20)
+    CommonVars(
+      "wds.linkis.engineconn.task.ignore.uncompleted.status",
+      true,
+      "Ignore pushes with uncompleted status"
+    ).getValue
+
+  val ENGINE_CONCURRENT_THREAD_NUM = CommonVars(
+    "wds.linkis.engineconn.concurrent.thread.num",
+    20,
+    "Maximum thread pool of the concurrent EC"
+  )
 
-  val ASYNC_EXECUTE_MAX_PARALLELISM = CommonVars("wds.linkis.engineconn.max.parallelism", 300)
+  val ASYNC_EXECUTE_MAX_PARALLELISM = CommonVars(
+    "wds.linkis.engineconn.max.parallelism",
+    300,
+    "Maximum  parallelism for the asynchronous EC"
+  )
 
   val ASYNC_SCHEDULER_MAX_RUNNING_JOBS =
-    CommonVars("wds.linkis.engineconn.async.group.max.running", 10).getValue
+    CommonVars(
+      "wds.linkis.engineconn.async.group.max.running",
+      10,
+      "Maximum number of running tasks for a group of asynchronous EC"
+    ).getValue
 
   val DEFAULT_COMPUTATION_EXECUTORMANAGER_CLAZZ = CommonVars(
     "wds.linkis.default.computation.executormanager.clazz",
-    "org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorManagerImpl"
+    "org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorManagerImpl",
+    "Executor manager implementation class"
   )
 
   val UPSTREAM_MONITOR_ECTASK_SHOULD_START =
-    CommonVars("linkis.upstream.monitor.ectask.should.start", true).getValue
+    CommonVars(
+      "linkis.upstream.monitor.ectask.should.start",
+      true,
+      "Enable upstream live monitoring"
+    ).getValue
 
   val UPSTREAM_MONITOR_WRAPPER_ENTRIES_SURVIVE_THRESHOLD_SEC =
-    CommonVars("linkis.upstream.monitor.wrapper.entries.survive.time.sec", 86400).getValue
+    CommonVars(
+      "linkis.upstream.monitor.wrapper.entries.survive.time.sec",
+      86400,
+      "Upstream task cache cleanup threshold"
+    ).getValue
 
   val UPSTREAM_MONITOR_ECTASK_ENTRANCE_THRESHOLD_SEC =
-    CommonVars("linkis.upstream.monitor.ectask.entrance.threshold.sec", 15).getValue
-
-  val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars("hive.resultset.use.unique.column.names", false)
+    CommonVars(
+      "linkis.upstream.monitor.ectask.entrance.threshold.sec",
+      15,
+      "Maximum heartbeat time for whether the upstream task is alive"
+    ).getValue
+
+  val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars(
+    "hive.resultset.use.unique.column.names",
+    false,
+    "hive result set to enable unique column names"
+  )
 
-  val JOB_ID_TO_ENV_KEY = CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID").getValue
+  val JOB_ID_TO_ENV_KEY =
+    CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID", "LINKIS_JOB_ID ENV").getValue
 
   val TASK_ASYNC_MAX_THREAD_SIZE =
-    CommonVars("linkis.ec.task.execution.async.thread.size", 50).getValue
+    CommonVars(
+      "linkis.ec.task.execution.async.thread.size",
+      50,
+      "Task submit thread pool size"
+    ).getValue
+
+  val TASK_SUBMIT_WAIT_TIME_MS =
+    CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue
 
-  val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L).getValue
 }
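
The most consequential change in this file is the new default of 50 for wds.linkis.engineconn.max.task.execute.num (the previous default was 0), matching the "serial engine" bullet in the commit message; the remaining edits only add descriptions to existing CommonVars. A minimal sketch of how the value is read, assuming the usual CommonVars resolution against the EC properties:

    import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf

    // Resolves to 50 when nothing is configured; deployments can still override
    // wds.linkis.engineconn.max.task.execute.num in linkis-engineconn.properties.
    val maxTasks: Int = ComputationExecutorConf.ENGINE_MAX_TASK_EXECUTE_NUM.getValue
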
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index 2fc49fe56..4616cf85a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -74,7 +74,7 @@ class DefaultEngineAskEngineService
       engineReuseRequest.setLabels(engineAskRequest.getLabels)
       engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut)
       engineReuseRequest.setUser(engineAskRequest.getUser)
-
+      engineReuseRequest.setProperties(engineAskRequest.getProperties)
       val reuseNode = Utils.tryCatch(engineReuseService.reuseEngine(engineReuseRequest, sender)) {
         t: Throwable =>
           t match {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 763a42e92..d6ceccef9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -118,7 +118,7 @@ class DefaultEngineCreateService
         AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
       } else engineCreateRequest.getTimeout
 
-    // 1. 检查Label是否合法
+    // 1. Check if Label is valid
     var labelList: util.List[Label[_]] = LabelUtils.distinctLabel(
       labelBuilderFactory.getLabels(engineCreateRequest.getLabels),
       userLabelService.getUserLabels(engineCreateRequest.getUser)
@@ -145,11 +145,11 @@ class DefaultEngineCreateService
     emInstanceLabel.setAlias(ENGINE_CONN_MANAGER_SPRING_NAME.getValue)
     emLabelList.add(emInstanceLabel)
 
-    // 2. NodeLabelService getNodesByLabel  获取EMNodeList
+    // 2. Get all available ECMs by labels
     val emScoreNodeList =
       getEMService().getEMNodes(emLabelList.asScala.filter(!_.isInstanceOf[EngineTypeLabel]).asJava)
 
-    // 3. 执行Select  比如负载过高,返回没有负载低的EM,每个规则如果返回为空就抛出异常
+    // 3. Get the ECM with the lowest load by selection algorithm
     val choseNode =
       if (null == emScoreNodeList || emScoreNodeList.isEmpty) null
       else {
@@ -163,11 +163,11 @@ class DefaultEngineCreateService
       )
     }
     val emNode = choseNode.get.asInstanceOf[EMNode]
-    // 4. 请求资源
+    // 4. request resource
     val (resourceTicketId, resource) =
       requestResource(engineCreateRequest, labelFilter.choseEngineLabel(labelList), emNode, timeout)
 
-    // 5. 封装engineBuildRequest对象,并发送给EM进行执行
+    // 5. build engineConn request
     val engineBuildRequest = EngineConnBuildRequestImpl(
       resourceTicketId,
       labelFilter.choseEngineLabel(labelList),
@@ -179,8 +179,10 @@ class DefaultEngineCreateService
       )
     )
 
-    // 6. 调用EM发送引擎启动请求调用ASK
-    // AM会更新serviceInstance表  需要将ticketID进行替换,并更新 EngineConn的Label 需要修改EngineInstanceLabel 中的id为Instance信息
+    // 6. Call ECM to send engine start request
+    // AM will update the serviceInstance table
+    // It is necessary to replace the ticketID and update the Label of EngineConn
+    // It is necessary to modify the id in EngineInstanceLabel to Instance information
     val oldServiceInstance = new ServiceInstance
     oldServiceInstance.setApplicationName(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
     oldServiceInstance.setInstance(resourceTicketId)
@@ -206,8 +208,8 @@ class DefaultEngineCreateService
       s"Task: $taskId finished to create  engineConn $engineNode. ticketId is $resourceTicketId"
     )
     engineNode.setTicketId(resourceTicketId)
-    // 7. 更新持久化信息:包括插入engine/metrics
 
+    // 7.Update persistent information: including inserting engine/metrics
     Utils.tryCatch(getEngineNodeManager.updateEngineNode(oldServiceInstance, engineNode)) { t =>
       logger.warn(s"Failed to update engineNode $engineNode", t)
       val stopEngineRequest =
@@ -230,7 +232,7 @@ class DefaultEngineCreateService
       )
     }
 
-    // 8. 新增 EngineConn的Label,添加engineConn的Alias
+    // 8. Add the Label of EngineConn, and add the Alias of engineConn
     val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
     engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
     labelList.add(engineConnAliasLabel)
@@ -262,16 +264,21 @@ class DefaultEngineCreateService
     engineNode
   }
 
+  /**
+   * Read the management console configuration and the parameters passed in by the user to combine
+   * request resources
+   * @param engineCreateRequest
+   * @param labelList
+   * @param emNode
+   * @param timeout
+   * @return
+   */
   private def requestResource(
       engineCreateRequest: EngineCreateRequest,
       labelList: util.List[Label[_]],
       emNode: EMNode,
       timeout: Long
   ): (String, NodeResource) = {
-    // 4.  向RM申请对应EM和用户的资源, 抛出资源不足异常:RetryException
-    // 4.1 TODO 如果EM资源不足,触发EM回收空闲的engine
-    // 4.2 TODO 如果用户资源不足,触发用户空闲的engine回收
-    // 读取管理台的的配置
     if (engineCreateRequest.getProperties == null) {
       engineCreateRequest.setProperties(new util.HashMap[String, String]())
     }
@@ -313,7 +320,7 @@ class DefaultEngineCreateService
   }
 
   private def ensuresIdle(engineNode: EngineNode, resourceTicketId: String): Boolean = {
-    // TODO 逻辑需要修改,修改为engineConn主动上报
+
     val engineNodeInfo = Utils.tryAndWarnMsg(
       getEngineNodeManager.getEngineNodeInfoByDB(engineNode)
     )("Failed to from db get engine node info")
@@ -324,12 +331,12 @@ class DefaultEngineCreateService
       if (canRetry.isDefined) {
         throw new LinkisRetryException(
           AMConstant.ENGINE_ERROR_CODE,
-          s"${engineNode.getServiceInstance} ticketID:$resourceTicketId 初始化引擎失败,原因: ${reason}"
+          s"${engineNode.getServiceInstance} ticketID:$resourceTicketId Failed to initialize engine, reason: ${reason}"
         )
       }
       throw new AMErrorException(
         AMConstant.EM_ERROR_CODE,
-        s"${engineNode.getServiceInstance} ticketID:$resourceTicketId 初始化引擎失败,原因: ${reason}"
+        s"${engineNode.getServiceInstance} ticketID:$resourceTicketId Failed to initialize engine, reason: ${reason}"
       )
     }
     NodeStatus.isAvailable(engineNodeInfo.getNodeStatus)
@@ -367,7 +374,6 @@ class DefaultEngineCreateService
       logger.info(
         s"Start to wait engineConn($engineNode) to be available, but only ${ByteTimeUtils.msDurationToString(timeout)} left."
       )
-      // 获取启动的引擎信息,并等待引擎的状态变为IDLE,如果等待超时则返回给用户,并抛出异常
       Utils.waitUntil(
         () => ensuresIdle(engineNode, resourceTicketId),
         Duration(timeout, TimeUnit.MILLISECONDS)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
index 65e41ae1c..5245036f5 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.am.service.engine
 import org.apache.linkis.common.exception.LinkisRetryException
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.manager.am.conf.AMConfiguration
 import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
 import org.apache.linkis.manager.am.selector.NodeSelector
@@ -66,10 +67,19 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
   @Autowired
   private var engineStopService: EngineStopService = _
 
+  /**
+   *   1. Obtain the EC corresponding to all labels 2. Judging reuse exclusion tags and fixed engine
+   *      labels 3. Select the EC with the lowest load available 4. Lock the corresponding EC
+   * @param engineReuseRequest
+   * @param sender
+   * @throws
+   * @return
+   */
   @Receiver
   @throws[LinkisRetryException]
   override def reuseEngine(engineReuseRequest: EngineReuseRequest, sender: Sender): EngineNode = {
-    logger.info(s"Start to reuse Engine for request: $engineReuseRequest")
+    val taskId = JobUtils.getJobIdFromStringMap(engineReuseRequest.getProperties)
+    logger.info(s"Task $taskId Start to reuse Engine for request: $engineReuseRequest")
     val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
     val labelList = LabelUtils
       .distinctLabel(
@@ -86,6 +96,17 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
           Array.empty[String]
       }
 
+    if (
+        exclusionInstances.length == 1 && exclusionInstances(
+          0
+        ) == GovernanceCommonConf.WILDCARD_CONSTANT
+    ) {
+      logger.info(
+        s"Task $taskId exists ReuseExclusionLabel and the configuration does not choose to reuse EC"
+      )
+      return null
+    }
+
     var filterLabelList = labelList.filter(_.isInstanceOf[EngineNodeLabel]).asJava
 
     val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
@@ -121,7 +142,7 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
       )
     }
     var engineScoreList =
-      getEngineNodeManager.getEngineNodes(instances.asScala.map(_._1).toSeq.toArray)
+      getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)
 
     var engine: EngineNode = null
     var count = 1
@@ -140,14 +161,10 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
           s"Engine reuse exceeds limit: $reuseLimit"
         )
       }
-      // 3. 执行Select 判断label分数、判断是否可用、判断负载
       val choseNode = nodeSelector.choseNode(engineScoreList.toArray)
-      // 4. 获取Select后排在第一的engine,修改EngineNode的Label为新标签,并调用EngineNodeManager的reuse请求
       if (choseNode.isEmpty) {
         throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused")
       }
-
-      // 5. 调用EngineNodeManager 进行reuse 如果reuse失败,则去掉该engine进行重新reuse走3和4
       val engineNode = choseNode.get.asInstanceOf[EngineNode]
       logger.info(s"prepare to reuse engineNode: ${engineNode.getServiceInstance}")
       engine = Utils.tryCatch(getEngineNodeManager.reuseEngine(engineNode)) { t: Throwable =>
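
With the early return added above, a single wildcard instance in the reuseExclusion label now disables reuse outright, so a fresh EC is requested instead. A hypothetical caller-side illustration follows; it assumes labels reach the reuse service as a key -> value map and that a bare "*" is an acceptable value shape for this label, which is not shown in the patch.

    import java.util
    import org.apache.linkis.governance.common.conf.GovernanceCommonConf
    import org.apache.linkis.manager.label.constant.LabelKeyConstant

    // Hypothetical illustration only: a lone "*" in reuseExclusion short-circuits
    // reuseEngine above and the ask path falls back to creating a new EC.
    val labels = new util.HashMap[String, AnyRef]()
    labels.put(LabelKeyConstant.REUSE_EXCLUSION_KEY, GovernanceCommonConf.WILDCARD_CONSTANT)
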
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 742db2be2..bcb14a704 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -61,4 +61,6 @@ public class LabelKeyConstant {
   public static final String REUSE_EXCLUSION_KEY = "reuseExclusion";
 
   public static final String TENANT_KEY = "tenant";
+
+  public static final String FIXED_EC_KEY = "fixedEngineConn";
 }
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java
new file mode 100644
index 000000000..d86308f8d
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.*;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+import org.apache.linkis.manager.label.exception.LabelErrorException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+
+import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
+import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;
+
+public class FixedEngineConnLabel extends GenericLabel implements EngineNodeLabel, UserModifiable {
+
+  public FixedEngineConnLabel() {
+    setLabelKey(LabelKeyConstant.FIXED_EC_KEY);
+  }
+
+  @ValueSerialNum(0)
+  public void setSessionId(String SessionId) {
+    if (getValue() == null) {
+      setValue(new HashMap<>());
+    }
+    getValue().put("sessionId", SessionId);
+  }
+
+  public String getSessionId() {
+    if (getValue() == null) {
+      return null;
+    }
+    return getValue().get("sessionId");
+  }
+
+  @Override
+  public Feature getFeature() {
+    return Feature.CORE;
+  }
+
+  @Override
+  public void valueCheck(String stringValue) throws LabelErrorException {
+    if (!StringUtils.isBlank(stringValue)) {
+      if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
+        throw new LabelErrorException(
+            LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
+      }
+    } else {
+      throw new LabelErrorException(
+          CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
+    }
+  }
+}
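
A minimal sketch of using the new label programmatically, following the labelBuilderFactory.createLabel(classOf[...]) pattern used in DefaultEngineCreateService and DefaultEngineReuseService above; the factory import path and the sample session id are assumptions.

    import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
    import org.apache.linkis.manager.label.entity.engine.FixedEngineConnLabel

    val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
    val fixedLabel = labelBuilderFactory.createLabel(classOf[FixedEngineConnLabel])
    // Tasks attaching the same sessionId can be routed to the same engine conn.
    fixedLabel.setSessionId("session-20230307-01")
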
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
index 6f7f143d2..e18631eed 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
@@ -18,10 +18,11 @@
 package org.apache.linkis.manager.label.entity.entrance;
 
 import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EngineNodeLabel;
 import org.apache.linkis.manager.label.entity.Feature;
 import org.apache.linkis.manager.label.entity.GenericLabel;
 
-public class ExecuteOnceLabel extends GenericLabel {
+public class ExecuteOnceLabel extends GenericLabel implements EngineNodeLabel {
 
   public ExecuteOnceLabel() {
     setLabelKey(LabelKeyConstant.EXECUTE_ONCE_KEY);
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
index 20e0a0a49..a0ebdffe8 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
@@ -20,17 +20,18 @@ package org.apache.linkis.manager.label.errorcode;
 import org.apache.linkis.common.errorcode.LinkisErrorCode;
 
 public enum LabelCommonErrorCodeSummary implements LinkisErrorCode {
-  UPDATE_LABEL_FAILED(40001, "Update label realtion failed(更新标签属性失败)"),
+  UPDATE_LABEL_FAILED(25001, "Update label realtion failed(更新标签属性失败)"),
   LABEL_ERROR_CODE(
-      40001,
+      25002,
       "The value of the label is set incorrectly, only one value can be set, and the separator symbol '-' cannot be used(标签的值设置错误,只能设置一个值,不能使用分割符符号 '-') "),
-  FAILED_BUILD_COMBINEDLABEL(40001, "Failed to build combinedLabel(构建组合标签失败)"),
-  FAILED_READ_INPUT_STREAM(40001, "Fail to read value input stream(读取值输入流失败)"),
-  FAILED_CONSTRUCT_INSTANCE(40001, "Fail to construct a label instance of:{0}(未能构建标签实例)"),
-  NOT_SUPPORT_ENVTYPE(40001, "Not support envType:{0}(不支持 envType)"),
+  FAILED_BUILD_COMBINEDLABEL(25003, "Failed to build combinedLabel(构建组合标签失败)"),
+  FAILED_READ_INPUT_STREAM(25004, "Fail to read value input stream(读取值输入流失败)"),
+  FAILED_CONSTRUCT_INSTANCE(25005, "Fail to construct a label instance of:{0}(未能构建标签实例)"),
+  NOT_SUPPORT_ENVTYPE(25006, "Not support envType:{0}(不支持 envType)"),
   CHECK_LABEL_REMOVE_REQUEST(
-      130001,
-      "ServiceInstance in request is null, please check label remove request(请求中的 ServiceInstance 为空,请检查标签删除请求)");
+      25007,
+      "ServiceInstance in request is null, please check label remove request(请求中的 ServiceInstance 为空,请检查标签删除请求)"),
+  CHECK_LABEL_VALUE_EMPTY(25008, "Label value cannot be empty");
 
   /** (errorCode)错误码 */
   private final int errorCode;
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
index e8f2defb4..0f3b16141 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
@@ -29,6 +29,8 @@ public class EngineReuseRequest implements EngineRequest {
 
   private String user;
 
+  private Map<String, String> properties;
+
   public Map<String, Object> getLabels() {
     return labels;
   }
@@ -62,6 +64,14 @@ public class EngineReuseRequest implements EngineRequest {
     this.user = user;
   }
 
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public void setProperties(Map<String, String> properties) {
+    this.properties = properties;
+  }
+
   @Override
   public String toString() {
     return "EngineReuseRequest{"
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index a2135181f..7fa9cab8e 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -15,13 +15,11 @@
 
 #
 wds.linkis.server.version=v1
-wds.linkis.engineconn.debug.enable=true
 #wds.linkis.keytab.enable=true
 wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.hive.HiveEngineConnPlugin
-wds.linkis.bdp.hive.init.sql.enable=true
+#wds.linkis.bdp.hive.init.sql.enable=true
 wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineplugin.hive.hook.HiveAddJarsEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveUseDatabaseEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveInitSQLHook
 
-wds.linkis.engineconn.maintain.enable=true
-
+#wds.linkis.engineconn.maintain.enable=true
 #Depending on the engine selected in HIVE_ENGINE_TYPE, control the function called when canceling the task in scripts.
-wds.linkis.hive.engine.type=mr
\ No newline at end of file
+linkis.hive.engine.type=mr
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index d490defe9..e6f8b150f 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -28,7 +28,7 @@
         </RollingFile>
 
         <File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp.log">
-            <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+            <RegexFilter regex=".*application_.*" onMatch="ACCEPT" onMismatch="DENY"/>
             <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
         </File>
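
The tightened RegexFilter matters because yarn application ids are usually logged as application_<clusterTimestamp>_<sequence>, with an underscore rather than a space after the word. A quick self-contained illustration (the sample log line is made up):

    // Shows why ".*application_.*" catches ids that the old ".* application .*"
    // pattern, which required spaces on both sides of the word, could miss.
    val line = "2023-03-07 21:04:08 INFO  Client - Submitted application_1678181048000_0001"
    println(line.matches(".*application_.*"))  // true  -> accepted by the new filter
    println(line.matches(".* application .*")) // false -> rejected by the old pattern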
 
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
index 047157b78..255800084 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
@@ -27,10 +27,10 @@ object HiveEngineConfiguration {
   )
 
   val ENABLE_FETCH_BASE64 =
-    CommonVars[Boolean]("wds.linkis.hive.enable.fetch.base64", false).getValue
+    CommonVars[Boolean]("linkis.hive.enable.fetch.base64", false).getValue
 
   val BASE64_SERDE_CLASS = CommonVars[String](
-    "wds.linkis.hive.base64.serde.class",
+    "linkis.hive.base64.serde.class",
     "org.apache.linkis.engineplugin.hive.serde.CustomerDelimitedJSONSerDe"
   ).getValue
 
@@ -39,5 +39,5 @@ object HiveEngineConfiguration {
     CommonVars[String]("HIVE_AUX_JARS_PATH", "").getValue
   ).getValue
 
-  val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue
+  val HIVE_ENGINE_TYPE = CommonVars[String]("linkis.hive.engine.type", "mr").getValue
 }
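
These hive keys drop the legacy wds. prefix, in line with the renamed property in linkis-engineconn.properties above. A small sketch of the new lookup, assuming the standard CommonVars resolution; deployments that still set the old wds.-prefixed keys would presumably need to migrate them.

    import org.apache.linkis.common.conf.CommonVars

    // Reads the renamed key; the default stays "mr", so only explicit overrides of the
    // old "wds.linkis.hive.engine.type" key are affected by the rename.
    val hiveEngineType: String = CommonVars[String]("linkis.hive.engine.type", "mr").getValue
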
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index b50d3c68a..1a1da03cc 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -28,20 +28,17 @@ import org.apache.linkis.engineconn.core.EngineConnObject
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
 import org.apache.linkis.engineplugin.hive.conf.{Counters, HiveEngineConfiguration}
 import org.apache.linkis.engineplugin.hive.cs.CSHiveHelper
-import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.COMPILE_HIVE_QUERY_ERROR
-import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.GET_FIELD_SCHEMAS_ERROR
+import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.{
+  COMPILE_HIVE_QUERY_ERROR,
+  GET_FIELD_SCHEMAS_ERROR
+}
 import org.apache.linkis.engineplugin.hive.exception.HiveQueryFailedException
 import org.apache.linkis.engineplugin.hive.progress.HiveProgressHelper
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.hadoop.common.conf.HadoopConf
-import org.apache.linkis.manager.common.entity.resource.{
-  CommonNodeResource,
-  LoadInstanceResource,
-  NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource}
 import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
 import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -54,7 +51,6 @@ import org.apache.linkis.storage.domain.{Column, DataType}
 import org.apache.linkis.storage.resultset.ResultSetFactory
 import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
 
-import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.hive.common.HiveInterruptUtils
 import org.apache.hadoop.hive.conf.HiveConf
@@ -106,8 +102,6 @@ class HiveEngineConnExecutor(
 
   private var singleLineProgress: Float = 0.0f
 
-  private var stage: Int = 0
-
   private var engineExecutorContext: EngineExecutionContext = _
 
   private val singleCodeCompleted: AtomicBoolean = new AtomicBoolean(false)
@@ -227,10 +221,18 @@ class HiveEngineConnExecutor(
         if (numberOfMRJobs > 0) {
           engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
         }
+        if (thread.isInterrupted) {
+          logger.error(
+            "The thread of execution has been interrupted and the task should be terminated"
+          )
+          return ErrorExecuteResponse(
+            "The thread of execution has been interrupted and the task should be terminated",
+            null
+          )
+        }
         val hiveResponse: CommandProcessorResponse = driver.run(realCode, compileRet == 0)
         if (hiveResponse.getResponseCode != 0) {
           LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
-          // todo check uncleared context ?
           return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
         }
         engineExecutorContext.appendStdout(
@@ -541,6 +543,7 @@ class HiveEngineConnExecutor(
       case "mr" =>
         HadoopJobExecHelper.killRunningJobs()
         Utils.tryQuietly(HiveInterruptUtils.interrupt())
+        Utils.tryAndWarn(driver.close())
         if (null != thread) {
           Utils.tryAndWarn(thread.interrupt())
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org