You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/05/31 03:23:20 UTC

[incubator-linkis] branch dev-1.1.3 updated: add RetryCountLabel and retrycount param (#2164)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
     new 891c300c9 add RetryCountLabel and retrycount param (#2164)
891c300c9 is described below

commit 891c300c92e3d65b31eff97bfb7a96335cbd6136
Author: Alexyang <xu...@qq.com>
AuthorDate: Tue May 31 11:23:16 2022 +0800

    add RetryCountLabel and retrycount param (#2164)
    
    * add support for retry age in runtime map.
    
    * 1. label-common - add RetryCountLabel
    2. orchestrator-core - add support for retryCount in RetryExecTask
    
    * RetryCountLabel - add license header
---
 .../manager/label/constant/LabelKeyConstant.java   |  1 +
 .../label/entity/entrance/RetryCountLabel.java     | 46 ++++++++++++++++++++++
 .../reheater/PruneTaskReheaterTransform.scala      |  5 ++-
 .../conf/ComputationOrchestratorConf.scala         |  2 -
 .../conf/OrchestratorConfiguration.scala           |  2 +
 .../plans/physical/RetryExecTask.scala             | 25 +++++++++++-
 6 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 9a951bfbf..18aec9b65 100644
--- a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -53,6 +53,7 @@ public class LabelKeyConstant {
 
     public static final String RETRY_TIMEOUT_KEY = "jobRetryTimeout";
 
+    public static final String RETRY_COUNT_KEY = "jobRetryCount";
     public static final String EXECUTE_ONCE_KEY = "executeOnce";
 
     public static final String LOAD_BALANCE_KEY = "loadBalance";
diff --git a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java
new file mode 100644
index 000000000..01dda4b21
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.entrance;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+
+import java.util.HashMap;
+
+public class RetryCountLabel extends GenericLabel implements JobStrategyLabel {
+
+    public RetryCountLabel() {
+        setLabelKey(LabelKeyConstant.RETRY_COUNT_KEY);
+    }
+
+    public Integer getJobRetryCount() {
+        if (null == getValue()) {
+            return -1;
+        }
+        return Integer.parseInt(getValue().getOrDefault(LabelKeyConstant.RETRY_COUNT_KEY, "-1"));
+    }
+
+    @ValueSerialNum(0)
+    public void setJobRetryCount(String count) {
+        if (null == getValue()) {
+            setValue(new HashMap<>());
+        }
+        getValue().put(LabelKeyConstant.RETRY_COUNT_KEY, count);
+    }
+}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
index 3c4df038f..7bd5fb2fd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
@@ -30,6 +30,9 @@ import org.apache.linkis.orchestrator.listener.task.TaskLogEvent
 import org.apache.linkis.orchestrator.plans.physical.{ExecTask, PhysicalContext, PhysicalOrchestration, ReheatableExecTask, RetryExecTask}
 import org.apache.linkis.orchestrator.strategy.ExecTaskStatusInfo
 
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
 /**
  * Transform physical tree by pruning it's nodes
  *
@@ -52,7 +55,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
                   Utils.tryCatch{
                     task match {
                       case retryExecTask: RetryExecTask => {
-                        if (retryExecTask.getAge() < ComputationOrchestratorConf.RETRYTASK_MAXIMUM_AGE.getValue) {
+                        if (retryExecTask.getAge() < retryExecTask.getMaxRetryCount()) {
                           val newTask = new RetryExecTask(retryExecTask.getOriginTask, retryExecTask.getAge() + 1)
                           newTask.initialize(retryExecTask.getPhysicalContext)
                           TreeNodeUtil.replaceNode(retryExecTask, newTask)
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
index 2d9f65bbc..155128d0e 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
@@ -47,8 +47,6 @@ object ComputationOrchestratorConf {
 
   val LOG_LEN = CommonVars("wds.linkis.computation.orchestrator.log.len", 100)
 
-  val RETRYTASK_MAXIMUM_AGE = CommonVars("wds.linkis.computation.orchestrator.retry.max.age", 10)
-
 
   val ENGINECONN_LASTUPDATE_TIMEOUT = CommonVars("wds.linkis.orchestrator.engine.lastupdate.timeout", new TimeType("5s"))
   val ENGINECONN_ACTIVITY_TIMEOUT = CommonVars("wds.linkis.orchestrator.engine.timeout", new TimeType("10s"))
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
index e758fceb2..5c31cd9c7 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
@@ -54,6 +54,8 @@ object OrchestratorConfiguration {
 
   val RETRY_TASK_WAIT_TIME = CommonVars("wds.linkis.orchestrator.task.retry.wait.time", 30000)
 
+  val RETRYTASK_MAXIMUM_AGE = CommonVars("wds.linkis.computation.orchestrator.retry.max.age", 10)
+
   val SCHEDULER_RETRY_TASK_WAIT_TIME = CommonVars("wds.linkis.orchestrator.task.scheduler.retry.wait.time", 100000)
 
   val TASK_SCHEDULER_THREAD_POOL = CommonVars("wds.linkis.orchestrator.task.scheduler.thread.pool", 200)
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
index ee0134728..51d94fffa 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
@@ -17,8 +17,9 @@
  
 package org.apache.linkis.orchestrator.plans.physical
 
+import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.entrance.RetryWaitTimeOutLabel
+import org.apache.linkis.manager.label.entity.entrance.{RetryCountLabel, RetryWaitTimeOutLabel}
 import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
 import org.apache.linkis.orchestrator.exception.{OrchestratorErrorCodeSummary, OrchestratorErrorException}
@@ -29,6 +30,9 @@ import org.apache.linkis.orchestrator.strategy.{ResultSetExecTask, StatusInfoExe
 import org.apache.linkis.orchestrator.strategy.async.AsyncExecTask
 import org.apache.linkis.orchestrator.utils.OrchestratorIDCreator
 
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
 
 class RetryExecTask(private val originTask: ExecTask, private val age: Int = 1) extends AbstractExecTask
   with StatusInfoExecTask with ResultSetExecTask with AsyncExecTask{
@@ -55,6 +59,25 @@ class RetryExecTask(private val originTask: ExecTask, private val age: Int = 1)
     waitTime
   }
 
+  def getMaxRetryCount(): Integer = {
+    var count = -1
+    val retryCountLabel = LabelUtil.getLabelFromList[RetryCountLabel](getLabels)
+    if (null != retryCountLabel) {
+      count = retryCountLabel.getJobRetryCount
+    } else {
+      val runtimeMap = new util.HashMap[String, String]()
+      Utils.tryAndWarn {
+        getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getParams.getRuntimeParams.toMap.asScala.foreach(kv => {
+          if (kv._2.isInstanceOf[String]) {
+            runtimeMap.put(kv._1, kv._2.asInstanceOf[String])
+          }
+        })
+        count = OrchestratorConfiguration.RETRYTASK_MAXIMUM_AGE.getValue(runtimeMap)
+      }
+    }
+    count
+  }
+
   def getOriginTask: ExecTask = {
     originTask
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org