You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/05/31 03:23:20 UTC
[incubator-linkis] branch dev-1.1.3 updated: add RetryCountLabel and retrycount param (#2164)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.3 by this push:
new 891c300c9 add RetryCountLabel and retrycount param (#2164)
891c300c9 is described below
commit 891c300c92e3d65b31eff97bfb7a96335cbd6136
Author: Alexyang <xu...@qq.com>
AuthorDate: Tue May 31 11:23:16 2022 +0800
add RetryCountLabel and retrycount param (#2164)
* add support for retry age in runtime map.
* 1. label-common - add RetryCountLabel
2. orchestrator-core - add support for retryCount in RetryExecTask
* RetryCountLabel - add license header
---
.../manager/label/constant/LabelKeyConstant.java | 1 +
.../label/entity/entrance/RetryCountLabel.java | 46 ++++++++++++++++++++++
.../reheater/PruneTaskReheaterTransform.scala | 5 ++-
.../conf/ComputationOrchestratorConf.scala | 2 -
.../conf/OrchestratorConfiguration.scala | 2 +
.../plans/physical/RetryExecTask.scala | 25 +++++++++++-
6 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
index 9a951bfbf..18aec9b65 100644
--- a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
+++ b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelKeyConstant.java
@@ -53,6 +53,7 @@ public class LabelKeyConstant {
public static final String RETRY_TIMEOUT_KEY = "jobRetryTimeout";
+ public static final String RETRY_COUNT_KEY = "jobRetryCount";
public static final String EXECUTE_ONCE_KEY = "executeOnce";
public static final String LOAD_BALANCE_KEY = "loadBalance";
diff --git a/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java
new file mode 100644
index 000000000..01dda4b21
--- /dev/null
+++ b/linkis-computation-governance/linkis-manager/label-common/src/main/java/org/apache/linkis/manager/label/entity/entrance/RetryCountLabel.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.label.entity.entrance;
+
+import org.apache.linkis.manager.label.constant.LabelKeyConstant;
+import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+
+import java.util.HashMap;
+
+public class RetryCountLabel extends GenericLabel implements JobStrategyLabel {
+
+ public RetryCountLabel() {
+ setLabelKey(LabelKeyConstant.RETRY_COUNT_KEY);
+ }
+
+ public Integer getJobRetryCount() {
+ if (null == getValue()) {
+ return -1;
+ }
+ return Integer.parseInt(getValue().getOrDefault(LabelKeyConstant.RETRY_COUNT_KEY, "-1"));
+ }
+
+ @ValueSerialNum(0)
+ public void setJobRetryCount(String count) {
+ if (null == getValue()) {
+ setValue(new HashMap<>());
+ }
+ getValue().put(LabelKeyConstant.RETRY_COUNT_KEY, count);
+ }
+}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
index 3c4df038f..7bd5fb2fd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
@@ -30,6 +30,9 @@ import org.apache.linkis.orchestrator.listener.task.TaskLogEvent
import org.apache.linkis.orchestrator.plans.physical.{ExecTask, PhysicalContext, PhysicalOrchestration, ReheatableExecTask, RetryExecTask}
import org.apache.linkis.orchestrator.strategy.ExecTaskStatusInfo
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
/**
* Transform physical tree by pruning it's nodes
*
@@ -52,7 +55,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
Utils.tryCatch{
task match {
case retryExecTask: RetryExecTask => {
- if (retryExecTask.getAge() < ComputationOrchestratorConf.RETRYTASK_MAXIMUM_AGE.getValue) {
+ if (retryExecTask.getAge() < retryExecTask.getMaxRetryCount()) {
val newTask = new RetryExecTask(retryExecTask.getOriginTask, retryExecTask.getAge() + 1)
newTask.initialize(retryExecTask.getPhysicalContext)
TreeNodeUtil.replaceNode(retryExecTask, newTask)
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
index 2d9f65bbc..155128d0e 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/conf/ComputationOrchestratorConf.scala
@@ -47,8 +47,6 @@ object ComputationOrchestratorConf {
val LOG_LEN = CommonVars("wds.linkis.computation.orchestrator.log.len", 100)
- val RETRYTASK_MAXIMUM_AGE = CommonVars("wds.linkis.computation.orchestrator.retry.max.age", 10)
-
val ENGINECONN_LASTUPDATE_TIMEOUT = CommonVars("wds.linkis.orchestrator.engine.lastupdate.timeout", new TimeType("5s"))
val ENGINECONN_ACTIVITY_TIMEOUT = CommonVars("wds.linkis.orchestrator.engine.timeout", new TimeType("10s"))
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
index e758fceb2..5c31cd9c7 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
@@ -54,6 +54,8 @@ object OrchestratorConfiguration {
val RETRY_TASK_WAIT_TIME = CommonVars("wds.linkis.orchestrator.task.retry.wait.time", 30000)
+ val RETRYTASK_MAXIMUM_AGE = CommonVars("wds.linkis.computation.orchestrator.retry.max.age", 10)
+
val SCHEDULER_RETRY_TASK_WAIT_TIME = CommonVars("wds.linkis.orchestrator.task.scheduler.retry.wait.time", 100000)
val TASK_SCHEDULER_THREAD_POOL = CommonVars("wds.linkis.orchestrator.task.scheduler.thread.pool", 200)
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
index ee0134728..51d94fffa 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/RetryExecTask.scala
@@ -17,8 +17,9 @@
package org.apache.linkis.orchestrator.plans.physical
+import org.apache.linkis.common.utils.Utils
import org.apache.linkis.manager.label.entity.Label
-import org.apache.linkis.manager.label.entity.entrance.RetryWaitTimeOutLabel
+import org.apache.linkis.manager.label.entity.entrance.{RetryCountLabel, RetryWaitTimeOutLabel}
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
import org.apache.linkis.orchestrator.exception.{OrchestratorErrorCodeSummary, OrchestratorErrorException}
@@ -29,6 +30,9 @@ import org.apache.linkis.orchestrator.strategy.{ResultSetExecTask, StatusInfoExe
import org.apache.linkis.orchestrator.strategy.async.AsyncExecTask
import org.apache.linkis.orchestrator.utils.OrchestratorIDCreator
+import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
class RetryExecTask(private val originTask: ExecTask, private val age: Int = 1) extends AbstractExecTask
with StatusInfoExecTask with ResultSetExecTask with AsyncExecTask{
@@ -55,6 +59,25 @@ class RetryExecTask(private val originTask: ExecTask, private val age: Int = 1)
waitTime
}
+ def getMaxRetryCount(): Integer = {
+ var count = -1
+ val retryCountLabel = LabelUtil.getLabelFromList[RetryCountLabel](getLabels)
+ if (null != retryCountLabel) {
+ count = retryCountLabel.getJobRetryCount
+ } else {
+ val runtimeMap = new util.HashMap[String, String]()
+ Utils.tryAndWarn {
+ getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getParams.getRuntimeParams.toMap.asScala.foreach(kv => {
+ if (kv._2.isInstanceOf[String]) {
+ runtimeMap.put(kv._1, kv._2.asInstanceOf[String])
+ }
+ })
+ count = OrchestratorConfiguration.RETRYTASK_MAXIMUM_AGE.getValue(runtimeMap)
+ }
+ }
+ count
+ }
+
def getOriginTask: ExecTask = {
originTask
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org