You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/07/27 02:23:39 UTC
[linkis] 02/02: add startup params to once task job context
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
commit c73dd7f276fca34c40ef64be7833c9004bc08433
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Jul 26 22:44:31 2023 +0800
add startup params to once task job context
---
.../common/utils/OnceExecutorContentUtils.scala | 10 ++++----
.../entrance/interceptor/OnceJobInterceptor.scala | 2 +-
.../service/engine/DefaultEngineReuseService.java | 30 +++++++++++++---------
3 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
index dd4b9bcff..2c426339b 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
@@ -58,14 +58,14 @@ object OnceExecutorContentUtils {
def mapToContent(contentMap: util.Map[String, Object]): OnceExecutorContent = {
val onceExecutorContent = new OnceExecutorContent
- implicit def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match {
+ def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match {
case map: util.Map[String, Object] => map
case _ => null
}
- onceExecutorContent.setJobContent(TaskConstant.JOB_CONTENT)
- onceExecutorContent.setRuntimeMap(TaskConstant.PARAMS_CONFIGURATION_RUNTIME)
- onceExecutorContent.setSourceMap(TaskConstant.SOURCE)
- onceExecutorContent.setVariableMap(TaskConstant.PARAMS_VARIABLE)
+ onceExecutorContent.setJobContent(getOrNull(TaskConstant.JOB_CONTENT))
+ onceExecutorContent.setRuntimeMap(getOrNull(TaskConstant.PARAMS_CONFIGURATION_RUNTIME))
+ onceExecutorContent.setSourceMap(getOrNull(TaskConstant.SOURCE))
+ onceExecutorContent.setVariableMap(getOrNull(TaskConstant.PARAMS_VARIABLE))
onceExecutorContent
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
index 1291a8566..9b0578980 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
@@ -94,8 +94,8 @@ class OnceJobInterceptor extends EntranceInterceptor {
s"/tmp/${task.getExecuteUser}/${task.getId}"
protected def getJobContent(task: JobRequest): util.Map[String, AnyRef] = {
- // TODO Wait for optimizing since the class `JobRequest` is waiting for optimizing .
val jobContent = new util.HashMap[String, AnyRef]
+ jobContent.putAll(TaskUtils.getStartupMap(task.getParams))
jobContent.put(TaskConstant.CODE, task.getExecutionCode)
task.getLabels.foreach {
case label: CodeLanguageLabel =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
index 221df4b93..5f372cae6 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
@@ -164,6 +164,11 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
instances.keySet().toArray(new ScoreServiceInstance[0]);
EngineNode[] engineScoreList = getEngineNodeManager().getEngineNodes(scoreServiceInstances);
+ if (null == engineScoreList || engineScoreList.length == 0) {
+ throw new LinkisRetryException(
+ AMConstant.ENGINE_ERROR_CODE, "No engine can be reused, cause from db is null");
+ }
+
List<EngineNode> engines = Lists.newArrayList();
long timeout =
engineReuseRequest.getTimeOut() <= 0
@@ -177,8 +182,10 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
long startTime = System.currentTimeMillis();
try {
MutablePair<Integer, Integer> limitPair = MutablePair.of(1, reuseLimit);
+ List<EngineNode> canReuseEcList = new ArrayList<>();
+ CollectionUtils.addAll(canReuseEcList, engineScoreList);
LinkisUtils.waitUntil(
- () -> selectEngineToReuse(limitPair, engines, engineScoreList),
+ () -> selectEngineToReuse(limitPair, engines, canReuseEcList),
Duration.ofMillis(timeout));
} catch (TimeoutException e) {
throw new LinkisRetryException(
@@ -220,19 +227,22 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
public boolean selectEngineToReuse(
MutablePair<Integer, Integer> count2reuseLimit,
List<EngineNode> engines,
- EngineNode[] engineScoreList) {
+ List<EngineNode> canReuseEcList) {
if (count2reuseLimit.getLeft() > count2reuseLimit.getRight()) {
throw new LinkisRetryException(
AMConstant.ENGINE_ERROR_CODE,
- "Engine reuse exceeds limit: " + count2reuseLimit.getRight());
+ "Engine reuse exceeds limit: " + count2reuseLimit.getLeft());
}
- count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1);
- Optional<Node> choseNode = nodeSelector.choseNode(engineScoreList);
+
+ Optional<Node> choseNode = nodeSelector.choseNode(canReuseEcList.toArray(new Node[0]));
if (!choseNode.isPresent()) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused");
}
EngineNode engineNode = (EngineNode) choseNode.get();
- logger.info("prepare to reuse engineNode: " + engineNode.getServiceInstance());
+ logger.info(
+ "prepare to reuse engineNode: {} times {}",
+ engineNode.getServiceInstance(),
+ count2reuseLimit.getLeft());
EngineNode reuseEngine =
LinkisUtils.tryCatch(
@@ -254,12 +264,8 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
}
if (CollectionUtils.isEmpty(engines)) {
- Integer count = count2reuseLimit.getKey() + 1;
- count2reuseLimit.setLeft(count);
- engineScoreList =
- Arrays.stream(engineScoreList)
- .filter(node -> !node.equals(choseNode.get()))
- .toArray(EngineNode[]::new);
+ count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1);
+ canReuseEcList.remove(choseNode.get());
}
return CollectionUtils.isNotEmpty(engines);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org