You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by al...@apache.org on 2022/04/18 07:47:33 UTC

[incubator-linkis] branch dev-1.1.2 updated: Fix reheater concurrency issue #1990 (#1992)

This is an automated email from the ASF dual-hosted git repository.

alexkun pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 02c1c0f25 Fix reheater concurrency issue #1990 (#1992)
02c1c0f25 is described below

commit 02c1c0f25190f8a265ab94dcdbbf47d7a2d05c33
Author: peacewong <wp...@gmail.com>
AuthorDate: Mon Apr 18 15:47:29 2022 +0800

    Fix reheater concurrency issue #1990 (#1992)
---
 .../reheater/PruneTaskReheaterTransform.scala      |  3 +
 .../conf/OrchestratorConfiguration.scala           |  2 +
 .../orchestrator/execution/TaskManager.scala       |  2 +
 .../execution/impl/DefaultTaskManager.scala        | 65 ++++++++++++++--------
 .../execution/impl/NotifyTaskConsumer.scala        | 21 +++++--
 .../orchestrator/reheater/AbstractReheater.scala   | 29 +++++-----
 .../linkis/orchestrator/reheater/Reheater.scala    |  2 +-
 .../reheater/ReheaterNotifyTaskConsumer.scala      | 22 +++++---
 8 files changed, 93 insertions(+), 53 deletions(-)

diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
index 31f999662..3c4df038f 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
 import org.apache.linkis.orchestrator.computation.utils.TreeNodeUtil
+import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
 import org.apache.linkis.orchestrator.core.FailedOrchestrationResponse
 import org.apache.linkis.orchestrator.execution.FailedTaskResponse
 import org.apache.linkis.orchestrator.extensions.catalyst.ReheaterTransform
@@ -55,6 +56,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
                           val newTask = new RetryExecTask(retryExecTask.getOriginTask, retryExecTask.getAge() + 1)
                           newTask.initialize(retryExecTask.getPhysicalContext)
                           TreeNodeUtil.replaceNode(retryExecTask, newTask)
+                          context.set(OrchestratorConfiguration.REHEATER_KEY, true)
                           pushInfoLog(task, newTask)
                         } else {
                           val logEvent = TaskLogEvent(task, LogUtils.generateWarn(s"Retry task: ${retryExecTask.getIDInfo} reached maximum age:${retryExecTask.getAge()}, stop to retry it!"))
@@ -65,6 +67,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging{
                         val retryExecTask = new RetryExecTask(task)
                         retryExecTask.initialize(task.getPhysicalContext)
                         TreeNodeUtil.insertNode(parent, task, retryExecTask)
+                        context.set(OrchestratorConfiguration.REHEATER_KEY, true)
                         pushInfoLog(task, retryExecTask)
                       }
                     }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
index 952218f29..e758fceb2 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
@@ -70,4 +70,6 @@ object OrchestratorConfiguration {
 
   val ORCHESTRATOR_METRIC_LOG_TIME = CommonVars("wds.linkis.orchestrator.metric.log.time", new TimeType("1h"))
 
+  val REHEATER_KEY = "Reheat_successful"
+
 }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
index eb4a10cda..2dc3554d4 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/TaskManager.scala
@@ -37,6 +37,8 @@ trait TaskManager extends OrchestratorSyncListener {
 
   def getRunnableTasks: Array[ExecTaskRunner]
 
+  def taskRunnableTasks(execTaskRunners: Array[ExecTaskRunner]): Array[ExecTaskRunner]
+
   def addCompletedTask(task: ExecTaskRunner): Unit
 
   def pollCompletedExecutionTasks: Array[ExecutionTask]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
index ad70f1215..f26d6685c 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/DefaultTaskManager.scala
@@ -120,37 +120,33 @@ class DefaultTaskManager extends AbstractTaskManager with Logging {
     subTasks.nonEmpty
   }
 
-  protected def getSuitableExecutionTasks: Array[ExecutionTask] = {
-    executionTasks.asScala.filter(executionTask => executionTask.getRootExecTask.canExecute
-      && !ExecutionNodeStatus.isCompleted(executionTask.getStatus)).toArray
-  }
-
-  /**
-    * Get runnable TaskRunner
-    * 1. Polling for all outstanding ExecutionTasks
-    * 2. Polling for unfinished subtasks of ExecutionTask corresponding to ExecTask tree
-    * 3. Get the subtask and determine whether it exceeds the maximum value of getRunnable. If it exceeds the maximum value, the maximum number of tasks will be returned
-    *
-    * @return
-    */
-  override def getRunnableTasks: Array[ExecTaskRunner] = {
-    val startTime = System.currentTimeMillis()
-    debug(s"Start to getRunnableTasks startTime: $startTime")
+  def getRunnableExecutionTasksAndExecTask: (Array[ExecutionTask], Array[ExecTaskRunner]) = {
     val execTaskRunners = ArrayBuffer[ExecTaskRunner]()
-    val runningExecutionTasks = getSuitableExecutionTasks
-    //1. Get all runnable TaskRunner
-    runningExecutionTasks.foreach { executionTask =>
+    val runningExecutionTasks = getSuitableExecutionTasks.filter{ executionTask =>
       val execTask = executionTask.getRootExecTask
       val runnableSubTasks = new mutable.HashSet[ExecTask]()
       getSubTasksRecursively(executionTask, execTask, runnableSubTasks)
-      val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
-      execTaskRunners ++= subExecTaskRunners
+      if (runnableSubTasks.nonEmpty) {
+        val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
+        execTaskRunners ++= subExecTaskRunners
+        true
+      } else {
+        false
+      }
     }
+    (runningExecutionTasks, execTaskRunners.toArray)
+  }
+
+  protected def getSuitableExecutionTasks: Array[ExecutionTask] = {
+    executionTasks.asScala.filter(executionTask => executionTask.getRootExecTask.canExecute
+      && !ExecutionNodeStatus.isCompleted(executionTask.getStatus)).toArray
+  }
 
+  override def taskRunnableTasks(execTaskRunners: Array[ExecTaskRunner]): Array[ExecTaskRunner] = {
     //2. Take the current maximum number of runnables from the priority queue: Maximum limit-jobs that are already running
     val nowRunningNumber = executionTaskToRunningExecTask.values.map(_.length).sum
     val maxRunning = if (nowRunningNumber >= MAX_RUNNER_TASK_SIZE) 0 else MAX_RUNNER_TASK_SIZE - nowRunningNumber
-    val runnableTasks = if (maxRunning == 0) {
+    if (maxRunning == 0) {
       logger.warn(s"The current running has exceeded the maximum, now: $nowRunningNumber ")
       Array.empty[ExecTaskRunner]
     } else if (execTaskRunners.isEmpty) {
@@ -188,9 +184,32 @@ class DefaultTaskManager extends AbstractTaskManager with Logging {
       }
       runners.toArray
     }
+  }
+
+  /**
+   * Get runnable TaskRunner
+   * 1. Polling for all outstanding ExecutionTasks
+   * 2. Polling for unfinished subtasks of ExecutionTask corresponding to ExecTask tree
+   * 3. Get the subtask and determine whether it exceeds the maximum value of getRunnable. If it exceeds the maximum value, the maximum number of tasks will be returned
+   *
+   * @return
+   */
+  override def getRunnableTasks: Array[ExecTaskRunner] = {
+    val startTime = System.currentTimeMillis()
+    debug(s"Start to getRunnableTasks startTime: $startTime")
+    val execTaskRunners = ArrayBuffer[ExecTaskRunner]()
+    val runningExecutionTasks = getSuitableExecutionTasks
+    //1. Get all runnable TaskRunner
+    runningExecutionTasks.foreach { executionTask =>
+      val execTask = executionTask.getRootExecTask
+      val runnableSubTasks = new mutable.HashSet[ExecTask]()
+      getSubTasksRecursively(executionTask, execTask, runnableSubTasks)
+      val subExecTaskRunners = runnableSubTasks.map(execTaskToTaskRunner)
+      execTaskRunners ++= subExecTaskRunners
+    }
     val finishTime = System.currentTimeMillis()
     debug(s"Finished to getRunnableTasks finishTime: $finishTime, taken: ${finishTime - startTime}")
-    runnableTasks
+    taskRunnableTasks(execTaskRunners.toArray)
   }
 
   /**
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
index 7c35ae9ca..bc8e4fff3 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/NotifyTaskConsumer.scala
@@ -33,20 +33,29 @@ abstract class NotifyTaskConsumer extends TaskConsumer with OrchestratorAsyncLis
 
   protected def getWaitTime: Long = OrchestratorConfiguration.TASK_CONSUMER_WAIT.getValue
 
-  protected def beforeFetchLaunchTask(): Unit = {}
+  protected def beforeFetchLaunchTask(): Array[ExecTaskRunner] = {
+    null
+  }
 
   protected def beforeLaunchTask(runnableTasks: Array[ExecTaskRunner]): Unit = {}
 
   protected def afterLaunchTask(runnableTasks: Array[ExecTaskRunner]): Unit = {}
 
+  /**
+   * 1. Fetch the tasks that are ready to run and let the reheater process them
+   * 2. A task that was reheated is skipped in this round and runs the next time it is scheduled
+   * 3. Launch the remaining tasks once reheating has finished
+   */
   override def run(): Unit = {
     while (!isStopped)
       Utils.tryAndErrorMsg {
-        beforeFetchLaunchTask()
-        val runnableTasks = getExecution.taskManager.getRunnableTasks
-        beforeLaunchTask(runnableTasks)
-        runnableTasks.foreach(getExecution.taskScheduler.launchTask)
-        afterLaunchTask(runnableTasks)
+        val runners = beforeFetchLaunchTask()
+        if (null != runners && runners.nonEmpty) {
+          val runnableTasks = getExecution.taskManager.taskRunnableTasks(runners)
+          beforeLaunchTask(runnableTasks)
+          runnableTasks.foreach(getExecution.taskScheduler.launchTask)
+          afterLaunchTask(runnableTasks)
+        }
         notifyLock synchronized {
           notifyLock.wait(getWaitTime)
         }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
index a1ae843b6..54613f2ce 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/AbstractReheater.scala
@@ -18,39 +18,38 @@
 package org.apache.linkis.orchestrator.reheater
 
 import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
 import org.apache.linkis.orchestrator.extensions.catalyst.ReheaterTransform
 import org.apache.linkis.orchestrator.plans.physical.{ExecTask, ReheatableExecTask}
 
 
 abstract class AbstractReheater extends Reheater with Logging {
 
-  override def reheat(execTask: ExecTask): Unit = execTask match {
+  override def reheat(execTask: ExecTask): Boolean = execTask match {
     case reheat: ReheatableExecTask =>
-      debug(s"Try to reheat ${execTask.getIDInfo()}.")
+      debug(s"Try to reheat ${execTask.getIDInfo()}")
       reheat.setReheating()
       var changed = false
       Utils.tryCatch(Option(reheaterTransforms).foreach { transforms =>
-        Option(execTask.getChildren).map(_.map{ child =>
-          val newChild = transforms.foldLeft(child)((node, transform) => transform.apply(node, execTask.getPhysicalContext))
-          if (!child.theSame(newChild)) {
-            changed = true
-            newChild.relateParents(child)
-            newChild
-          } else child
-        }).foreach { children =>
-          if (changed) {
-            execTask.withNewChildren(children)
-          }
+        execTask.getPhysicalContext.set(OrchestratorConfiguration.REHEATER_KEY, false)
+        val newTree = transforms.foldLeft(execTask)((node, transform) => transform.apply(node, execTask.getPhysicalContext))
+        val reheaterStatus = execTask.getPhysicalContext.get(OrchestratorConfiguration.REHEATER_KEY)
+        changed = reheaterStatus match {
+          case status: Boolean =>
+            status
+          case _ => false
         }
+        execTask.getPhysicalContext.set(OrchestratorConfiguration.REHEATER_KEY, false)
       }) { t =>
-        logger.error(s" Reheat ${execTask.getIDInfo()} failed, now mark it failed!", t)
+        logger.error(s"Reheat ${execTask.getIDInfo()} failed, now mark it failed", t)
         execTask.getPhysicalContext.markFailed(s" Reheat ${execTask.getIDInfo()} failed, now mark it failed!", t)
       }
       reheat.setReheated()
       if (changed) {
         logger.info(s"${execTask.getIDInfo()} reheated. The physicalTree has been changed. The new tree is ${execTask.simpleString}.")
       }
-    case _ =>
+      changed
+    case _ => false
   }
 
   protected def reheaterTransforms: Array[ReheaterTransform]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
index 2a62c3953..bf22fa986 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/Reheater.scala
@@ -24,6 +24,6 @@ import org.apache.linkis.orchestrator.plans.physical.ExecTask
   */
 trait Reheater {
 
-  def reheat(execTask: ExecTask): Unit
+  def reheat(execTask: ExecTask): Boolean
 
 }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
index 21a6d6e2d..3c1fb4157 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/reheater/ReheaterNotifyTaskConsumer.scala
@@ -17,6 +17,7 @@
  
 package org.apache.linkis.orchestrator.reheater
 
+import org.apache.linkis.orchestrator.execution.ExecTaskRunner
 import org.apache.linkis.orchestrator.execution.impl.{DefaultTaskManager, NotifyTaskConsumer}
 import org.apache.linkis.orchestrator.plans.physical.ExecTask
 
@@ -27,14 +28,17 @@ abstract class ReheaterNotifyTaskConsumer extends NotifyTaskConsumer {
 
   val reheater: Reheater
 
-  protected def reheatIt(execTask: ExecTask): Unit = {
+  protected def reheatIt(execTask: ExecTask): Boolean = {
     val key = getReheatableKey(execTask.getId)
-    def compareAndSet(lastRunning: String): Unit = {
+    def compareAndSet(lastRunning: String): Boolean = {
       val thisRunning = getExecution.taskManager.getCompletedTasks(execTask).map(_.task.getId).mkString(",")
       if(thisRunning != lastRunning) {
         logger.debug(s"${execTask.getIDInfo()} Try to reheat this $thisRunning. lastRunning: $lastRunning")
-        reheater.reheat(execTask)
+        val reheaterStatus = reheater.reheat(execTask)
         execTask.getPhysicalContext.set(key, thisRunning)
+        reheaterStatus
+      } else {
+        false
       }
     }
     execTask.getPhysicalContext.get(key) match {
@@ -42,18 +46,20 @@ abstract class ReheaterNotifyTaskConsumer extends NotifyTaskConsumer {
         compareAndSet(lastRunning)
       case _ if getExecution.taskManager.getCompletedTasks(execTask).nonEmpty =>
         compareAndSet(null)
-      case _ =>
-        //info(s"no need to deal ${execTask.getPhysicalContext.get(key) }")
+      case _ => false
+
     }
 
   }
 
   protected def getReheatableKey(id: String): String = ReheaterNotifyTaskConsumer.REHEAT_KEY_PREFIX + id
 
-  override protected def beforeFetchLaunchTask(): Unit = getExecution.taskManager match {
+  override protected def beforeFetchLaunchTask(): Array[ExecTaskRunner] = getExecution.taskManager match {
     case taskManager: DefaultTaskManager =>
-      taskManager.getRunnableExecutionTasks.foreach(t => reheatIt(t.getRootExecTask))
-    case _ =>
+      val (executionTasks, runnableExecTasks) = taskManager.getRunnableExecutionTasksAndExecTask
+      val reheaterRootExecTasks = executionTasks.map(_.getRootExecTask).filter(reheatIt).map(_.getId)
+      runnableExecTasks.filterNot(task => reheaterRootExecTasks.contains(task.task.getPhysicalContext.getRootTask.getId))
+    case _ => null
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org