You are viewing a plain-text version of this content; the link to its canonical version is not reproduced in this copy.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/04/18 07:47:33 UTC
[incubator-linkis] branch dev-1.1.2 updated: Fix reheater concurrency issue #1990 (#1992)
This is an automated email from the ASF dual-hosted git repository.
alexkun pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 02c1c0f25 Fix reheater concurrency issue #1990 (#1992)
02c1c0f25 is described below
commit 02c1c0f25190f8a265ab94dcdbbf47d7a2d05c33
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Apr 18 15:47:29 2022 +0800
Fix reheater concurrency issue #1990 (#1992)
---
.../reheater/PruneTaskReheaterTransform.scala | 3 +
.../conf/OrchestratorConfiguration.scala | 2 +
.../orchestrator/execution/TaskManager.scala | 2 +
.../execution/impl/DefaultTaskManager.scala | 65 ++++++++++++++--------
.../execution/impl/NotifyTaskConsumer.scala | 21 +++++--
.../orchestrator/reheater/AbstractReheater.scala | 29 +++++-----
.../linkis/orchestrator/reheater/Reheater.scala | 2 +-
.../reheater/ReheaterNotifyTaskConsumer.scala | 22 +++++---
8 files changed, 93 insertions(+), 53 deletions(-)
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
index 31f999662..3c4df038f 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
import org.apache.linkis.orchestrator.computation.utils.TreeNodeUtil
+import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
import org.apache.linkis.orchestrator.core.FailedOrchestrationResponse
import org.apache.linkis.orchestrator.execution.FailedTaskResponse
import org.apache.linkis.orchestrator.extensions.catalyst.ReheaterTransform
@@ -55,6 +56,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
val newTask = new RetryExecTask(retryExecTask.getOriginTask, retryExecTask.getAge() + 1)
newTask.initialize(retryExecTask.getPhysicalContext)
TreeNodeUtil.replaceNode(retryExecTask, newTask)
+ context.set(OrchestratorConfiguration.REHEATER_KEY, true)
pushInfoLog(task, newTask)
} else {
val logEvent = TaskLogEvent(task, LogUtils.generateWarn(s"Retry task: ${retryExecTask.getIDInfo} reached maximum age:${retryExecTask.getAge()}, stop to retry it!"))
@@ -65,6 +67,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
val retryExecTask = new RetryExecTask(task)
retryExecTask.initialize(task.getPhysicalContext)
TreeNodeUtil.insertNode(parent, task, retryExecTask)
+ context.set(OrchestratorConfiguration.REHEATER_KEY, true)
pushInfoLog(task, retryExecTask)
}
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
index 952218f29..e758fceb2 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
@@ -70,4 +70,6 @@ object OrchestratorConfiguration {
val ORCHESTRATOR_METRIC_LOG_TIME = CommonVars("wds.linkis.orchestrator.metric.log.time", new TimeType("1h"))
+ val REHEATER_KEY = "Reheat_successful"
+
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
index eb4a10cda..2dc3554d4 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
@@ -37,6 +37,8 @@ trait TaskManager extends OrchestratorSyncListener {
def getRunnableTasks: Array[ExecTaskRunner]
+ def taskRunnableTasks(execTaskRunners: Array[ExecTaskRunner]): Array[ExecTaskRunner]
+
def addCompletedTask(task: ExecTaskRunner): Unit
def pollCompletedExecutionTasks: Array[ExecutionTask]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
index ad70f1215..f26d6685c 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
@@ -120,37 +120,33 @@ class DefaultTaskManager extends AbstractTaskManager with Logging {
subTasks.nonEmpty
}
- protected def getSuitableExecutionTasks: Array[ExecutionTask] = {
- executionTasks.asScala.filter(executionTask => executionTask.getRootExecTask.canExecute
- && !ExecutionNodeStatus.isCompleted(executionTask.getStatus)).toArray
- }
-
- /**
- * Get runnable TaskRunner
- * 1. Polling for all outstanding ExecutionTasks
- * 2. Polling for unfinished subtasks of ExecutionTask corresponding to ExecTask tree
- * 3. Get the subtask and determine whether it exceeds the maximum value of getRunnable. If it exceeds the maximum value, the maximum number of tasks will be returned
- *
- * @return
- */
- override def getRunnableTasks: Array[ExecTaskRunner] = {
- val startTime = System.currentTimeMillis()
- debug(s"Start to getRunnableTasks startTime: $startTime")
+ def getRunnableExecutionTasksAndExecTask: (Array[ExecutionTask], Array[ExecTaskRunner]) = {
val execTaskRunners = ArrayBuffer[ExecTaskRunner]()
- val runningExecutionTasks = getSuitableExecutionTasks
- //1. Get all runnable TaskRunner
- runningExecutionTasks.foreach { executionTask =>
+ val runningExecutionTasks = getSuitableExecutionTasks.filter{ executionTask =>
val execTask = executionTask.getRootExecTask
val runnableSubTasks = new mutable.HashSet[ExecTask]()
getSubTasksRecursively(executionTask, execTask, runnableSubTasks)
- val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
- execTaskRunners ++= subExecTaskRunners
+ if (runnableSubTasks.nonEmpty) {
+ val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
+ execTaskRunners ++= subExecTaskRunners
+ true
+ } else {
+ false
+ }
}
+ (runningExecutionTasks, execTaskRunners.toArray)
+ }
+
+ protected def getSuitableExecutionTasks: Array[ExecutionTask] = {
+ executionTasks.asScala.filter(executionTask => executionTask.getRootExecTask.canExecute
+ && !ExecutionNodeStatus.isCompleted(executionTask.getStatus)).toArray
+ }
+ override def taskRunnableTasks(execTaskRunners: Array[ExecTaskRunner]): Array[ExecTaskRunner] = {
//2. Take the current maximum number of runnables from the priority queue: Maximum limit-jobs that are already running
val nowRunningNumber = executionTaskToRunningExecTask.values.map(_.length).sum
val maxRunning = if (nowRunningNumber >= MAX_RUNNER_TASK_SIZE) 0 else MAX_RUNNER_TASK_SIZE - nowRunningNumber
- val runnableTasks = if (maxRunning == 0) {
+ if (maxRunning == 0) {
logger.warn(s"The current running has exceeded the maximum, now: $nowRunningNumber ")
Array.empty[ExecTaskRunner]
} else if (execTaskRunners.isEmpty) {
@@ -188,9 +184,32 @@ class DefaultTaskManager extends AbstractTaskManager with Logging {
}
runners.toArray
}
+ }
+
+ /**
+ * Get runnable TaskRunner
+ * 1. Polling for all outstanding ExecutionTasks
+ * 2. Polling for unfinished subtasks of ExecutionTask corresponding to ExecTask tree
+ * 3. Get the subtask and determine whether it exceeds the maximum value of getRunnable. If it exceeds the maximum value, the maximum number of tasks will be returned
+ *
+ * @return
+ */
+ override def getRunnableTasks: Array[ExecTaskRunner] = {
+ val startTime = System.currentTimeMillis()
+ debug(s"Start to getRunnableTasks startTime: $startTime")
+ val execTaskRunners = ArrayBuffer[ExecTaskRunner]()
+ val runningExecutionTasks = getSuitableExecutionTasks
+ //1. Get all runnable TaskRunner
+ runningExecutionTasks.foreach { executionTask =>
+ val execTask = executionTask.getRootExecTask
+ val runnableSubTasks = new mutable.HashSet[ExecTask]()
+ getSubTasksRecursively(executionTask, execTask, runnableSubTasks)
+ val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
+ execTaskRunners ++= subExecTaskRunners
+ }
val finishTime = System.currentTimeMillis()
debug(s"Finished to getRunnableTasks finishTime: $finishTime, taken: ${finishTime - startTime}")
- runnableTasks
+ taskRunnableTasks(execTaskRunners.toArray)
}
/**
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
index 7c35ae9ca..bc8e4fff3 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
@@ -33,20 +33,29 @@ abstract class NotifyTaskConsumer extends TaskConsumer with OrchestratorAsyncLis
protected def getWaitTime: Long = OrchestratorConfiguration.TASK_CONSUMER_WAIT.getValue
- protected def beforeFetchLaunchTask(): Unit = {}
+ protected def beforeFetchLaunchTask(): Array[ExecTaskRunner] = {
+ null
+ }
protected def beforeLaunchTask(runnableTasks: Array[ExecTaskRunner]): Unit = {}
protected def afterLaunchTask(runnableTasks: Array[ExecTaskRunner]): Unit = {}
+ /**
+ * 1. The reheater gets the tasks that can be executed and performs the reheater
+ * 2. If a task is reheater, it will not be executed this time until it is scheduled next time
+ * 3. Execute the tasks after the reheater
+ */
override def run(): Unit = {
while (!isStopped)
Utils.tryAndErrorMsg {
- beforeFetchLaunchTask()
- val runnableTasks = getExecution.taskManager.getRunnableTasks
- beforeLaunchTask(runnableTasks)
- runnableTasks.foreach(getExecution.taskScheduler.launchTask)
- afterLaunchTask(runnableTasks)
+ val runners = beforeFetchLaunchTask()
+ if (null != runners && runners.nonEmpty) {
+ val runnableTasks = getExecution.taskManager.taskRunnableTasks(runners)
+ beforeLaunchTask(runnableTasks)
+ runnableTasks.foreach(getExecution.taskScheduler.launchTask)
+ afterLaunchTask(runnableTasks)
+ }
notifyLock synchronized {
notifyLock.wait(getWaitTime)
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
index a1ae843b6..54613f2ce 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
@@ -18,39 +18,38 @@
package org.apache.linkis.orchestrator.reheater
import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
import org.apache.linkis.orchestrator.extensions.catalyst.ReheaterTransform
import org.apache.linkis.orchestrator.plans.physical.{ExecTask, ReheatableExecTask}
abstract class AbstractReheater extends Reheater with Logging {
- override def reheat(execTask: ExecTask): Unit = execTask match {
+ override def reheat(execTask: ExecTask): Boolean = execTask match {
case reheat: ReheatableExecTask =>
- debug(s"Try to reheat ${execTask.getIDInfo()}.")
+ debug(s"Try to reheat ${execTask.getIDInfo()}")
reheat.setReheating()
var changed = false
Utils.tryCatch(Option(reheaterTransforms).foreach { transforms =>
- Option(execTask.getChildren).map(_.map{ child =>
- val newChild = transforms.foldLeft(child)((node, transform) => transform.apply(node, execTask.getPhysicalContext))
- if (!child.theSame(newChild)) {
- changed = true
- newChild.relateParents(child)
- newChild
- } else child
- }).foreach { children =>
- if (changed) {
- execTask.withNewChildren(children)
- }
+ execTask.getPhysicalContext.set(OrchestratorConfiguration.REHEATER_KEY, false)
+ val newTree = transforms.foldLeft(execTask)((node, transform) => transform.apply(node, execTask.getPhysicalContext))
+ val reheaterStatus = execTask.getPhysicalContext.get(OrchestratorConfiguration.REHEATER_KEY)
+ changed = reheaterStatus match {
+ case status: Boolean =>
+ status
+ case _ => false
}
+ execTask.getPhysicalContext.set(OrchestratorConfiguration.REHEATER_KEY, false)
}) { t =>
- logger.error(s" Reheat ${execTask.getIDInfo()} failed, now mark it failed!", t)
+ logger.error(s"Reheat ${execTask.getIDInfo()} failed, now mark it failed", t)
execTask.getPhysicalContext.markFailed(s" Reheat ${execTask.getIDInfo()} failed, now mark it failed!", t)
}
reheat.setReheated()
if (changed) {
logger.info(s"${execTask.getIDInfo()} reheated. The physicalTree has been changed. The new tree is ${execTask.simpleString}.")
}
- case _ =>
+ changed
+ case _ => false
}
protected def reheaterTransforms: Array[ReheaterTransform]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
index 2a62c3953..bf22fa986 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
@@ -24,6 +24,6 @@ import org.apache.linkis.orchestrator.plans.physical.ExecTask
*/
trait Reheater {
- def reheat(execTask: ExecTask): Unit
+ def reheat(execTask: ExecTask): Boolean
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
index 21a6d6e2d..3c1fb4157 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.orchestrator.reheater
+import org.apache.linkis.orchestrator.execution.ExecTaskRunner
import org.apache.linkis.orchestrator.execution.impl.{DefaultTaskManager, NotifyTaskConsumer}
import org.apache.linkis.orchestrator.plans.physical.ExecTask
@@ -27,14 +28,17 @@ abstract class ReheaterNotifyTaskConsumer extends NotifyTaskConsumer {
val reheater: Reheater
- protected def reheatIt(execTask: ExecTask): Unit = {
+ protected def reheatIt(execTask: ExecTask): Boolean = {
val key = getReheatableKey(execTask.getId)
- def compareAndSet(lastRunning: String): Unit = {
+ def compareAndSet(lastRunning: String): Boolean = {
val thisRunning = getExecution.taskManager.getCompletedTasks(execTask).map(_.task.getId).mkString(",")
if(thisRunning != lastRunning) {
logger.debug(s"${execTask.getIDInfo()} Try to reheat this $thisRunning. lastRunning: $lastRunning")
- reheater.reheat(execTask)
+ val reheaterStatus = reheater.reheat(execTask)
execTask.getPhysicalContext.set(key, thisRunning)
+ reheaterStatus
+ } else {
+ false
}
}
execTask.getPhysicalContext.get(key) match {
@@ -42,18 +46,20 @@ abstract class ReheaterNotifyTaskConsumer extends NotifyTaskConsumer {
compareAndSet(lastRunning)
case _ if getExecution.taskManager.getCompletedTasks(execTask).nonEmpty =>
compareAndSet(null)
- case _ =>
- //info(s"no need to deal ${execTask.getPhysicalContext.get(key) }")
+ case _ => false
+
}
}
protected def getReheatableKey(id: String): String = ReheaterNotifyTaskConsumer.REHEAT_KEY_PREFIX + id
- override protected def beforeFetchLaunchTask(): Unit = getExecution.taskManager match {
+ override protected def beforeFetchLaunchTask(): Array[ExecTaskRunner] = getExecution.taskManager match {
case taskManager: DefaultTaskManager =>
- taskManager.getRunnableExecutionTasks.foreach(t => reheatIt(t.getRootExecTask))
- case _ =>
+ val (executionTasks, runnableExecTasks) = taskManager.getRunnableExecutionTasksAndExecTask
+ val reheaterRootExecTasks = executionTasks.map(_.getRootExecTask).filter(reheatIt).map(_.getId)
+ runnableExecTasks.filterNot(task => reheaterRootExecTasks.contains(task.task.getPhysicalContext.getRootTask.getId))
+ case _ => null
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org