You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@dolphinscheduler.apache.org by wu shaoj <ga...@apache.org> on 2020/07/03 02:10:48 UTC

Re: Discuss on fake-run feature 任务空跑的方案讨论

I think 'handled on WorkerServer' is better.

On 2020/7/3, 07:35, "Pu Hsu" <a....@live.com> wrote:

    Purpose: When we run a process, a task will be regarded as success directly if marked as "fake run".

    Sub-goals:
    1. Marking a task as "fake run"
    2. Handling the fake-run task
    3. Providing the HTTP API

    ## Subgoal 1 : Design

    In the existing design, `org.apache.dolphinscheduler.common.model.TaskNode` stores information about a task in the workflow. The member `String runFlag` in TaskNode is used for running state, and runFlag has two types of `NORMAL` and `FORBIDDEN`. Tracking the references of the runFlag shows that currently only `TaskNode.isForbidden()` used it.

    We're going to add a new `FAKE` type to runFlag.

    ## Subgoal 2 : Design

    The existing running flow of a process is:

    1. MasterServer builds TaskInstance from TaskNode
    2. MasterServer submits TaskInstance to database by MasterBaseTaskExecThread
    3. MasterServer dispatch TaskInstance
    3.1 Construct TaskExecutionContext from TaskInstance
    3.2 Construct Command from TaskExecutionContext
    3.3 Send Command to WorkerServer by NettyExecutorManager
    3.4 Wait for the result
    4. WorkerServer receives Command through NettyRemotingServer, and then processes Command of type `TASK_EXECUTE_REQUEST` and gets TaskExecutionContext from Command by TaskExecuteProcessor
    5. WorkerServer handles TaskExecutionContext by TaskExecuteThread
    6. WorkerServer returns the result to MasterServer by TaskCallbackService

    Firstly, we must pass the `FAKE` mark to TaskInstance when creating TaskInstance from TaskNode.
    After that, there are multiple candidate places to intercept and process the marked task in TaskInstance/TaskExecutionContext:

    ### Idea A : Skip the task (handled on MasterServer)

    Result: MasterServer creates the TaskInstance in memory only, then the task is passed and the process goes on. No submit to db, no Command, no dispatch, no wait, no WorkerServer involved.

    Between step 1 and step 2 in the above flow, the MasterServer creates a subclass of MasterBaseTaskExecThread for each TaskInstance by `MasterExecThread.submitTaskExec()` to process TaskInstance asynchronously.

    So we can get the `FAKE` mark and create a SkippedTaskExecThread here to skip the task and update the task status directly.

    ### Idea B : Just do a fake-run (handled on WorkerServer)

    Result: All regular things on the MasterServer run normally, the WorkerServer detects the FAKE mark and do a fake-run.

    After the WorkerServer gets the FAKE mark from TaskExecutionContext, it doesn't invoke any real handler and returns success directly.

    ## Subgoal 3 : Design

    Among the existing HTTP API, the `/save` and `/update` in `ProcessDefinitionController` accept a `String processDefinitionJson` field. ApiApplicationServer generates a `ProcessData` from it by `JSONUtils.parseObject()`, which has a `List<TaskNode> tasks` member.

    That is: the runFlag is automatically parsed by the JSON library (fastjson), so no additional processing is required. The front-end should send a `FAKE` string here.

    P.S. One possible improvement is to increase the validity check of this field.

    ---

    目的:标记特定任务为“空跑(fake-run)”,使其不执行具体任务而直接视为运行成功,继续任务流。

    子目标:
    1. 标记空跑任务
    2. 处理空跑任务
    3. 提供后端接口

    ## 子目标 1 的设计

    现有的设计中,`org.apache.dolphinscheduler.common.model.TaskNode` 存储了工作流中的一项任务的信息。其中,`String runFlag` 意为运行状态,目前有 `NORMAL` 和 `FORBIDDEN` 两种。追踪 runFlag 成员的引用可知,目前实际只有 `TaskNode.isForbidden()` 里用到了。

    本方案添加名为 `FAKE` 的新类型。

    ## 子目标 2 的设计

    现有的任务处理流程是:

    1. MasterServer 从 TaskNode 构建 TaskInstance
    2. MasterServer 通过 MasterBaseTaskExecThread 提交 TaskInstance 到数据库
    3. MasterServer 分派 TaskInstance
    3.1 从 TaskInstance 构建 TaskExecutionContext
    3.2 从 TaskExecutionContext 构建 Command
    3.3 MasterServer 通过 NettyExecutorManager 向 WorkerServer 发送 Command
    3.4 MasterServer 等待并接收结果
    4. WorkerServer 通过 NettyRemotingServer 接收 Command,其中 TaskExecuteProcessor 处理 `TASK_EXECUTE_REQUEST` 类型的 Command,并从 Command 中拿到 TaskExecutionContext
    5. WorkerServer 由 TaskExecuteThread 处理 TaskExecutionContext
    6. WorkerServer 通过 TaskCallbackService 向 MasterServer 返回结果

    首先,在从 TaskNode 创建 TaskInstance 时将 FAKE 标记到 TaskInstance 上。
    之后,有多个地方可以拦截并处理 TaskInstance/TaskExecutionContext 里的标记:

    ### 想法 A:在 MasterServer 端即跳过(skip)该任务

    结果:只有在 MasterServer 端做处理,创建了 TaskInstance 后即可结束流程处理下一个 Task。TaskInstance 不写数据库,也没有 Command、不用等 Master-Worker 的交互,WorkerServer 不需任何处理。

    在上述任务处理流程中的 1、2 之间,MasterServer 通过 `MasterExecThread.submitTaskExec()` 为每个 TaskInstance 创建了一个 MasterBaseTaskExecThread 子类来异步处理 TaskInstance。

    因此,可以就在这里创建一个 SkippedTaskExecThread 直接更新任务状态。

    ### 想法 B:在 WorkerServer 端空跑(fake run)该任务

    结果:所有 MasterServer 端的流程都正常进行,只是 WorkerServer 识别 FAKE 标记并做一次空跑。

    WorkerServer 从 TaskExecutionContext 里拿到 SKIPPED 标记,不调用具体的处理器,直接返回成功。

    ## 子目标 3 的设计

    现有的后端接口中,`ProcessDefinitionController` 的 `/save` `/update` 接口都接受一个 `String processDefinitionJson` 字段。该字段通过 `JSONUtils.parseObject()` 产生 `ProcessData`,它有一个 `List<TaskNode> tasks` 成员。

    即:runFlag 字段是通过 JSON 库(fastjson)自动解析的,因此无需额外处理。只要前端传递 `FAKE` 字符串即可。

    额外地,一个可能的改进是增加该字段的有效性检查。


Re: Discuss on fake-run feature ���������������������������

Posted by Pu Hsu <a....@live.com>.
So do I. And this is more in line with the semantic of "fake-run". IMO, fake-run is more like a test method.

Look back to the design "Handled on WorkerServer", is there any drawback to discuss or redesign?

On 2020/07/03 02:10:48, wu shaoj <ga...@apache.org> wrote: 
> I think 'handled on WorkerServer' is better.


Re: Discuss on fake-run feature ���������������������������

Posted by Pu Hsu <a....@live.com>.
The "active/inactive" is most likely to the "skip" in this thread. But "fake run" is required if you want to ensure all MasterServer things to work well.

On 2020/07/03 02:35:39, wu shaoj <ga...@apache.org> wrote: 
> It's better to change 'fake run' to disable or inactive task, I thinks .
> One use case is that a task might can't run before some serious problem resolved, we will disable or inactive it to let the DAG run smoothly.
> Once the problems resolved, the task can be enabled or active and run normally


Re: Discuss on fake-run feature 任务空跑的方案讨论

Posted by wu shaoj <ga...@apache.org>.
It's better to change 'fake run' to disable or inactive task, I thinks .
One use case is that a task might can't run before some serious problem resolved, we will disable or inactive it to let the DAG run smoothly.
Once the problems resolved, the task can be enabled or active and run normally


On 2020/7/3, 10:10, "wu shaoj" <ga...@apache.org> wrote:

    I think 'handled on WorkerServer' is better.

    On 2020/7/3, 07:35, "Pu Hsu" <a....@live.com> wrote:

        Purpose: When we run a process, a task will be regarded as success directly if marked as "fake run".

        Sub-goals:
        1. Marking a task as "fake run"
        2. Handling the fake-run task
        3. Providing the HTTP API

        ## Subgoal 1 : Design

        In the existing design, `org.apache.dolphinscheduler.common.model.TaskNode` stores information about a task in the workflow. The member `String runFlag` in TaskNode is used for running state, and runFlag has two types of `NORMAL` and `FORBIDDEN`. Tracking the references of the runFlag shows that currently only `TaskNode.isForbidden()` used it.

        We're going to add a new `FAKE` type to runFlag.

        ## Subgoal 2 : Design

        The existing running flow of a process is:

        1. MasterServer builds TaskInstance from TaskNode
        2. MasterServer submits TaskInstance to database by MasterBaseTaskExecThread
        3. MasterServer dispatch TaskInstance
        3.1 Construct TaskExecutionContext from TaskInstance
        3.2 Construct Command from TaskExecutionContext
        3.3 Send Command to WorkerServer by NettyExecutorManager
        3.4 Wait for the result
        4. WorkerServer receives Command through NettyRemotingServer, and then processes Command of type `TASK_EXECUTE_REQUEST` and gets TaskExecutionContext from Command by TaskExecuteProcessor
        5. WorkerServer handles TaskExecutionContext by TaskExecuteThread
        6. WorkerServer returns the result to MasterServer by TaskCallbackService

        Firstly, we must pass the `FAKE` mark to TaskInstance when creating TaskInstance from TaskNode.
        After that, there are multiple candidate places to intercept and process the marked task in TaskInstance/TaskExecutionContext:

        ### Idea A : Skip the task (handled on MasterServer)

        Result: MasterServer creates the TaskInstance in memory only, then the task is passed and the process goes on. No submit to db, no Command, no dispatch, no wait, no WorkerServer involved.

        Between step 1 and step 2 in the above flow, the MasterServer creates a subclass of MasterBaseTaskExecThread for each TaskInstance by `MasterExecThread.submitTaskExec()` to process TaskInstance asynchronously.

        So we can get the `FAKE` mark and create a SkippedTaskExecThread here to skip the task and update the task status directly.

        ### Idea B : Just do a fake-run (handled on WorkerServer)

        Result: All regular things on the MasterServer run normally, the WorkerServer detects the FAKE mark and do a fake-run.

        After the WorkerServer gets the FAKE mark from TaskExecutionContext, it doesn't invoke any real handler and returns success directly.

        ## Subgoal 3 : Design

        Among the existing HTTP API, the `/save` and `/update` in `ProcessDefinitionController` accept a `String processDefinitionJson` field. ApiApplicationServer generates a `ProcessData` from it by `JSONUtils.parseObject()`, which has a `List<TaskNode> tasks` member.

        That is: the runFlag is automatically parsed by the JSON library (fastjson), so no additional processing is required. The front-end should send a `FAKE` string here.

        P.S. One possible improvement is to increase the validity check of this field.

        ---

        目的:标记特定任务为“空跑(fake-run)”,使其不执行具体任务而直接视为运行成功,继续任务流。

        子目标:
        1. 标记空跑任务
        2. 处理空跑任务
        3. 提供后端接口

        ## 子目标 1 的设计

        现有的设计中,`org.apache.dolphinscheduler.common.model.TaskNode` 存储了工作流中的一项任务的信息。其中,`String runFlag` 意为运行状态,目前有 `NORMAL` 和 `FORBIDDEN` 两种。追踪 runFlag 成员的引用可知,目前实际只有 `TaskNode.isForbidden()` 里用到了。

        本方案添加名为 `FAKE` 的新类型。

        ## 子目标 2 的设计

        现有的任务处理流程是:

        1. MasterServer 从 TaskNode 构建 TaskInstance
        2. MasterServer 通过 MasterBaseTaskExecThread 提交 TaskInstance 到数据库
        3. MasterServer 分派 TaskInstance
        3.1 从 TaskInstance 构建 TaskExecutionContext
        3.2 从 TaskExecutionContext 构建 Command
        3.3 MasterServer 通过 NettyExecutorManager 向 WorkerServer 发送 Command
        3.4 MasterServer 等待并接收结果
        4. WorkerServer 通过 NettyRemotingServer 接收 Command,其中 TaskExecuteProcessor 处理 `TASK_EXECUTE_REQUEST` 类型的 Command,并从 Command 中拿到 TaskExecutionContext
        5. WorkerServer 由 TaskExecuteThread 处理 TaskExecutionContext
        6. WorkerServer 通过 TaskCallbackService 向 MasterServer 返回结果

        首先,在从 TaskNode 创建 TaskInstance 时将 FAKE 标记到 TaskInstance 上。
        之后,有多个地方可以拦截并处理 TaskInstance/TaskExecutionContext 里的标记:

        ### 想法 A:在 MasterServer 端即跳过(skip)该任务

        结果:只有在 MasterServer 端做处理,创建了 TaskInstance 后即可结束流程处理下一个 Task。TaskInstance 不写数据库,也没有 Command、不用等 Master-Worker 的交互,WorkerServer 不需任何处理。

        在上述任务处理流程中的 1、2 之间,MasterServer 通过 `MasterExecThread.submitTaskExec()` 为每个 TaskInstance 创建了一个 MasterBaseTaskExecThread 子类来异步处理 TaskInstance。

        因此,可以就在这里创建一个 SkippedTaskExecThread 直接更新任务状态。

        ### 想法 B:在 WorkerServer 端空跑(fake run)该任务

        结果:所有 MasterServer 端的流程都正常进行,只是 WorkerServer 识别 FAKE 标记并做一次空跑。

        WorkerServer 从 TaskExecutionContext 里拿到 SKIPPED 标记,不调用具体的处理器,直接返回成功。

        ## 子目标 3 的设计

        现有的后端接口中,`ProcessDefinitionController` 的 `/save` `/update` 接口都接受一个 `String processDefinitionJson` 字段。该字段通过 `JSONUtils.parseObject()` 产生 `ProcessData`,它有一个 `List<TaskNode> tasks` 成员。

        即:runFlag 字段是通过 JSON 库(fastjson)自动解析的,因此无需额外处理。只要前端传递 `FAKE` 字符串即可。

        额外地,一个可能的改进是增加该字段的有效性检查。