You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 13:04:16 UTC
[linkis] branch dev-1.3.2 updated: [Feature][LinkisManager] Add task fixed engine label (#4326)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 29fb49662 [Feature][LinkisManager] Add task fixed engine label (#4326)
29fb49662 is described below
commit 29fb4966299a8ed9ce9189c6a80aa16ee37b6133
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 21:04:08 2023 +0800
[Feature][LinkisManager] Add task fixed engine label (#4326)
* Comment optimization (translate Chinese code comments to English)
* add fixed engine conn label
* Fix FixedEngineLabel get and set value key to sessionId
* Fix: killing a hive EC with kill -9 did not kill its yarn app id
* Fix: killing a hive task did not kill its yarn app id
* If the code has already been compiled successfully, do not compile it again on run
* Update the default number of tasks executed by the serial engine to 50
* [Feature][LinkisManager] Add task fixed engine label #4325
* remove close invoke kill task
---
.../common/conf/GovernaceCommonConf.scala | 2 +
.../service/impl/DefaultEngineConnKillService.java | 2 +-
.../impl/DefaultEngineConnListService.scala | 4 +-
.../executor/conf/ComputationExecutorConf.scala | 90 +++++++++++++++++-----
.../engine/DefaultEngineAskEngineService.scala | 2 +-
.../engine/DefaultEngineCreateService.scala | 40 ++++++----
.../service/engine/DefaultEngineReuseService.scala | 29 +++++--
.../manager/label/constant/LabelKeyConstant.java | 2 +
.../label/entity/engine/FixedEngineConnLabel.java | 70 +++++++++++++++++
.../label/entity/entrance/ExecuteOnceLabel.java | 3 +-
.../errorcode/LabelCommonErrorCodeSummary.java | 17 ++--
.../common/protocol/engine/EngineReuseRequest.java | 10 +++
.../main/resources/linkis-engineconn.properties | 8 +-
.../hive/src/main/resources/log4j2.xml | 2 +-
.../hive/conf/HiveEngineConfiguration.scala | 6 +-
.../hive/executor/HiveEngineConnExecutor.scala | 27 ++++---
16 files changed, 238 insertions(+), 76 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
index 5fdc9cf7f..48f906bb8 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/conf/GovernaceCommonConf.scala
@@ -23,6 +23,8 @@ object GovernanceCommonConf {
val CONF_FILTER_RM = "wds.linkis.rm"
+ val WILDCARD_CONSTANT = "*"
+
val SPARK_ENGINE_VERSION = CommonVars("wds.linkis.spark.engine.version", "2.4.3")
val HIVE_ENGINE_VERSION = CommonVars("wds.linkis.hive.engine.version", "1.2.1")
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
index 8c9e829ce..4f593736d 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnKillService.java
@@ -94,7 +94,7 @@ public class DefaultEngineConnKillService implements EngineConnKillService {
return response;
}
- private void killYarnAppIdOfOneEc(EngineConn engineConn) {
+ public void killYarnAppIdOfOneEc(EngineConn engineConn) {
String engineConnInstance = engineConn.getServiceInstance().toString();
logger.info("try to kill yarn app ids in the engine of ({}).", engineConnInstance);
String engineLogDir = engineConn.getEngineConnManagerEnv().engineConnLogDirs();
diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
index b7fbfd50d..4b9a59b4d 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultEngineConnListService.scala
@@ -77,6 +77,8 @@ class DefaultEngineConnListService
Utils.tryAndWarn {
if (NodeStatus.Failed == conn.getStatus && StringUtils.isNotBlank(conn.getPid)) {
killECByEngineConnKillService(conn)
+ } else {
+ getEngineConnKillService().killYarnAppIdOfOneEc(conn)
}
conn.close()
}
@@ -184,7 +186,7 @@ class DefaultEngineConnListService
logger.info(s"start to kill ec by engineConnKillService ${engineconn.getServiceInstance}")
val engineStopRequest = new EngineStopRequest()
engineStopRequest.setServiceInstance(engineconn.getServiceInstance)
- getEngineConnKillService.dealEngineConnStop(engineStopRequest)
+ getEngineConnKillService().dealEngineConnStop(engineStopRequest)
}
}
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
index 7765cfc96..fec2fe5e7 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
@@ -22,50 +22,100 @@ import org.apache.linkis.common.conf.{ByteType, CommonVars}
object ComputationExecutorConf {
val ENGINE_RESULT_SET_MAX_CACHE =
- CommonVars("wds.linkis.engine.resultSet.cache.max", new ByteType("0k"))
+ CommonVars("wds.linkis.engine.resultSet.cache.max", new ByteType("0k"), "Result set cache size")
val ENGINE_LOCK_DEFAULT_EXPIRE_TIME =
- CommonVars("wds.linkis.engine.lock.expire.time", 2 * 60 * 1000)
+ CommonVars("wds.linkis.engine.lock.expire.time", 2 * 60 * 1000, "lock expiration time")
- val ENGINE_MAX_TASK_EXECUTE_NUM = CommonVars("wds.linkis.engineconn.max.task.execute.num", 0)
+ val ENGINE_MAX_TASK_EXECUTE_NUM = CommonVars(
+ "wds.linkis.engineconn.max.task.execute.num",
+ 50,
+ "Maximum number of tasks executed by the synchronization EC"
+ )
val ENGINE_PROGRESS_FETCH_INTERVAL =
- CommonVars("wds.linkis.engineconn.progresss.fetch.interval-in-seconds", 5)
+ CommonVars(
+ "wds.linkis.engineconn.progresss.fetch.interval-in-seconds",
+ 5,
+ "Progress information push interval"
+ )
- val UDF_LOAD_FAILED_IGNORE = CommonVars("wds.linkis.engineconn.udf.load.ignore", true)
+ val UDF_LOAD_FAILED_IGNORE =
+ CommonVars("wds.linkis.engineconn.udf.load.ignore", true, "UDF load failed ignore")
- val FUNCTION_LOAD_FAILED_IGNORE = CommonVars("wds.linkis.engineconn.function.load.ignore", true)
+ val FUNCTION_LOAD_FAILED_IGNORE =
+ CommonVars("wds.linkis.engineconn.function.load.ignore", true, "Function load failed ignore")
val TASK_IGNORE_UNCOMPLETED_STATUS =
- CommonVars("wds.linkis.engineconn.task.ignore.uncompleted.status", true).getValue
-
- val ENGINE_CONCURRENT_THREAD_NUM = CommonVars("wds.linkis.engineconn.concurrent.thread.num", 20)
+ CommonVars(
+ "wds.linkis.engineconn.task.ignore.uncompleted.status",
+ true,
+ "Ignore pushes with uncompleted status"
+ ).getValue
+
+ val ENGINE_CONCURRENT_THREAD_NUM = CommonVars(
+ "wds.linkis.engineconn.concurrent.thread.num",
+ 20,
+ "Maximum thread pool of the concurrent EC"
+ )
- val ASYNC_EXECUTE_MAX_PARALLELISM = CommonVars("wds.linkis.engineconn.max.parallelism", 300)
+ val ASYNC_EXECUTE_MAX_PARALLELISM = CommonVars(
+ "wds.linkis.engineconn.max.parallelism",
+ 300,
+ "Maximum parallelism for the asynchronous EC"
+ )
val ASYNC_SCHEDULER_MAX_RUNNING_JOBS =
- CommonVars("wds.linkis.engineconn.async.group.max.running", 10).getValue
+ CommonVars(
+ "wds.linkis.engineconn.async.group.max.running",
+ 10,
+ "Maximum number of running tasks for a group of asynchronous EC"
+ ).getValue
val DEFAULT_COMPUTATION_EXECUTORMANAGER_CLAZZ = CommonVars(
"wds.linkis.default.computation.executormanager.clazz",
- "org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorManagerImpl"
+ "org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorManagerImpl",
+ "Executor manager implementation class"
)
val UPSTREAM_MONITOR_ECTASK_SHOULD_START =
- CommonVars("linkis.upstream.monitor.ectask.should.start", true).getValue
+ CommonVars(
+ "linkis.upstream.monitor.ectask.should.start",
+ true,
+ "Enable upstream live monitoring"
+ ).getValue
val UPSTREAM_MONITOR_WRAPPER_ENTRIES_SURVIVE_THRESHOLD_SEC =
- CommonVars("linkis.upstream.monitor.wrapper.entries.survive.time.sec", 86400).getValue
+ CommonVars(
+ "linkis.upstream.monitor.wrapper.entries.survive.time.sec",
+ 86400,
+ "Upstream task cache cleanup threshold"
+ ).getValue
val UPSTREAM_MONITOR_ECTASK_ENTRANCE_THRESHOLD_SEC =
- CommonVars("linkis.upstream.monitor.ectask.entrance.threshold.sec", 15).getValue
-
- val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars("hive.resultset.use.unique.column.names", false)
+ CommonVars(
+ "linkis.upstream.monitor.ectask.entrance.threshold.sec",
+ 15,
+ "Maximum heartbeat time for whether the upstream task is alive"
+ ).getValue
+
+ val HIVE_RESULTSET_USE_TABLE_NAME = CommonVars(
+ "hive.resultset.use.unique.column.names",
+ false,
+ "hive result set to enable unique column names"
+ )
- val JOB_ID_TO_ENV_KEY = CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID").getValue
+ val JOB_ID_TO_ENV_KEY =
+ CommonVars("wds.linkis.ec.job.id.env.key", "LINKIS_JOB_ID", "LINKIS_JOB_ID ENV").getValue
val TASK_ASYNC_MAX_THREAD_SIZE =
- CommonVars("linkis.ec.task.execution.async.thread.size", 50).getValue
+ CommonVars(
+ "linkis.ec.task.execution.async.thread.size",
+ 50,
+ "Task submit thread pool size"
+ ).getValue
+
+ val TASK_SUBMIT_WAIT_TIME_MS =
+ CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue
- val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L).getValue
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
index 2fc49fe56..4616cf85a 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala
@@ -74,7 +74,7 @@ class DefaultEngineAskEngineService
engineReuseRequest.setLabels(engineAskRequest.getLabels)
engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut)
engineReuseRequest.setUser(engineAskRequest.getUser)
-
+ engineReuseRequest.setProperties(engineAskRequest.getProperties)
val reuseNode = Utils.tryCatch(engineReuseService.reuseEngine(engineReuseRequest, sender)) {
t: Throwable =>
t match {
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
index 763a42e92..d6ceccef9 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala
@@ -118,7 +118,7 @@ class DefaultEngineCreateService
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
} else engineCreateRequest.getTimeout
- // 1. 检查Label是否合法
+ // 1. Check if Label is valid
var labelList: util.List[Label[_]] = LabelUtils.distinctLabel(
labelBuilderFactory.getLabels(engineCreateRequest.getLabels),
userLabelService.getUserLabels(engineCreateRequest.getUser)
@@ -145,11 +145,11 @@ class DefaultEngineCreateService
emInstanceLabel.setAlias(ENGINE_CONN_MANAGER_SPRING_NAME.getValue)
emLabelList.add(emInstanceLabel)
- // 2. NodeLabelService getNodesByLabel 获取EMNodeList
+ // 2. Get all available ECMs by labels
val emScoreNodeList =
getEMService().getEMNodes(emLabelList.asScala.filter(!_.isInstanceOf[EngineTypeLabel]).asJava)
- // 3. 执行Select 比如负载过高,返回没有负载低的EM,每个规则如果返回为空就抛出异常
+ // 3. Get the ECM with the lowest load by selection algorithm
val choseNode =
if (null == emScoreNodeList || emScoreNodeList.isEmpty) null
else {
@@ -163,11 +163,11 @@ class DefaultEngineCreateService
)
}
val emNode = choseNode.get.asInstanceOf[EMNode]
- // 4. 请求资源
+ // 4. request resource
val (resourceTicketId, resource) =
requestResource(engineCreateRequest, labelFilter.choseEngineLabel(labelList), emNode, timeout)
- // 5. 封装engineBuildRequest对象,并发送给EM进行执行
+ // 5. build engineConn request
val engineBuildRequest = EngineConnBuildRequestImpl(
resourceTicketId,
labelFilter.choseEngineLabel(labelList),
@@ -179,8 +179,10 @@ class DefaultEngineCreateService
)
)
- // 6. 调用EM发送引擎启动请求调用ASK
- // AM会更新serviceInstance表 需要将ticketID进行替换,并更新 EngineConn的Label 需要修改EngineInstanceLabel 中的id为Instance信息
+ // 6. Call ECM to send engine start request
+ // AM will update the serviceInstance table
+ // It is necessary to replace the ticketID and update the Label of EngineConn
+ // It is necessary to modify the id in EngineInstanceLabel to Instance information
val oldServiceInstance = new ServiceInstance
oldServiceInstance.setApplicationName(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
oldServiceInstance.setInstance(resourceTicketId)
@@ -206,8 +208,8 @@ class DefaultEngineCreateService
s"Task: $taskId finished to create engineConn $engineNode. ticketId is $resourceTicketId"
)
engineNode.setTicketId(resourceTicketId)
- // 7. 更新持久化信息:包括插入engine/metrics
+ // 7.Update persistent information: including inserting engine/metrics
Utils.tryCatch(getEngineNodeManager.updateEngineNode(oldServiceInstance, engineNode)) { t =>
logger.warn(s"Failed to update engineNode $engineNode", t)
val stopEngineRequest =
@@ -230,7 +232,7 @@ class DefaultEngineCreateService
)
}
- // 8. 新增 EngineConn的Label,添加engineConn的Alias
+ // 8. Add the Label of EngineConn, and add the Alias of engineConn
val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
labelList.add(engineConnAliasLabel)
@@ -262,16 +264,21 @@ class DefaultEngineCreateService
engineNode
}
+ /**
+ * Read the management console configuration and the parameters passed in by the user to combine
+ * request resources
+ * @param engineCreateRequest
+ * @param labelList
+ * @param emNode
+ * @param timeout
+ * @return
+ */
private def requestResource(
engineCreateRequest: EngineCreateRequest,
labelList: util.List[Label[_]],
emNode: EMNode,
timeout: Long
): (String, NodeResource) = {
- // 4. 向RM申请对应EM和用户的资源, 抛出资源不足异常:RetryException
- // 4.1 TODO 如果EM资源不足,触发EM回收空闲的engine
- // 4.2 TODO 如果用户资源不足,触发用户空闲的engine回收
- // 读取管理台的的配置
if (engineCreateRequest.getProperties == null) {
engineCreateRequest.setProperties(new util.HashMap[String, String]())
}
@@ -313,7 +320,7 @@ class DefaultEngineCreateService
}
private def ensuresIdle(engineNode: EngineNode, resourceTicketId: String): Boolean = {
- // TODO 逻辑需要修改,修改为engineConn主动上报
+
val engineNodeInfo = Utils.tryAndWarnMsg(
getEngineNodeManager.getEngineNodeInfoByDB(engineNode)
)("Failed to from db get engine node info")
@@ -324,12 +331,12 @@ class DefaultEngineCreateService
if (canRetry.isDefined) {
throw new LinkisRetryException(
AMConstant.ENGINE_ERROR_CODE,
- s"${engineNode.getServiceInstance} ticketID:$resourceTicketId 初始化引擎失败,原因: ${reason}"
+ s"${engineNode.getServiceInstance} ticketID:$resourceTicketId Failed to initialize engine, reason: ${reason}"
)
}
throw new AMErrorException(
AMConstant.EM_ERROR_CODE,
- s"${engineNode.getServiceInstance} ticketID:$resourceTicketId 初始化引擎失败,原因: ${reason}"
+ s"${engineNode.getServiceInstance} ticketID:$resourceTicketId Failed to initialize engine, reason: ${reason}"
)
}
NodeStatus.isAvailable(engineNodeInfo.getNodeStatus)
@@ -367,7 +374,6 @@ class DefaultEngineCreateService
logger.info(
s"Start to wait engineConn($engineNode) to be available, but only ${ByteTimeUtils.msDurationToString(timeout)} left."
)
- // 获取启动的引擎信息,并等待引擎的状态变为IDLE,如果等待超时则返回给用户,并抛出异常
Utils.waitUntil(
() => ensuresIdle(engineNode, resourceTicketId),
Duration(timeout, TimeUnit.MILLISECONDS)
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
index 65e41ae1c..5245036f5 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.manager.am.service.engine
import org.apache.linkis.common.exception.LinkisRetryException
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.am.conf.AMConfiguration
import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
import org.apache.linkis.manager.am.selector.NodeSelector
@@ -66,10 +67,19 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
@Autowired
private var engineStopService: EngineStopService = _
+ /**
+ * 1. Obtain the EC corresponding to all labels 2. Judging reuse exclusion tags and fixed engine
+ * labels 3. Select the EC with the lowest load available 4. Lock the corresponding EC
+ * @param engineReuseRequest
+ * @param sender
+ * @throws
+ * @return
+ */
@Receiver
@throws[LinkisRetryException]
override def reuseEngine(engineReuseRequest: EngineReuseRequest, sender: Sender): EngineNode = {
- logger.info(s"Start to reuse Engine for request: $engineReuseRequest")
+ val taskId = JobUtils.getJobIdFromStringMap(engineReuseRequest.getProperties)
+ logger.info(s"Task $taskId Start to reuse Engine for request: $engineReuseRequest")
val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
val labelList = LabelUtils
.distinctLabel(
@@ -86,6 +96,17 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
Array.empty[String]
}
+ if (
+ exclusionInstances.length == 1 && exclusionInstances(
+ 0
+ ) == GovernanceCommonConf.WILDCARD_CONSTANT
+ ) {
+ logger.info(
+ s"Task $taskId exists ReuseExclusionLabel and the configuration does not choose to reuse EC"
+ )
+ return null
+ }
+
var filterLabelList = labelList.filter(_.isInstanceOf[EngineNodeLabel]).asJava
val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
@@ -121,7 +142,7 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
)
}
var engineScoreList =
- getEngineNodeManager.getEngineNodes(instances.asScala.map(_._1).toSeq.toArray)
+ getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)
var engine: EngineNode = null
var count = 1
@@ -140,14 +161,10 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
s"Engine reuse exceeds limit: $reuseLimit"
)
}
- // 3. 执行Select 判断label分数、判断是否可用、判断负载
val choseNode = nodeSelector.choseNode(engineScoreList.toArray)
- // 4. 获取Select后排在第一的engine,修改EngineNode的Label为新标签,并调用EngineNodeManager的reuse请求
if (choseNode.isEmpty) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused")
}
-
- // 5. 调用EngineNodeManager 进行reuse 如果reuse失败,则去掉该engine进行重新reuse走3和4
val engineNode = choseNode.get.asInstanceOf[EngineNode]
logger.info(s"prepare to reuse engineNode: ${engineNode.getServiceInstance}")
engine = Utils.tryCatch(getEngineNodeManager.reuseEngine(engineNode)) { t: Throwable =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 742db2be2..bcb14a704 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -61,4 +61,6 @@ public class LabelKeyConstant {
public static final String REUSE_EXCLUSION_KEY = "reuseExclusion";
public static final String TENANT_KEY = "tenant";
+
+ public static final String FIXED_EC_KEY = "fixedEngineConn";
}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java
new file mode 100644
index 000000000..d86308f8d
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/FixedEngineConnLabel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.engine;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.*;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+import org.apache.linkis.manager.label.exception.LabelErrorException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+
+import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.CHECK_LABEL_VALUE_EMPTY;
+import static org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;
+
+public class FixedEngineConnLabel extends GenericLabel implements EngineNodeLabel, UserModifiable {
+
+ public FixedEngineConnLabel() {
+ setLabelKey(LabelKeyConstant.FIXED_EC_KEY);
+ }
+
+ @ValueSerialNum(0)
+ public void setSessionId(String SessionId) {
+ if (getValue() == null) {
+ setValue(new HashMap<>());
+ }
+ getValue().put("sessionId", SessionId);
+ }
+
+ public String getSessionId() {
+ if (getValue() == null) {
+ return null;
+ }
+ return getValue().get("sessionId");
+ }
+
+ @Override
+ public Feature getFeature() {
+ return Feature.CORE;
+ }
+
+ @Override
+ public void valueCheck(String stringValue) throws LabelErrorException {
+ if (!StringUtils.isBlank(stringValue)) {
+ if (stringValue.split(SerializableLabel.VALUE_SEPARATOR).length != 1) {
+ throw new LabelErrorException(
+ LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
+ }
+ } else {
+ throw new LabelErrorException(
+ CHECK_LABEL_VALUE_EMPTY.getErrorCode(), CHECK_LABEL_VALUE_EMPTY.getErrorDesc());
+ }
+ }
+}
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
index 6f7f143d2..e18631eed 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/ExecuteOnceLabel.java
@@ -18,10 +18,11 @@
package org.apache.linkis.manager.label.entity.entrance;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.EngineNodeLabel;
import org.apache.linkis.manager.label.entity.Feature;
import org.apache.linkis.manager.label.entity.GenericLabel;
-public class ExecuteOnceLabel extends GenericLabel {
+public class ExecuteOnceLabel extends GenericLabel implements EngineNodeLabel {
public ExecuteOnceLabel() {
setLabelKey(LabelKeyConstant.EXECUTE_ONCE_KEY);
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
index 20e0a0a49..a0ebdffe8 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/errorcode/LabelCommonErrorCodeSummary.java
@@ -20,17 +20,18 @@ package org.apache.linkis.manager.label.errorcode;
import org.apache.linkis.common.errorcode.LinkisErrorCode;
public enum LabelCommonErrorCodeSummary implements LinkisErrorCode {
- UPDATE_LABEL_FAILED(40001, "Update label realtion failed(更新标签属性失败)"),
+ UPDATE_LABEL_FAILED(25001, "Update label realtion failed(更新标签属性失败)"),
LABEL_ERROR_CODE(
- 40001,
+ 25002,
"The value of the label is set incorrectly, only one value can be set, and the separator symbol '-' cannot be used(标签的值设置错误,只能设置一个值,不能使用分割符符号 '-') "),
- FAILED_BUILD_COMBINEDLABEL(40001, "Failed to build combinedLabel(构建组合标签失败)"),
- FAILED_READ_INPUT_STREAM(40001, "Fail to read value input stream(读取值输入流失败)"),
- FAILED_CONSTRUCT_INSTANCE(40001, "Fail to construct a label instance of:{0}(未能构建标签实例)"),
- NOT_SUPPORT_ENVTYPE(40001, "Not support envType:{0}(不支持 envType)"),
+ FAILED_BUILD_COMBINEDLABEL(25003, "Failed to build combinedLabel(构建组合标签失败)"),
+ FAILED_READ_INPUT_STREAM(25004, "Fail to read value input stream(读取值输入流失败)"),
+ FAILED_CONSTRUCT_INSTANCE(25005, "Fail to construct a label instance of:{0}(未能构建标签实例)"),
+ NOT_SUPPORT_ENVTYPE(25006, "Not support envType:{0}(不支持 envType)"),
CHECK_LABEL_REMOVE_REQUEST(
- 130001,
- "ServiceInstance in request is null, please check label remove request(请求中的 ServiceInstance 为空,请检查标签删除请求)");
+ 25007,
+ "ServiceInstance in request is null, please check label remove request(请求中的 ServiceInstance 为空,请检查标签删除请求)"),
+ CHECK_LABEL_VALUE_EMPTY(25008, "Label value cannot be empty");
/** (errorCode)错误码 */
private final int errorCode;
diff --git a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
index e8f2defb4..0f3b16141 100644
--- a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
+++ b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/protocol/engine/EngineReuseRequest.java
@@ -29,6 +29,8 @@ public class EngineReuseRequest implements EngineRequest {
private String user;
+ private Map<String, String> properties;
+
public Map<String, Object> getLabels() {
return labels;
}
@@ -62,6 +64,14 @@ public class EngineReuseRequest implements EngineRequest {
this.user = user;
}
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
@Override
public String toString() {
return "EngineReuseRequest{"
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index a2135181f..7fa9cab8e 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -15,13 +15,11 @@
#
wds.linkis.server.version=v1
-wds.linkis.engineconn.debug.enable=true
#wds.linkis.keytab.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.hive.HiveEngineConnPlugin
-wds.linkis.bdp.hive.init.sql.enable=true
+#wds.linkis.bdp.hive.init.sql.enable=true
wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineplugin.hive.hook.HiveAddJarsEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveUseDatabaseEngineHook,org.apache.linkis.engineconn.computation.executor.hook.HiveInitSQLHook
-wds.linkis.engineconn.maintain.enable=true
-
+#wds.linkis.engineconn.maintain.enable=true
#Depending on the engine selected in HIVE_ENGINE_TYPE, control the function called when canceling the task in scripts.
-wds.linkis.hive.engine.type=mr
\ No newline at end of file
+linkis.hive.engine.type=mr
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index d490defe9..e6f8b150f 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -28,7 +28,7 @@
</RollingFile>
<File name="YarnAppIdOutputFile" append="true" fileName="${env:LOG_DIRS:-logs}/yarnApp.log">
- <RegexFilter regex=".* application .*" onMatch="ACCEPT" onMismatch="DENY"/>
+ <RegexFilter regex=".*application_.*" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%-5level] [%-40t] %c{1.} (%L) [%M] - %msg%xEx%n"/>
</File>
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
index 047157b78..255800084 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/conf/HiveEngineConfiguration.scala
@@ -27,10 +27,10 @@ object HiveEngineConfiguration {
)
val ENABLE_FETCH_BASE64 =
- CommonVars[Boolean]("wds.linkis.hive.enable.fetch.base64", false).getValue
+ CommonVars[Boolean]("linkis.hive.enable.fetch.base64", false).getValue
val BASE64_SERDE_CLASS = CommonVars[String](
- "wds.linkis.hive.base64.serde.class",
+ "linkis.hive.base64.serde.class",
"org.apache.linkis.engineplugin.hive.serde.CustomerDelimitedJSONSerDe"
).getValue
@@ -39,5 +39,5 @@ object HiveEngineConfiguration {
CommonVars[String]("HIVE_AUX_JARS_PATH", "").getValue
).getValue
- val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue
+ val HIVE_ENGINE_TYPE = CommonVars[String]("linkis.hive.engine.type", "mr").getValue
}
diff --git a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index b50d3c68a..1a1da03cc 100644
--- a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -28,20 +28,17 @@ import org.apache.linkis.engineconn.core.EngineConnObject
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.hive.conf.{Counters, HiveEngineConfiguration}
import org.apache.linkis.engineplugin.hive.cs.CSHiveHelper
-import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.COMPILE_HIVE_QUERY_ERROR
-import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.GET_FIELD_SCHEMAS_ERROR
+import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.{
+ COMPILE_HIVE_QUERY_ERROR,
+ GET_FIELD_SCHEMAS_ERROR
+}
import org.apache.linkis.engineplugin.hive.exception.HiveQueryFailedException
import org.apache.linkis.engineplugin.hive.progress.HiveProgressHelper
import org.apache.linkis.governance.common.paser.SQLCodeParser
import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.hadoop.common.conf.HadoopConf
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadInstanceResource,
- NodeResource
-}
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource, NodeResource}
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -54,7 +51,6 @@ import org.apache.linkis.storage.domain.{Column, DataType}
import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.linkis.storage.resultset.table.{TableMetaData, TableRecord}
-import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hive.common.HiveInterruptUtils
import org.apache.hadoop.hive.conf.HiveConf
@@ -106,8 +102,6 @@ class HiveEngineConnExecutor(
private var singleLineProgress: Float = 0.0f
- private var stage: Int = 0
-
private var engineExecutorContext: EngineExecutionContext = _
private val singleCodeCompleted: AtomicBoolean = new AtomicBoolean(false)
@@ -227,10 +221,18 @@ class HiveEngineConnExecutor(
if (numberOfMRJobs > 0) {
engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR jobs to do")
}
+ if (thread.isInterrupted) {
+ logger.error(
+ "The thread of execution has been interrupted and the task should be terminated"
+ )
+ return ErrorExecuteResponse(
+ "The thread of execution has been interrupted and the task should be terminated",
+ null
+ )
+ }
val hiveResponse: CommandProcessorResponse = driver.run(realCode, compileRet == 0)
if (hiveResponse.getResponseCode != 0) {
LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
- // todo check uncleared context ?
return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
}
engineExecutorContext.appendStdout(
@@ -541,6 +543,7 @@ class HiveEngineConnExecutor(
case "mr" =>
HadoopJobExecHelper.killRunningJobs()
Utils.tryQuietly(HiveInterruptUtils.interrupt())
+ Utils.tryAndWarn(driver.close())
if (null != thread) {
Utils.tryAndWarn(thread.interrupt())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org