You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 11:13:36 UTC

[linkis] branch dev-1.3.2 updated: [Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)

This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new 8a03ed93f [Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)
8a03ed93f is described below

commit 8a03ed93f5e54a71d3da23091b11f9b330898afa
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 19:13:28 2023 +0800

    [Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)
    
    * Update kill to a synchronous operation
    * Fix compile error
---
 .../linkis/entrance/execute/DefaultEntranceExecutor.scala     |  1 +
 .../linkis/manager/am/service/em/DefaultEMEngineService.scala |  1 -
 .../linkis/orchestrator/core/AbstractOrchestration.scala      | 11 ++++++++---
 .../execution/impl/AbstractExecutionFactory.scala             |  1 -
 .../linkis/orchestrator/execution/impl/ExecutionImpl.scala    |  8 ++------
 .../orchestrator/extensions/operation/CancelOperation.scala   | 11 ++++++++---
 .../listener/task/OrchestrationKillListener.scala             |  4 ++--
 .../linkis/orchestrator/listener/task/TaskInfoEvent.scala     |  2 +-
 8 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 7b5e3cc3c..e601b3801 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -238,6 +238,7 @@ class DefaultEntranceExecutor(id: Long)
   }
 
   override def kill(): Boolean = {
+    logger.info("Entrance start to kill job {} invoke Orchestrator ", this.getId)
     Utils.tryAndWarn {
       val msg = s"You job with id  was cancelled by user!"
       getRunningOrchestrationFuture.foreach(_.cancel(msg))
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
index 995b10b87..918faf912 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
@@ -112,7 +112,6 @@ class DefaultEMEngineService extends EMEngineService with Logging {
     if (MapUtils.isEmpty(instanceAndLabels)) {
       new AMErrorException(AMConstant.EM_ERROR_CODE, "No corresponding EM")
     }
-    // TODO add em select rule to do this
     val emInstanceLabelOption = labels.asScala.find(_.isInstanceOf[EMInstanceLabel])
     val filterInstanceAndLabel = if (emInstanceLabelOption.isDefined) {
       val emInstanceLabel = emInstanceLabelOption.get.asInstanceOf[EMInstanceLabel]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
index 46263fd4a..6efbab608 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
@@ -19,7 +19,7 @@ package org.apache.linkis.orchestrator.core
 
 import org.apache.linkis.common.conf.Configuration
 import org.apache.linkis.common.io.{Fs, FsPath}
-import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
 import org.apache.linkis.orchestrator.{Orchestration, OrchestratorSession}
 import org.apache.linkis.orchestrator.core.OrchestrationFuture.NotifyListener
@@ -143,11 +143,16 @@ abstract class AbstractOrchestration(
 
   protected def createOrchestration(logicalPlan: Task): Orchestration
 
-  class OrchestrationFutureImpl(asyncTaskResponse: AsyncTaskResponse) extends OrchestrationFuture {
+  class OrchestrationFutureImpl(asyncTaskResponse: AsyncTaskResponse)
+      extends OrchestrationFuture
+      with Logging {
 
     private val waitLock = new Array[Byte](0)
 
-    override def cancel(errorMsg: String, cause: Throwable): Unit = operate(Operation.CANCEL)
+    override def cancel(errorMsg: String, cause: Throwable): Unit = {
+      logger.info("Orchestrator to kill job {} ", self.physicalPlan.getIDInfo())
+      operate(Operation.CANCEL)
+    }
 
     override def getResponse: OrchestrationResponse = orchestrationResponse
 
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
index f063fdb5a..a3014109c 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
@@ -37,7 +37,6 @@ abstract class AbstractExecutionFactory extends ExecutionFactory {
     val execution = new ExecutionImpl(taskScheduler, taskManager, getTaskConsumer(sessionState))
     sessionState.getOrchestratorSyncListenerBus.addListener(execution)
     sessionState.getOrchestratorSyncListenerBus.addListener(taskManager)
-    sessionState.getOrchestratorAsyncListenerBus.addListener(execution)
     execution.start()
     execution
   }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
index 770b1cd58..4e00febdd 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
@@ -53,6 +53,8 @@ class ExecutionImpl(
       onStatusUpdate(executionTaskStatusEvent)
     case executionTaskCompletedEvent: ExecutionTaskCompletedEvent =>
       onExecutionTaskCompletedEvent(executionTaskCompletedEvent)
+    case killRootExecTaskEvent: KillRootExecTaskEvent =>
+      onKillRootExecTaskEvent(killRootExecTaskEvent)
     case _ =>
   }
 
@@ -76,12 +78,6 @@ class ExecutionImpl(
       }
   }
 
-  override def onEvent(event: OrchestratorAsyncEvent): Unit = event match {
-    case killRootExecTaskEvent: KillRootExecTaskEvent =>
-      onKillRootExecTaskEvent(killRootExecTaskEvent)
-    case _ =>
-  }
-
   override def onKillRootExecTaskEvent(killRootExecTaskEvent: KillRootExecTaskEvent): Unit = {
     val execTask = killRootExecTaskEvent.execTask
     logger.info(s"receive killRootExecTaskEvent ${execTask.getIDInfo}")
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
index 40681d5c8..9c2ec84cd 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.orchestrator.extensions.operation
 
+import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.orchestrator.{Orchestration, OrchestratorSession}
 import org.apache.linkis.orchestrator.core.AbstractOrchestration
 import org.apache.linkis.orchestrator.extensions.operation.Operation.OperationBuilder
@@ -25,13 +26,17 @@ import org.apache.linkis.orchestrator.listener.task.KillRootExecTaskEvent
 
 /**
  */
-class CancelOperation extends Operation[Unit] {
+class CancelOperation extends Operation[Unit] with Logging {
 
   override def apply(orchestration: Orchestration): Unit = orchestration match {
     case abstractOrchestration: AbstractOrchestration =>
       if (null != abstractOrchestration.physicalPlan) {
-        orchestration.orchestratorSession.getOrchestratorSessionState.getOrchestratorAsyncListenerBus
-          .post(KillRootExecTaskEvent(abstractOrchestration.physicalPlan))
+        logger.info(
+          "Orchestrator to kill job {} post kill event",
+          abstractOrchestration.physicalPlan.getIDInfo()
+        )
+        orchestration.orchestratorSession.getOrchestratorSessionState.getOrchestratorSyncListenerBus
+          .postToAll(KillRootExecTaskEvent(abstractOrchestration.physicalPlan))
       }
     case _ =>
   }
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
index f6a93ea76..924ee1a55 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
@@ -17,11 +17,11 @@
 
 package org.apache.linkis.orchestrator.listener.task
 
-import org.apache.linkis.orchestrator.listener.OrchestratorAsyncListener
+import org.apache.linkis.orchestrator.listener.OrchestratorSyncListener
 
 /**
  */
-trait OrchestrationKillListener extends OrchestratorAsyncListener {
+trait OrchestrationKillListener extends OrchestratorSyncListener {
 
   def onKillRootExecTaskEvent(killRootExecTaskEvent: KillRootExecTaskEvent): Unit
 
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
index 3ae70d76a..87d1731d5 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
@@ -86,7 +86,7 @@ case class RootTaskResponseEvent(execTask: ExecTask, taskResponse: CompletedTask
 
 case class KillRootExecTaskEvent(execTask: ExecTask)
     extends TaskInfoEvent
-    with OrchestratorAsyncEvent
+    with OrchestratorSyncEvent
 
 case class TaskReheaterEvent(execTask: ExecTask) extends TaskInfoEvent with OrchestratorAsyncEvent
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org