You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@dolphinscheduler.apache.org by Jave-Chen <ke...@foxmail.com> on 2020/07/05 08:51:43 UTC
回复: Discuss on fake-run feature 任务空跑的方案讨论
I think Idea B is more suitable for the "fake run task".
And what will happen if I set workflow control nodes to "fake run"?
Workflow control nodes include dependency node, condition node
and sub-process node.
------------------ 原始邮件 ------------------
发件人: "wu shaoj"<gabrywu@apache.org>;
发送时间: 2020年7月3日(星期五) 上午10:10
收件人: "dev@dolphinscheduler.apache.org"<dev@dolphinscheduler.apache.org>;
主题: Re: Discuss on fake-run feature 任务空跑的方案讨论
I think 'handled on WorkerServer' is better.
On 2020/7/3, 07:35, "Pu Hsu" <a.xp@live.com> wrote:
Purpose: When we run a process, a task will be regarded as success directly if marked as "fake run".
Sub-goals:
1. Marking a task as "fake run"
2. Handling the fake-run task
3. Providing the HTTP API
## Subgoal 1 : Design
In the existing design, `org.apache.dolphinscheduler.common.model.TaskNode` stores information about a task in the workflow. The member `String runFlag` in TaskNode is used for running state, and runFlag has two types of `NORMAL` and `FORBIDDEN`. Tracking the references of the runFlag shows that currently only `TaskNode.isForbidden()` used it.
We're going to add a new `FAKE` type to runFlag.
## Subgoal 2 : Design
The existing running flow of a process is:
1. MasterServer builds TaskInstance from TaskNode
2. MasterServer submits TaskInstance to database by MasterBaseTaskExecThread
3. MasterServer dispatch TaskInstance
3.1 Construct TaskExecutionContext from TaskInstance
3.2 Construct Command from TaskExecutionContext
3.3 Send Command to WorkerServer by NettyExecutorManager
3.4 Wait for the result
4. WorkerServer receives Command through NettyRemotingServer, and then processes Command of type `TASK_EXECUTE_REQUEST` and gets TaskExecutionContext from Command by TaskExecuteProcessor
5. WorkerServer handles TaskExecutionContext by TaskExecuteThread
6. WorkerServer returns the result to MasterServer by TaskCallbackService
Firstly, we must pass the `FAKE` mark to TaskInstance when creating TaskInstance from TaskNode.
After that, there are multiple candidate places to intercept and process the marked task in TaskInstance/TaskExecutionContext:
### Idea A : Skip the task (handled on MasterServer)
Result: MasterServer creates the TaskInstance in memory only, then the task is passed and the process goes on. No submit to db, no Command, no dispatch, no wait, no WorkerServer involved.
Between step 1 and step 2 in the above flow, the MasterServer creates a subclass of MasterBaseTaskExecThread for each TaskInstance by `MasterExecThread.submitTaskExec()` to process TaskInstance asynchronously.
So we can get the `FAKE` mark and create a SkippedTaskExecThread here to skip the task and update the task status directly.
### Idea B : Just do a fake-run (handled on WorkerServer)
Result: All regular things on the MasterServer run normally, the WorkerServer detects the FAKE mark and do a fake-run.
After the WorkerServer gets the FAKE mark from TaskExecutionContext, it doesn't invoke any real handler and returns success directly.
## Subgoal 3 : Design
Among the existing HTTP API, the `/save` and `/update` in `ProcessDefinitionController` accept a `String processDefinitionJson` field. ApiApplicationServer generates a `ProcessData` from it by `JSONUtils.parseObject()`, which has a `List<TaskNode> tasks` member.
That is: the runFlag is automatically parsed by the JSON library (fastjson), so no additional processing is required. The front-end should send a `FAKE` string here.
P.S. One possible improvement is to increase the validity check of this field.
---
目的:标记特定任务为“空跑(fake-run)”,使其不执行具体任务而直接视为运行成功,继续任务流。
子目标:
1. 标记空跑任务
2. 处理空跑任务
3. 提供后端接口
## 子目标 1 的设计
现有的设计中,`org.apache.dolphinscheduler.common.model.TaskNode` 存储了工作流中的一项任务的信息。其中,`String runFlag` 意为运行状态,目前有 `NORMAL` 和 `FORBIDDEN` 两种。追踪 runFlag 成员的引用可知,目前实际只有 `TaskNode.isForbidden()` 里用到了。
本方案添加名为 `FAKE` 的新类型。
## 子目标 2 的设计
现有的任务处理流程是:
1. MasterServer 从 TaskNode 构建 TaskInstance
2. MasterServer 通过 MasterBaseTaskExecThread 提交 TaskInstance 到数据库
3. MasterServer 分派 TaskInstance
3.1 从 TaskInstance 构建 TaskExecutionContext
3.2 从 TaskExecutionContext 构建 Command
3.3 MasterServer 通过 NettyExecutorManager 向 WorkerServer 发送 Command
3.4 MasterServer 等待并接收结果
4. WorkerServer 通过 NettyRemotingServer 接收 Command,其中 TaskExecuteProcessor 处理 `TASK_EXECUTE_REQUEST` 类型的 Command,并从 Command 中拿到 TaskExecutionContext
5. WorkerServer 由 TaskExecuteThread 处理 TaskExecutionContext
6. WorkerServer 通过 TaskCallbackService 向 MasterServer 返回结果
首先,在从 TaskNode 创建 TaskInstance 时将 FAKE 标记到 TaskInstance 上。
之后,有多个地方可以拦截并处理 TaskInstance/TaskExecutionContext 里的标记:
### 想法 A:在 MasterServer 端即跳过(skip)该任务
结果:只有在 MasterServer 端做处理,创建了 TaskInstance 后即可结束流程处理下一个 Task。TaskInstance 不写数据库,也没有 Command、不用等 Master-Worker 的交互,WorkerServer 不需任何处理。
在上述任务处理流程中的 1、2 之间,MasterServer 通过 `MasterExecThread.submitTaskExec()` 为每个 TaskInstance 创建了一个 MasterBaseTaskExecThread 子类来异步处理 TaskInstance。
因此,可以就在这里创建一个 SkippedTaskExecThread 直接更新任务状态。
### 想法 B:在 WorkerServer 端空跑(fake run)该任务
结果:所有 MasterServer 端的流程都正常进行,只是 WorkerServer 识别 FAKE 标记并做一次空跑。
WorkerServer 从 TaskExecutionContext 里拿到 SKIPPED 标记,不调用具体的处理器,直接返回成功。
## 子目标 3 的设计
现有的后端接口中,`ProcessDefinitionController` 的 `/save` `/update` 接口都接受一个 `String processDefinitionJson` 字段。该字段通过 `JSONUtils.parseObject()` 产生 `ProcessData`,它有一个 `List<TaskNode> tasks` 成员。
即:runFlag 字段是通过 JSON 库(fastjson)自动解析的,因此无需额外处理。只要前端传递 `FAKE` 字符串即可。
额外地,一个可能的改进是增加该字段的有效性检查。