You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by ca...@apache.org on 2022/09/07 16:06:15 UTC
[incubator-linkis] branch dev-1.3.1 updated: [linkis-computation-orchestrator] Modification of scala file floating… (#3187)
This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 52c85405a [linkis-computation-orchestrator] Modification of scala file floating… (#3187)
52c85405a is described below
commit 52c85405a0ae279226715bbc5ca9bcbf33b40c31
Author: 成彬彬 <10...@users.noreply.github.com>
AuthorDate: Thu Sep 8 00:06:10 2022 +0800
[linkis-computation-orchestrator] Modification of scala file floating… (#3187)
---
.../ComputationOrchestratorSessionFactory.scala | 4 ++--
.../parser/EnrichLabelParserTransform.scala | 4 ++--
.../physical/ComputePhysicalTransform.scala | 15 +-------------
.../catalyst/planner/TaskPlannerTransform.scala | 17 +++++----------
.../reheater/PruneTaskReheaterTransform.scala | 12 ++++-------
.../computation/entity/ComputationJobReq.scala | 13 ++++++------
.../DefaultCodeExecTaskExecutorManager.scala | 2 +-
.../computation/physical/CacheExecTask.scala | 12 +++++------
.../physical/CodeLogicalUnitExecTask.scala | 24 ----------------------
.../service/ComputationTaskExecutionReceiver.scala | 7 -------
.../computation/utils/TreeNodeUtil.scala | 8 ++------
11 files changed, 30 insertions(+), 88 deletions(-)
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/ComputationOrchestratorSessionFactory.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/ComputationOrchestratorSessionFactory.scala
index 13d067c1e..ee3a22b26 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/ComputationOrchestratorSessionFactory.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/ComputationOrchestratorSessionFactory.scala
@@ -50,11 +50,11 @@ object ComputationOrchestratorSessionFactory {
if (
StringUtils
.isNotBlank(ComputationOrchestratorConf.COMPUTATION_SESSION_FACTORY_CLASS.getValue)
- )
+ ) {
ClassUtils.getClassInstance(
ComputationOrchestratorConf.COMPUTATION_SESSION_FACTORY_CLASS.getValue
)
- else {
+ } else {
new ComputationOrchestratorSessionFactoryImpl
}
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/EnrichLabelParserTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/EnrichLabelParserTransform.scala
index 31b6d1c8b..38593443b 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/EnrichLabelParserTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/parser/EnrichLabelParserTransform.scala
@@ -48,7 +48,7 @@ class EnrichLabelParserTransform extends ParserTransform {
}
}
- def enrichLabels(codeJob: CodeJob, context: ASTContext) = {
+ def enrichLabels(codeJob: CodeJob, context: ASTContext): Unit = {
parserLabelFillers.foreach { filler =>
filler.parseToLabel(codeJob, context) match {
case Some(label) => context.getLabels.add(label)
@@ -57,7 +57,7 @@ class EnrichLabelParserTransform extends ParserTransform {
}
}
- def initParserLabelFillers = {
+ def initParserLabelFillers: Unit = {
parserLabelFillers = Array(new CacheParserLabelFiller)
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/physical/ComputePhysicalTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/physical/ComputePhysicalTransform.scala
index 5ddee1078..9b0dcb60e 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/physical/ComputePhysicalTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/physical/ComputePhysicalTransform.scala
@@ -104,19 +104,6 @@ class CodeExecTaskTransform extends ComputePhysicalTransform with Logging {
object ComputePhysicalTransform {
- def main(args: Array[String]): Unit = {
- /*val jobStartExec = new JobTask(null, null)
- jobStartExec.setTaskDesc(new StartJobTaskDesc(null))
- val stage1 = new StageTask(Array(jobStartExec), null)
- val stage2 = new StageTask(Array(jobStartExec), null)
- jobStartExec.withNewChildren(Array(stage1, stage2))
- val jobEndTask = new JobTask(Array(stage1, stage2), null)
- stage1.withNewChildren(Array(jobEndTask))
- stage2.withNewChildren(Array(jobEndTask))
- val transform = new ComputePhysicalTransform()
- val result = transform.apply(stage1, null)
- println(result)
- println(result.getChildren.length)*/
- }
+ def main(args: Array[String]): Unit = {}
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
index e176be0d0..f5fc6d550 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/planner/TaskPlannerTransform.scala
@@ -56,16 +56,15 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
def buildStageTaskTree(taskDesc: StageTaskDesc, startJobTask: Task = null): (Task, Task) = {
taskDesc match {
- case endStageTask: EndStageTaskDesc => {
+ case endStageTask: EndStageTaskDesc =>
val (task, newStartJobTask) = buildCodeLogicTaskTree(
taskDesc.stage.getJob match {
case codeJob: CodeJob => codeJob.getCodeLogicalUnit
- case job: Job => {
+ case job: Job =>
logger.error(
s"jobId:${job.getId}-----jobType:${job.getName}, job type mismatch, only support CodeJob"
)
null
- }
},
taskDesc.stage,
startJobTask
@@ -73,9 +72,7 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
val stageTaskTmp = new StageTask(Array(), Array(task))
stageTaskTmp.setTaskDesc(endStageTask)
(rebuildTreeNode(stageTaskTmp), newStartJobTask)
- }
- case startStageTask: StartStageTaskDesc => {
-
+ case startStageTask: StartStageTaskDesc =>
/**
* when the construction node arrives at stage-task-start check whether this stage has child
* nodes if true -> use the same way to build another stage tasks if false -> build or reuse
@@ -102,7 +99,6 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
stageTaskTmp.setTaskDesc(taskDesc)
(rebuildTreeNode(stageTaskTmp), newStartJobTask)
}
- }
}
}
@@ -123,20 +119,17 @@ class TaskPlannerTransform extends PlannerTransform with Logging {
def buildJobTaskTree(taskDesc: TaskDesc): Task = {
taskDesc match {
- case startTask: StartJobTaskDesc => {
-
+ case startTask: StartJobTaskDesc =>
/**
* The end of recursion
*/
val jobTask = new JobTask(Array(), Array())
jobTask.setTaskDesc(startTask)
jobTask
- }
- case endTask: EndJobTaskDesc => {
+ case endTask: EndJobTaskDesc =>
val jobTaskTmp = new JobTask(Array(), buildAllStageTaskTree(endTask.job.getAllStages)._1)
jobTaskTmp.setTaskDesc(endTask)
rebuildTreeNode(jobTaskTmp)
- }
}
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
index 627841aeb..79274aa53 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/catalyst/reheater/PruneTaskReheaterTransform.scala
@@ -52,7 +52,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging {
s"task:${in.getIDInfo()} has ${failedTasks.size} child tasks which execute failed, some of them may be retried"
)
TreeNodeUtil.getTaskResponse(task) match {
- case response: FailedTaskResponse => {
+ case response: FailedTaskResponse =>
val exception = response.getCause
if (exception.isInstanceOf[LinkisRetryException]) {
val parents = task.getParents
@@ -62,7 +62,7 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging {
val otherChildren = parent.getChildren.filter(_ != task)
Utils.tryCatch {
task match {
- case retryExecTask: RetryExecTask => {
+ case retryExecTask: RetryExecTask =>
if (retryExecTask.getAge() < retryExecTask.getMaxRetryCount()) {
val newTask =
new RetryExecTask(retryExecTask.getOriginTask, retryExecTask.getAge() + 1)
@@ -79,18 +79,16 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging {
)
task.getPhysicalContext.pushLog(logEvent)
}
- }
- case _ => {
+ case _ =>
val retryExecTask = new RetryExecTask(task)
retryExecTask.initialize(task.getPhysicalContext)
TreeNodeUtil.insertNode(parent, task, retryExecTask)
context.set(OrchestratorConfiguration.REHEATER_KEY, true)
pushInfoLog(task, retryExecTask)
- }
}
} {
// restore task node when retry task construction failed
- case e: Exception => {
+ case e: Exception =>
val logEvent = TaskLogEvent(
task,
LogUtils.generateWarn(
@@ -110,12 +108,10 @@ class PruneTaskRetryTransform extends ReheaterTransform with Logging {
LogUtils.generateWarn(s"restore task success! task node: ${task.getIDInfo}")
)
task.getPhysicalContext.pushLog(downLogEvent)
- }
}
})
}
}
- }
case _ =>
}
})
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/entity/ComputationJobReq.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/entity/ComputationJobReq.scala
index db73ac343..002ad7fdd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/entity/ComputationJobReq.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/entity/ComputationJobReq.scala
@@ -44,21 +44,22 @@ class ComputationJobReq extends AbstractJobReq {
override def getName: String = s"ComputationJobReq_$id"
- def getCodeLogicalUnit = codeLogicalUnit
+ def getCodeLogicalUnit: CodeLogicalUnit = codeLogicalUnit
- def setCodeLogicalUnit(codeLogicalUnit: CodeLogicalUnit) = this.codeLogicalUnit = codeLogicalUnit
+ def setCodeLogicalUnit(codeLogicalUnit: CodeLogicalUnit): Unit = this.codeLogicalUnit =
+ codeLogicalUnit
def getCodeLanguageLabel: CodeLanguageLabel = {
codeLogicalUnit.getLabel
}
- def getErrorCode = errorCode
+ def getErrorCode: Int = errorCode
- def setErrorCode(errorCode: Int) = this.errorCode = errorCode
+ def setErrorCode(errorCode: Int): Unit = this.errorCode = errorCode
- def getErrorDesc = errorDesc
+ def getErrorDesc: String = errorDesc
- def setErrorDesc(errorDesc: String) = this.errorDesc = errorDesc
+ def setErrorDesc(errorDesc: String): Unit = this.errorDesc = errorDesc
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
index 4803ce818..0f08187bd 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/execute/DefaultCodeExecTaskExecutorManager.scala
@@ -250,7 +250,7 @@ class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager wit
override def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit = {
/* val codeExecutor = new CodeExecTaskExecutor(executor.getEngineConnExecutor, executor.getExecTask, executor.getMark)
- codeExecutor.setEngineConnTaskId(executor.getEngineConnTaskId)*/
+ codeExecutor.setEngineConnTaskId(executor.getEngineConnTaskId) */
execTaskToExecutor synchronized {
execTaskToExecutor.put(executor.getExecTaskId, executor)
}
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CacheExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CacheExecTask.scala
index faffa5965..13a20305c 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CacheExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CacheExecTask.scala
@@ -42,7 +42,7 @@ import org.apache.linkis.protocol.query.cache.{
}
import org.apache.linkis.rpc.Sender
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
class CacheExecTask(parents: Array[ExecTask], children: Array[ExecTask]) extends AbstractExecTask {
@@ -74,7 +74,7 @@ class CacheExecTask(parents: Array[ExecTask], children: Array[ExecTask]) extends
codeLogicalUnitExecTask.getCodeLogicalUnit.toStringCode,
aSTContext.getExecuteUser,
java.lang.Long.parseLong(cacheLabel.getCacheExpireAfter),
- codeLogicalUnitExecTask.getLabels.map(_.getStringValue),
+ codeLogicalUnitExecTask.getLabels.asScala.map(_.getStringValue).asJava,
resp.getResultSet
)
sender.ask(requestWriteCache)
@@ -103,11 +103,11 @@ class CacheExecTask(parents: Array[ExecTask], children: Array[ExecTask]) extends
val aSTContext =
codeLogicalUnitExecTask.getTaskDesc.getOrigin.getASTOrchestration.getASTContext
val cacheLabel =
- aSTContext.getLabels.find(_.isInstanceOf[CacheLabel]).get.asInstanceOf[CacheLabel]
+ aSTContext.getLabels.asScala.find(_.isInstanceOf[CacheLabel]).get.asInstanceOf[CacheLabel]
val requestReadCache = new RequestReadCache(
codeLogicalUnitExecTask.getCodeLogicalUnit.toStringCode,
aSTContext.getExecuteUser,
- codeLogicalUnitExecTask.getLabels.map(_.getStringValue),
+ codeLogicalUnitExecTask.getLabels.asScala.map(_.getStringValue).asJava,
java.lang.Long.parseLong(cacheLabel.getReadCacheBefore)
)
sender.ask(requestReadCache) match {
@@ -132,9 +132,9 @@ class CacheExecTask(parents: Array[ExecTask], children: Array[ExecTask]) extends
override def isLocalMode: Boolean = true
- def getRealExecTask = realExecTask
+ def getRealExecTask: ExecTask = realExecTask
- def setRealExecTask(realExecTask: ExecTask) = this.realExecTask = realExecTask
+ def setRealExecTask(realExecTask: ExecTask): Unit = this.realExecTask = realExecTask
override def getPhysicalContext: PhysicalContext = physicalContext
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
index 1895138ee..7fb2b926a 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/physical/CodeLogicalUnitExecTask.scala
@@ -164,28 +164,10 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
private def toRequestTask: RequestTask = {
val requestTask = new RequestTaskExecute
requestTask.setCode(getCodeLogicalUnit.toStringCode)
- // getLabels.add(getCodeLogicalUnit.getLabel)
requestTask.setLabels(getLabels)
- // Map
-// if (null != getParams.getRuntimeParams.getDataSources ) {
-// requestTask.getProperties.putAll(getParams.getRuntimeParams.getDataSources)
-// }
-
-// if (null != getParams.getRuntimeParams.getContext) {
-// requestTask.getProperties.putAll(getParams.getRuntimeParams.getContext)
-// }
-
-// if (null != getParams.getRuntimeParams.getSpecials) {
-// requestTask.getProperties.putAll(getParams.getRuntimeParams.getSpecials)
-// }
-
if (null != getParams.getRuntimeParams.getJobs) {
requestTask.getProperties.putAll(getParams.getRuntimeParams.getJobs)
}
-// requestTask.getProperties.put(GovernanceConstant.TASK_SOURCE_MAP_KEY, getParams.getRuntimeParams.get(GovernanceConstant.TASK_SOURCE_MAP_KEY) match {
-// case o: Object => o
-// case _ => null
-// })
requestTask.getProperties.putAll(getParams.getRuntimeParams.toMap)
requestTask.setSourceID(getIDInfo())
requestTask
@@ -265,12 +247,6 @@ class CodeLogicalUnitExecTask(parents: Array[ExecTask], children: Array[ExecTask
task
}
- /*override def close(): Unit = {
- codeExecTaskExecutorManager.getByExecTaskId(this.getId).foreach { codeEngineConnExecutor =>
- info(s"ExecTask(${getIDInfo()}) be closed.")
- Utils.tryAndWarn(codeExecTaskExecutorManager.delete(this, codeEngineConnExecutor))
- }
- }*/
override def clear(isSucceed: Boolean): Unit = {
codeExecTaskExecutorManager.getByExecTaskId(this.getId).foreach { codeEngineConnExecutor =>
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
index f85d9e24e..7df16d84d 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala
@@ -41,13 +41,6 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
private val codeExecTaskExecutorManager =
CodeExecTaskExecutorManager.getCodeExecTaskExecutorManager
- // private val asyncListenerBus: OrchestratorAsyncListenerBus = OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorAsyncListenerBus
-
- // private val syncListenerBus: OrchestratorSyncListenerBus = OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorSyncListenerBus
-
- // TODO ListenerBus should to split into OrchestratorSessions.
- // TODO Two whole ListenerBus will cause the consume problem.
-
@PostConstruct
private def init(): Unit = {
EngineConnMonitor.addEngineExecutorStatusMonitor(
diff --git a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/utils/TreeNodeUtil.scala b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/utils/TreeNodeUtil.scala
index 8c19ccec6..5ca89c702 100644
--- a/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/utils/TreeNodeUtil.scala
+++ b/linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/utils/TreeNodeUtil.scala
@@ -17,9 +17,6 @@
package org.apache.linkis.orchestrator.computation.utils
-import org.apache.linkis.manager.common.entity.node.Node
-import org.apache.linkis.orchestrator.conf.OrchestratorConfiguration
-import org.apache.linkis.orchestrator.domain.TreeNode
import org.apache.linkis.orchestrator.exception.{
OrchestratorErrorCodeSummary,
OrchestratorErrorException
@@ -35,7 +32,7 @@ import org.apache.linkis.orchestrator.strategy.{ExecTaskStatusInfo, StatusInfoEx
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-//TODO 考虑TreeNode修改的并发操作问题
+// TODO 考虑TreeNode修改的并发操作问题
object TreeNodeUtil {
// private method, only support to check adjacent node
@@ -132,9 +129,8 @@ object TreeNodeUtil {
val status = statusInfoMap.get(node.getId).getOrElse(null)
if (status != null) {
status.taskResponse match {
- case response: FailedTaskResponse => {
+ case response: FailedTaskResponse =>
failedTasks += node
- }
case _ =>
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org