You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2023/03/07 11:13:36 UTC
[linkis] branch dev-1.3.2 updated: [Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new 8a03ed93f [Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)
8a03ed93f is described below
commit 8a03ed93f5e54a71d3da23091b11f9b330898afa
Author: peacewong <wp...@gmail.com>
AuthorDate: Tue Mar 7 19:13:28 2023 +0800
[Feature][Orchestrator] Optimize task kill as a synchronous request (#4314)
* Update kill to a synchronous operation
* Fix compile error
---
.../linkis/entrance/execute/DefaultEntranceExecutor.scala | 1 +
.../linkis/manager/am/service/em/DefaultEMEngineService.scala | 1 -
.../linkis/orchestrator/core/AbstractOrchestration.scala | 11 ++++++++---
.../execution/impl/AbstractExecutionFactory.scala | 1 -
.../linkis/orchestrator/execution/impl/ExecutionImpl.scala | 8 ++------
.../orchestrator/extensions/operation/CancelOperation.scala | 11 ++++++++---
.../listener/task/OrchestrationKillListener.scala | 4 ++--
.../linkis/orchestrator/listener/task/TaskInfoEvent.scala | 2 +-
8 files changed, 22 insertions(+), 17 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
index 7b5e3cc3c..e601b3801 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/DefaultEntranceExecutor.scala
@@ -238,6 +238,7 @@ class DefaultEntranceExecutor(id: Long)
}
override def kill(): Boolean = {
+ logger.info("Entrance start to kill job {} invoke Orchestrator ", this.getId)
Utils.tryAndWarn {
val msg = s"You job with id was cancelled by user!"
getRunningOrchestrationFuture.foreach(_.cancel(msg))
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
index 995b10b87..918faf912 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMEngineService.scala
@@ -112,7 +112,6 @@ class DefaultEMEngineService extends EMEngineService with Logging {
if (MapUtils.isEmpty(instanceAndLabels)) {
new AMErrorException(AMConstant.EM_ERROR_CODE, "No corresponding EM")
}
- // TODO add em select rule to do this
val emInstanceLabelOption = labels.asScala.find(_.isInstanceOf[EMInstanceLabel])
val filterInstanceAndLabel = if (emInstanceLabelOption.isDefined) {
val emInstanceLabel = emInstanceLabelOption.get.asInstanceOf[EMInstanceLabel]
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
index 46263fd4a..6efbab608 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/core/AbstractOrchestration.scala
@@ -19,7 +19,7 @@ package org.apache.linkis.orchestrator.core
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.io.{Fs, FsPath}
-import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.orchestrator.{Orchestration, OrchestratorSession}
import org.apache.linkis.orchestrator.core.OrchestrationFuture.NotifyListener
@@ -143,11 +143,16 @@ abstract class AbstractOrchestration(
protected def createOrchestration(logicalPlan: Task): Orchestration
- class OrchestrationFutureImpl(asyncTaskResponse: AsyncTaskResponse) extends OrchestrationFuture {
+ class OrchestrationFutureImpl(asyncTaskResponse: AsyncTaskResponse)
+ extends OrchestrationFuture
+ with Logging {
private val waitLock = new Array[Byte](0)
- override def cancel(errorMsg: String, cause: Throwable): Unit = operate(Operation.CANCEL)
+ override def cancel(errorMsg: String, cause: Throwable): Unit = {
+ logger.info("Orchestrator to kill job {} ", self.physicalPlan.getIDInfo())
+ operate(Operation.CANCEL)
+ }
override def getResponse: OrchestrationResponse = orchestrationResponse
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
index f063fdb5a..a3014109c 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/AbstractExecutionFactory.scala
@@ -37,7 +37,6 @@ abstract class AbstractExecutionFactory extends ExecutionFactory {
val execution = new ExecutionImpl(taskScheduler, taskManager, getTaskConsumer(sessionState))
sessionState.getOrchestratorSyncListenerBus.addListener(execution)
sessionState.getOrchestratorSyncListenerBus.addListener(taskManager)
- sessionState.getOrchestratorAsyncListenerBus.addListener(execution)
execution.start()
execution
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
index 770b1cd58..4e00febdd 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/execution/impl/ExecutionImpl.scala
@@ -53,6 +53,8 @@ class ExecutionImpl(
onStatusUpdate(executionTaskStatusEvent)
case executionTaskCompletedEvent: ExecutionTaskCompletedEvent =>
onExecutionTaskCompletedEvent(executionTaskCompletedEvent)
+ case killRootExecTaskEvent: KillRootExecTaskEvent =>
+ onKillRootExecTaskEvent(killRootExecTaskEvent)
case _ =>
}
@@ -76,12 +78,6 @@ class ExecutionImpl(
}
}
- override def onEvent(event: OrchestratorAsyncEvent): Unit = event match {
- case killRootExecTaskEvent: KillRootExecTaskEvent =>
- onKillRootExecTaskEvent(killRootExecTaskEvent)
- case _ =>
- }
-
override def onKillRootExecTaskEvent(killRootExecTaskEvent: KillRootExecTaskEvent): Unit = {
val execTask = killRootExecTaskEvent.execTask
logger.info(s"receive killRootExecTaskEvent ${execTask.getIDInfo}")
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
index 40681d5c8..9c2ec84cd 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/extensions/operation/CancelOperation.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.orchestrator.extensions.operation
+import org.apache.linkis.common.utils.Logging
import org.apache.linkis.orchestrator.{Orchestration, OrchestratorSession}
import org.apache.linkis.orchestrator.core.AbstractOrchestration
import org.apache.linkis.orchestrator.extensions.operation.Operation.OperationBuilder
@@ -25,13 +26,17 @@ import org.apache.linkis.orchestrator.listener.task.KillRootExecTaskEvent
/**
*/
-class CancelOperation extends Operation[Unit] {
+class CancelOperation extends Operation[Unit] with Logging {
override def apply(orchestration: Orchestration): Unit = orchestration match {
case abstractOrchestration: AbstractOrchestration =>
if (null != abstractOrchestration.physicalPlan) {
- orchestration.orchestratorSession.getOrchestratorSessionState.getOrchestratorAsyncListenerBus
- .post(KillRootExecTaskEvent(abstractOrchestration.physicalPlan))
+ logger.info(
+ "Orchestrator to kill job {} post kill event",
+ abstractOrchestration.physicalPlan.getIDInfo()
+ )
+ orchestration.orchestratorSession.getOrchestratorSessionState.getOrchestratorSyncListenerBus
+ .postToAll(KillRootExecTaskEvent(abstractOrchestration.physicalPlan))
}
case _ =>
}
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
index f6a93ea76..924ee1a55 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/OrchestrationKillListener.scala
@@ -17,11 +17,11 @@
package org.apache.linkis.orchestrator.listener.task
-import org.apache.linkis.orchestrator.listener.OrchestratorAsyncListener
+import org.apache.linkis.orchestrator.listener.OrchestratorSyncListener
/**
*/
-trait OrchestrationKillListener extends OrchestratorAsyncListener {
+trait OrchestrationKillListener extends OrchestratorSyncListener {
def onKillRootExecTaskEvent(killRootExecTaskEvent: KillRootExecTaskEvent): Unit
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
index 3ae70d76a..87d1731d5 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/listener/task/TaskInfoEvent.scala
@@ -86,7 +86,7 @@ case class RootTaskResponseEvent(execTask: ExecTask, taskResponse: CompletedTask
case class KillRootExecTaskEvent(execTask: ExecTask)
extends TaskInfoEvent
- with OrchestratorAsyncEvent
+ with OrchestratorSyncEvent
case class TaskReheaterEvent(execTask: ExecTask) extends TaskInfoEvent with OrchestratorAsyncEvent
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org