Posted to dev@dolphinscheduler.apache.org by Jave-Chen <ke...@foxmail.com> on 2020/07/05 08:51:43 UTC

Re: Discuss on fake-run feature (discussion of the task fake-run design)

I think Idea B is more suitable for the "fake run task".


And what will happen if I set workflow control nodes to "fake run"? Workflow control nodes include the dependency node, the condition node and the sub-process node.


------------------ Original Message ------------------
From: "wu shaoj" <gabrywu@apache.org>;
Sent: Friday, July 3, 2020, 10:10 AM
To: "dev@dolphinscheduler.apache.org" <dev@dolphinscheduler.apache.org>;

Subject: Re: Discuss on fake-run feature (discussion of the task fake-run design)



I think 'handled on WorkerServer' is better.

On 2020/7/3, 07:35, "Pu Hsu" <a.xp@live.com> wrote:

Purpose: when a process runs, any task marked as "fake run" is not actually executed; it is treated as successful directly and the workflow continues.

Sub-goals:
1. Marking a task as "fake run"
2. Handling the fake-run task
3. Providing the HTTP API

## Subgoal 1: Design

In the existing design, `org.apache.dolphinscheduler.common.model.TaskNode` stores information about a task in the workflow. Its `String runFlag` member holds the run state and currently takes one of two values, `NORMAL` or `FORBIDDEN`. Tracing references to runFlag shows that only `TaskNode.isForbidden()` currently uses it.

We propose adding a new value, `FAKE`, to runFlag.
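As a minimal sketch, the new value could sit alongside the existing two as shown below. This is a simplified stand-in for `TaskNode`, not its real source: only `runFlag` is modelled, and the `FLOWNODE_RUN_FLAG_*` constant names and the `isFake()` helper are hypothetical, written in the same style as the `isForbidden()` check.

```java
// Simplified stand-in for org.apache.dolphinscheduler.common.model.TaskNode.
// Only runFlag is modelled; the constant names and isFake() are hypothetical.
class TaskNode {
    static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
    static final String FLOWNODE_RUN_FLAG_FORBIDDEN = "FORBIDDEN";
    // New value proposed by this design.
    static final String FLOWNODE_RUN_FLAG_FAKE = "FAKE";

    private String runFlag;

    public String getRunFlag() {
        return runFlag;
    }

    public void setRunFlag(String runFlag) {
        this.runFlag = runFlag;
    }

    // True only when runFlag is exactly "FORBIDDEN"; null-safe.
    public boolean isForbidden() {
        return FLOWNODE_RUN_FLAG_FORBIDDEN.equals(runFlag);
    }

    // Hypothetical companion check for the proposed "FAKE" value; null-safe.
    public boolean isFake() {
        return FLOWNODE_RUN_FLAG_FAKE.equals(runFlag);
    }
}
```

Comparing with `CONSTANT.equals(runFlag)` keeps both checks null-safe, so a task definition without a runFlag is neither forbidden nor fake.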

## Subgoal 2: Design

The existing execution flow of a process is:

1. MasterServer builds a TaskInstance from the TaskNode
2. MasterServer submits the TaskInstance to the database via MasterBaseTaskExecThread
3. MasterServer dispatches the TaskInstance
3.1 Construct a TaskExecutionContext from the TaskInstance
3.2 Construct a Command from the TaskExecutionContext
3.3 Send the Command to a WorkerServer via NettyExecutorManager
3.4 Wait for the result
4. WorkerServer receives the Command through NettyRemotingServer; TaskExecuteProcessor handles Commands of type `TASK_EXECUTE_REQUEST` and extracts the TaskExecutionContext from them
5. WorkerServer handles the TaskExecutionContext in TaskExecuteThread
6. WorkerServer returns the result to MasterServer via TaskCallbackService

First, the `FAKE` mark must be carried over to the TaskInstance when it is created from the TaskNode.
After that, there are several candidate places to intercept and handle the marked task in TaskInstance/TaskExecutionContext:
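A sketch of step 1 carrying the mark over, under the assumption that TaskInstance gets a boolean field for it. Both classes are simplified stand-ins; `fakeRun`, `isFakeRun()` and the `fromTaskNode()` factory are hypothetical names, not existing DolphinScheduler API.

```java
// Simplified stand-in for TaskNode; only the fields this sketch needs.
class TaskNode {
    private final String name;
    private final String runFlag; // "NORMAL", "FORBIDDEN" or the proposed "FAKE"

    TaskNode(String name, String runFlag) {
        this.name = name;
        this.runFlag = runFlag;
    }

    String getName() {
        return name;
    }

    boolean isFake() {
        return "FAKE".equals(runFlag);
    }
}

// Simplified stand-in for TaskInstance; the real class has many more fields.
class TaskInstance {
    private String name;
    // Hypothetical field carrying the FAKE mark from the definition to the instance.
    private boolean fakeRun;

    String getName() {
        return name;
    }

    boolean isFakeRun() {
        return fakeRun;
    }

    // Step 1 of the flow: build the instance from the node and copy the mark.
    static TaskInstance fromTaskNode(TaskNode node) {
        TaskInstance instance = new TaskInstance();
        instance.name = node.getName();
        instance.fakeRun = node.isFake();
        return instance;
    }
}
```

Under Idea B, the same flag would also need to be copied into the TaskExecutionContext in step 3.1, since that is where the WorkerServer reads it.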

### Idea A: Skip the task (handled on MasterServer)

Result: the MasterServer creates the TaskInstance in memory only; the task is then treated as passed and the process continues. Nothing is submitted to the database, no Command is built or dispatched, nothing waits for a result, and no WorkerServer is involved.

Between steps 1 and 2 of the flow above, the MasterServer creates a subclass of MasterBaseTaskExecThread for each TaskInstance via `MasterExecThread.submitTaskExec()` to process the TaskInstance asynchronously.

So this is where we can check the `FAKE` mark and create a SkippedTaskExecThread that skips the task and updates its status directly.
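A sketch of Idea A, assuming the MasterServer picks an exec thread class per TaskInstance as described above. Every class here is a simplified stand-in: `BaseTaskExecThread` plays the role of MasterBaseTaskExecThread, `TaskSubmitter.submitTaskExec()` plays the role of `MasterExecThread.submitTaskExec()`, and `NormalTaskExecThread`, the `ExecutionStatus` values and the `fakeRun` field are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified status values for this sketch.
enum ExecutionStatus { SUBMITTED_SUCCESS, RUNNING_EXECUTION, SUCCESS }

// Simplified stand-in for TaskInstance; fakeRun is a hypothetical flag copied from TaskNode.
class TaskInstance {
    final String name;
    final boolean fakeRun;
    volatile ExecutionStatus state = ExecutionStatus.SUBMITTED_SUCCESS;

    TaskInstance(String name, boolean fakeRun) {
        this.name = name;
        this.fakeRun = fakeRun;
    }
}

// Plays the role of MasterBaseTaskExecThread: one Callable per TaskInstance.
abstract class BaseTaskExecThread implements Callable<Boolean> {
    protected final TaskInstance taskInstance;

    BaseTaskExecThread(TaskInstance taskInstance) {
        this.taskInstance = taskInstance;
    }

    // Redeclared without checked exceptions to keep callers simple.
    @Override
    public abstract Boolean call();
}

// Placeholder for the regular path: real code would save the instance to the
// database, dispatch a Command to a WorkerServer and wait for the result.
class NormalTaskExecThread extends BaseTaskExecThread {
    NormalTaskExecThread(TaskInstance taskInstance) {
        super(taskInstance);
    }

    @Override
    public Boolean call() {
        taskInstance.state = ExecutionStatus.RUNNING_EXECUTION;
        return true;
    }
}

// Idea A: no database write, no Command, no dispatch; mark the instance successful.
class SkippedTaskExecThread extends BaseTaskExecThread {
    SkippedTaskExecThread(TaskInstance taskInstance) {
        super(taskInstance);
    }

    @Override
    public Boolean call() {
        taskInstance.state = ExecutionStatus.SUCCESS;
        return true;
    }
}

// Plays the role of MasterExecThread.submitTaskExec(): choose an exec thread per instance.
class TaskSubmitter {
    private final ExecutorService taskExecService = Executors.newFixedThreadPool(2);

    BaseTaskExecThread createExecThread(TaskInstance taskInstance) {
        return taskInstance.fakeRun
                ? new SkippedTaskExecThread(taskInstance)
                : new NormalTaskExecThread(taskInstance);
    }

    Future<Boolean> submitTaskExec(TaskInstance taskInstance) {
        return taskExecService.submit(createExecThread(taskInstance));
    }

    void shutdown() {
        taskExecService.shutdown();
    }
}
```

In this sketch the check runs before any type-specific handling, so any node marked FAKE would be skipped regardless of its type.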

### Idea B: Just do a fake run (handled on WorkerServer)

Result: everything on the MasterServer runs as usual; the WorkerServer detects the FAKE mark and does a fake run.

When the WorkerServer finds the FAKE mark in the TaskExecutionContext, it does not invoke any real handler and returns success directly.
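A sketch of Idea B on the WorkerServer side. `TaskExecuteThread` and `TaskExecutionContext` here are simplified stand-ins rather than the real classes: the real task handler is replaced by a `Supplier`, TaskCallbackService by a `Consumer`, and the `fakeRun` field is hypothetical.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified status values for this sketch.
enum ExecutionStatus { SUCCESS, FAILURE }

// Simplified stand-in; fakeRun is a hypothetical field filled from TaskInstance in step 3.1.
class TaskExecutionContext {
    final int taskInstanceId;
    final boolean fakeRun;

    TaskExecutionContext(int taskInstanceId, boolean fakeRun) {
        this.taskInstanceId = taskInstanceId;
        this.fakeRun = fakeRun;
    }
}

// Simplified stand-in for the worker's TaskExecuteThread.
class TaskExecuteThread implements Runnable {
    private final TaskExecutionContext context;
    private final Supplier<ExecutionStatus> handler;  // stands in for the real task handler
    private final Consumer<ExecutionStatus> callback; // stands in for TaskCallbackService

    TaskExecuteThread(TaskExecutionContext context,
                      Supplier<ExecutionStatus> handler,
                      Consumer<ExecutionStatus> callback) {
        this.context = context;
        this.handler = handler;
        this.callback = callback;
    }

    @Override
    public void run() {
        ExecutionStatus status;
        if (context.fakeRun) {
            // Idea B: the real handler is never invoked; report success directly.
            status = ExecutionStatus.SUCCESS;
        } else {
            status = handler.get();
        }
        // Step 6 is unchanged: the result always goes back to the MasterServer.
        callback.accept(status);
    }
}
```

Because the result still flows back through the callback, the MasterServer side only has to pass the mark along.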

## Subgoal 3: Design

Among the existing HTTP APIs, the `/save` and `/update` endpoints in `ProcessDefinitionController` each accept a `String processDefinitionJson` field. ApiApplicationServer turns it into a `ProcessData` via `JSONUtils.parseObject()`; ProcessData has a `List<TaskNode> tasks` member.

In other words, runFlag is parsed automatically by the JSON library (fastjson), so no additional processing is required. The front end only needs to send the string `FAKE` in this field.

P.S. One possible improvement is to add a validity check for this field.
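The validity check mentioned in the P.S. could be as simple as an allow-list. `RunFlagValidator` is a hypothetical helper, not existing DolphinScheduler code, and treating a missing runFlag as valid (equivalent to `NORMAL`) is an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for validating TaskNode.runFlag values parsed from processDefinitionJson.
final class RunFlagValidator {
    // The two existing values plus the proposed FAKE value; matching is case-sensitive.
    private static final Set<String> ALLOWED =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("NORMAL", "FORBIDDEN", "FAKE")));

    private RunFlagValidator() {
    }

    // Assumption: a missing or empty runFlag is accepted and treated like NORMAL.
    static boolean isValid(String runFlag) {
        return runFlag == null || runFlag.isEmpty() || ALLOWED.contains(runFlag);
    }

    // Returns every runFlag value in the list that is not recognised, in input order.
    static List<String> invalidFlags(List<String> runFlags) {
        return runFlags.stream()
                .filter(flag -> !isValid(flag))
                .collect(Collectors.toList());
    }
}
```

A save or update request whose `invalidFlags` result is non-empty could then be rejected before the definition is stored.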

---

Purpose: mark specific tasks as "fake run" so that they are not actually executed but are treated as successful directly, and the workflow continues.

Sub-goals:
1. Marking fake-run tasks
2. Handling fake-run tasks
3. Providing the back-end API

## Design for Subgoal 1

In the existing design, `org.apache.dolphinscheduler.common.model.TaskNode` stores the information of one task in a workflow. Its `String runFlag` member means the run state and currently has two values, `NORMAL` and `FORBIDDEN`. Tracing references to the runFlag member shows that, in practice, only `TaskNode.isForbidden()` uses it.

This proposal adds a new value named `FAKE`.

## Design for Subgoal 2

The existing task processing flow is:

1. MasterServer builds a TaskInstance from the TaskNode
2. MasterServer submits the TaskInstance to the database through MasterBaseTaskExecThread
3. MasterServer dispatches the TaskInstance
3.1 Build a TaskExecutionContext from the TaskInstance
3.2 Build a Command from the TaskExecutionContext
3.3 MasterServer sends the Command to a WorkerServer through NettyExecutorManager
3.4 MasterServer waits for and receives the result
4. WorkerServer receives the Command through NettyRemotingServer, where TaskExecuteProcessor handles Commands of type `TASK_EXECUTE_REQUEST` and obtains the TaskExecutionContext from the Command
5. WorkerServer processes the TaskExecutionContext with TaskExecuteThread
6. WorkerServer returns the result to MasterServer through TaskCallbackService

First, when the TaskInstance is created from the TaskNode, the FAKE mark is put on the TaskInstance.
After that, there are several places where the mark in TaskInstance/TaskExecutionContext can be intercepted and handled:

### Idea A: skip the task on the MasterServer

Result: processing happens only on the MasterServer; once the TaskInstance is created, the flow can finish and move on to the next task. The TaskInstance is not written to the database, there is no Command, there is no waiting on Master-Worker interaction, and the WorkerServer does not need to do anything.

Between steps 1 and 2 of the flow above, the MasterServer creates a subclass of MasterBaseTaskExecThread for each TaskInstance through `MasterExecThread.submitTaskExec()` to process the TaskInstance asynchronously.

Therefore, a SkippedTaskExecThread can be created right here to update the task status directly.

### Idea B: fake-run the task on the WorkerServer

Result: all processing on the MasterServer side proceeds normally; the WorkerServer simply recognizes the FAKE mark and performs a fake run.

The WorkerServer gets the FAKE mark from the TaskExecutionContext, does not invoke the concrete handler, and returns success directly.

## Design for Subgoal 3

Among the existing back-end APIs, the `/save` and `/update` endpoints of `ProcessDefinitionController` both accept a `String processDefinitionJson` field. This field is turned into a `ProcessData` via `JSONUtils.parseObject()`, which has a `List<TaskNode> tasks` member.

That is, the runFlag field is parsed automatically by the JSON library (fastjson), so no extra handling is needed. The front end only needs to send the `FAKE` string.

Additionally, one possible improvement is to add a validity check for this field.