You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/07/27 02:23:39 UTC

[linkis] 02/02: add startup params to once task job context

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit c73dd7f276fca34c40ef64be7833c9004bc08433
Author: peacewong <wp...@gmail.com>
AuthorDate: Wed Jul 26 22:44:31 2023 +0800

    add startup params to once task job context
---
 .../common/utils/OnceExecutorContentUtils.scala    | 10 ++++----
 .../entrance/interceptor/OnceJobInterceptor.scala  |  2 +-
 .../service/engine/DefaultEngineReuseService.java  | 30 +++++++++++++---------
 3 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
index dd4b9bcff..2c426339b 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/OnceExecutorContentUtils.scala
@@ -58,14 +58,14 @@ object OnceExecutorContentUtils {
 
   def mapToContent(contentMap: util.Map[String, Object]): OnceExecutorContent = {
     val onceExecutorContent = new OnceExecutorContent
-    implicit def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match {
+    def getOrNull(key: String): util.Map[String, Object] = contentMap.get(key) match {
       case map: util.Map[String, Object] => map
       case _ => null
     }
-    onceExecutorContent.setJobContent(TaskConstant.JOB_CONTENT)
-    onceExecutorContent.setRuntimeMap(TaskConstant.PARAMS_CONFIGURATION_RUNTIME)
-    onceExecutorContent.setSourceMap(TaskConstant.SOURCE)
-    onceExecutorContent.setVariableMap(TaskConstant.PARAMS_VARIABLE)
+    onceExecutorContent.setJobContent(getOrNull(TaskConstant.JOB_CONTENT))
+    onceExecutorContent.setRuntimeMap(getOrNull(TaskConstant.PARAMS_CONFIGURATION_RUNTIME))
+    onceExecutorContent.setSourceMap(getOrNull(TaskConstant.SOURCE))
+    onceExecutorContent.setVariableMap(getOrNull(TaskConstant.PARAMS_VARIABLE))
     onceExecutorContent
   }
 
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
index 1291a8566..9b0578980 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/OnceJobInterceptor.scala
@@ -94,8 +94,8 @@ class OnceJobInterceptor extends EntranceInterceptor {
     s"/tmp/${task.getExecuteUser}/${task.getId}"
 
   protected def getJobContent(task: JobRequest): util.Map[String, AnyRef] = {
-    // TODO Wait for optimizing since the class `JobRequest` is waiting for optimizing .
     val jobContent = new util.HashMap[String, AnyRef]
+    jobContent.putAll(TaskUtils.getStartupMap(task.getParams))
     jobContent.put(TaskConstant.CODE, task.getExecutionCode)
     task.getLabels.foreach {
       case label: CodeLanguageLabel =>
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
index 221df4b93..5f372cae6 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.java
@@ -164,6 +164,11 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
         instances.keySet().toArray(new ScoreServiceInstance[0]);
     EngineNode[] engineScoreList = getEngineNodeManager().getEngineNodes(scoreServiceInstances);
 
+    if (null == engineScoreList || engineScoreList.length == 0) {
+      throw new LinkisRetryException(
+          AMConstant.ENGINE_ERROR_CODE, "No engine can be reused, cause from db is null");
+    }
+
     List<EngineNode> engines = Lists.newArrayList();
     long timeout =
         engineReuseRequest.getTimeOut() <= 0
@@ -177,8 +182,10 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
     long startTime = System.currentTimeMillis();
     try {
       MutablePair<Integer, Integer> limitPair = MutablePair.of(1, reuseLimit);
+      List<EngineNode> canReuseEcList = new ArrayList<>();
+      CollectionUtils.addAll(canReuseEcList, engineScoreList);
       LinkisUtils.waitUntil(
-          () -> selectEngineToReuse(limitPair, engines, engineScoreList),
+          () -> selectEngineToReuse(limitPair, engines, canReuseEcList),
           Duration.ofMillis(timeout));
     } catch (TimeoutException e) {
       throw new LinkisRetryException(
@@ -220,19 +227,22 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
   public boolean selectEngineToReuse(
       MutablePair<Integer, Integer> count2reuseLimit,
       List<EngineNode> engines,
-      EngineNode[] engineScoreList) {
+      List<EngineNode> canReuseEcList) {
     if (count2reuseLimit.getLeft() > count2reuseLimit.getRight()) {
       throw new LinkisRetryException(
           AMConstant.ENGINE_ERROR_CODE,
-          "Engine reuse exceeds limit: " + count2reuseLimit.getRight());
+          "Engine reuse exceeds limit: " + count2reuseLimit.getLeft());
     }
-    count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1);
-    Optional<Node> choseNode = nodeSelector.choseNode(engineScoreList);
+
+    Optional<Node> choseNode = nodeSelector.choseNode(canReuseEcList.toArray(new Node[0]));
     if (!choseNode.isPresent()) {
       throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused");
     }
     EngineNode engineNode = (EngineNode) choseNode.get();
-    logger.info("prepare to reuse engineNode: " + engineNode.getServiceInstance());
+    logger.info(
+        "prepare to reuse engineNode: {} times {}",
+        engineNode.getServiceInstance(),
+        count2reuseLimit.getLeft());
 
     EngineNode reuseEngine =
         LinkisUtils.tryCatch(
@@ -254,12 +264,8 @@ public class DefaultEngineReuseService extends AbstractEngineService implements
     }
 
     if (CollectionUtils.isEmpty(engines)) {
-      Integer count = count2reuseLimit.getKey() + 1;
-      count2reuseLimit.setLeft(count);
-      engineScoreList =
-          Arrays.stream(engineScoreList)
-              .filter(node -> !node.equals(choseNode.get()))
-              .toArray(EngineNode[]::new);
+      count2reuseLimit.setLeft(count2reuseLimit.getLeft() + 1);
+      canReuseEcList.remove(choseNode.get());
     }
     return CollectionUtils.isNotEmpty(engines);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org