You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/22 03:46:43 UTC

[dolphinscheduler] branch dev updated: [python] Add task switch (#7531)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 946a0c7  [python] Add task switch (#7531)
946a0c7 is described below

commit 946a0c7c5768506e7ca92a21e7aed6ad5aa60871
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Dec 22 11:46:34 2021 +0800

    [python] Add task switch (#7531)
    
    * [python] Add task switch
    
    close: #6928
    
    * Fix code style
---
 .../examples/task_switch_example.py                |  51 ++++
 .../src/pydolphinscheduler/constants.py            |   1 +
 .../src/pydolphinscheduler/tasks/switch.py         | 158 +++++++++++
 .../pydolphinscheduler/tests/tasks/test_switch.py  | 300 +++++++++++++++++++++
 4 files changed, 510 insertions(+)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
new file mode 100644
index 0000000..418d569
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+An example workflow for task switch.
+
+This example creates four tasks in a single workflow: three shell tasks and one switch task. The switch task
+has one upstream, declared explicitly with the syntax `parent >> switch`, and two downstream tasks whose
+dependence is set automatically by the switch task from its parameter `condition`. The workflow graph is:
+                      --> switch_child_1
+                    /
+parent -> switch ->
+                    \
+                      --> switch_child_2
+.
+"""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
+
+# Task `switch` sets its own downstream from `condition`, only its upstream is declared by hand.
+with ProcessDefinition(
+    name="task_switch_example",
+    tenant="tenant_exists",
+) as pd:
+    parent = Shell(name="parent", command="echo parent")
+    switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
+    switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
+    switch_condition = SwitchCondition(
+        Branch(condition="${var} > 1", task=switch_child_1),
+        Default(task=switch_child_2),
+    )
+
+    switch = Switch(name="switch", condition=switch_condition)
+    parent >> switch
+    pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 65ab6ca..940c749 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -76,6 +76,7 @@ class TaskType(str):
     DATAX = "DATAX"
     DEPENDENT = "DEPENDENT"
     CONDITIONS = "CONDITIONS"
+    SWITCH = "SWITCH"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
new file mode 100644
index 0000000..28032f8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Switch."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
+class SwitchBranch(Base):
+    """Single branch of a switch task: a downstream task plus an optional condition expression.
+
+    It is the parent class of :class:`Branch` and :class:`Default`.
+    """
+
+    # Shared by every branch instance and subclass, so it must never be mutated in place.
+    _DEFINE_ATTR = {"next_node"}
+
+    def __init__(self, task: Task, exp: Optional[str] = None):
+        super().__init__(f"Switch.{self.__class__.__name__.upper()}")
+        self.task = task
+        self.exp = exp
+
+    @property
+    def next_node(self) -> str:
+        """Return the code of the task this branch points to."""
+        return self.task.code
+
+    @property
+    def condition(self) -> Optional[str]:
+        """Return the condition expression, or ``None`` when the branch has none."""
+        return self.exp
+
+    def get_define(self, camel_attr: bool = True) -> Dict:
+        """Return the branch definition sent to the Java gateway, adding `condition` only when set."""
+        if self.condition:
+            # Rebind on the instance; ``.add`` would leak `condition` into every other branch.
+            self._DEFINE_ATTR = self._DEFINE_ATTR | {"condition"}
+        return super().get_define(camel_attr)
+
+
+class Branch(SwitchBranch):
+    """Conditional branch of a switch task.
+
+    Pairs a condition expression with the task the switch should continue with when that expression
+    matches. When no :class:`Branch` matches, the task of :class:`Default` is used instead.
+    """
+
+    def __init__(self, condition: str, task: Task):
+        super().__init__(task=task, exp=condition)
+
+
+class Default(SwitchBranch):
+    """Fallback branch of a switch task.
+
+    When no condition of any :class:`Branch` matches, the switch task runs the task given here,
+    which is also set as the switch downstream. A switch condition accepts at most one
+    :class:`Default`.
+    """
+
+    def __init__(self, task: Task):
+        super().__init__(task=task)
+
+
+class SwitchCondition(Base):
+    """Container of the :class:`SwitchBranch` objects making up the condition of a switch task."""
+
+    _DEFINE_ATTR = {
+        "depend_task_list",
+        "next_node",
+    }
+
+    def __init__(self, *args):
+        super().__init__(self.__class__.__name__)
+        self.args = args
+
+    def set_define_attr(self) -> None:
+        """Compute attributes `depend_task_list` and `next_node` read by :func:`get_define`.
+
+        Every :class:`Branch` becomes one entry of `depend_task_list`, the single optional
+        :class:`Default` provides `next_node`, which falls back to `""` when no default is given.
+
+        :raises PyDSParamException: if an argument is not a :class:`SwitchBranch`, or a second
+            :class:`Default` is given.
+        """
+        depend_task_list = []
+        # Default value is `""` if no default branch is provided.
+        next_node = ""
+        has_default = False
+        for condition in self.args:
+            if not isinstance(condition, SwitchBranch):
+                raise PyDSParamException(
+                    f"Task Switch's parameter only support SwitchBranch but got {type(condition)}."
+                )
+            if isinstance(condition, Default):
+                if has_default:
+                    raise PyDSParamException(
+                        "Task Switch's parameter only support exactly one default branch."
+                    )
+                has_default = True
+                next_node = condition.next_node
+            elif isinstance(condition, Branch):
+                depend_task_list.append(condition.get_define())
+        self.next_node = next_node
+        self.depend_task_list = depend_task_list
+
+    def get_define(self, camel_attr=True) -> Dict:
+        """Refresh the computed attributes, then return the definition built by ``Base.get_define``."""
+        self.set_define_attr()
+        return super().get_define(camel_attr)
+
+
+class Switch(Task):
+    """Switch task whose downstream tasks are taken from the branches of `condition`."""
+
+    def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
+        super().__init__(name, TaskType.SWITCH, *args, **kwargs)
+        self.condition = condition
+        # Every branch task must be wired as downstream of this task right at creation time.
+        self._set_dep()
+
+    def _set_dep(self) -> None:
+        """Declare the task of every branch in `condition` as downstream of this task."""
+        branch_tasks = [
+            branch.task
+            for branch in self.condition.args
+            if isinstance(branch, SwitchBranch)
+        ]
+        self.set_downstream(branch_tasks)
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params to attach the switch specific attribute `switchResult`.
+
+        The definition of `condition` is merged into the parent's parameters here, rather than
+        through class attribute `_task_custom_attr`, to avoid attribute cover.
+        """
+        switch_params = super().task_params
+        switch_params["switchResult"] = self.condition.get_define()
+        return switch_params
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
new file mode 100644
index 0000000..1f6ff5b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
@@ -0,0 +1,300 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task switch."""
+
+from typing import Optional, Tuple
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.switch import (
+    Branch,
+    Default,
+    Switch,
+    SwitchBranch,
+    SwitchCondition,
+)
+from tests.testing.task import Task
+
+TEST_NAME = "test-task"
+TEST_TYPE = "test-type"
+
+
+def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) -> SwitchBranch:
+    """Instantiate `obj` with the constructor arguments its signature expects."""
+    if obj is Branch:
+        # Only `Branch` takes the condition expression first.
+        return obj(exp, task)
+    if obj is Default:
+        return obj(task)
+    return obj(task, exp)
+
+
+@pytest.mark.parametrize(
+    "obj",
+    [
+        SwitchBranch,
+        Branch,
+        Default,
+    ],
+)
+def test_switch_branch_attr_next_node(obj: SwitchBranch):
+    """Property `next_node` returns the code of the wrapped task for every branch class."""
+    downstream = Task(name=TEST_NAME, task_type=TEST_TYPE)
+    branch = task_switch_arg_wrapper(obj, exp="unittest", task=downstream)
+    assert branch.next_node == downstream.code
+
+
+@pytest.mark.parametrize(
+    "obj",
+    [
+        SwitchBranch,
+        Default,
+    ],
+)
+def test_switch_branch_get_define_without_condition(obj: SwitchBranch):
+    """Without a condition, :func:`get_define` of a switch branch only contains `nextNode`."""
+    downstream = Task(name=TEST_NAME, task_type=TEST_TYPE)
+    branch = task_switch_arg_wrapper(obj, task=downstream)
+    define = branch.get_define()
+    assert define == {"nextNode": downstream.code}
+
+
+@pytest.mark.parametrize(
+    "obj",
+    [
+        SwitchBranch,
+        Branch,
+    ],
+)
+def test_switch_branch_get_define_condition(obj: SwitchBranch):
+    """With a condition, :func:`get_define` of a switch branch contains `nextNode` and `condition`."""
+    expression = "${var} == 1"
+    downstream = Task(name=TEST_NAME, task_type=TEST_TYPE)
+    branch = task_switch_arg_wrapper(obj, task=downstream, exp=expression)
+    define = branch.get_define()
+    assert define == {
+        "nextNode": downstream.code,
+        "condition": expression,
+    }
+
+
+@pytest.mark.parametrize(
+    "args, msg",
+    [
+        (
+            (1,),
+            ".*?parameter only support SwitchBranch but got.*?",
+        ),
+        (
+            (Default(Task(TEST_NAME, TEST_TYPE)), 2),
+            ".*?parameter only support SwitchBranch but got.*?",
+        ),
+        (
+            (Default(Task(TEST_NAME, TEST_TYPE)), Default(Task(TEST_NAME, TEST_TYPE))),
+            ".*?parameter only support exactly one default branch",
+        ),
+        (
+            (
+                Branch(condition="unittest", task=Task(TEST_NAME, TEST_TYPE)),
+                Default(Task(TEST_NAME, TEST_TYPE)),
+                Default(Task(TEST_NAME, TEST_TYPE)),
+            ),
+            ".*?parameter only support exactly one default branch",
+        ),
+    ],
+)
+def test_switch_condition_set_define_attr_error(args: Tuple, msg: str):
+    """Invalid arguments make :func:`SwitchCondition.set_define_attr` raise with the expected message."""
+    switch_condition = SwitchCondition(*args)
+    with pytest.raises(PyDSParamException, match=msg):
+        switch_condition.set_define_attr()
+
+
+def test_switch_condition_set_define_attr_default():
+    """A lone :class:`Default` sets `next_node` and leaves `depend_task_list` empty."""
+    default_task = Task(TEST_NAME, TEST_TYPE)
+    condition = SwitchCondition(Default(default_task))
+    condition.set_define_attr()
+    assert condition.next_node == default_task.code
+    assert condition.depend_task_list == []
+
+
+def test_switch_condition_set_define_attr_branch():
+    """Only :class:`Branch` given: `next_node` is empty and every branch is listed in order."""
+    branch_task = Task(TEST_NAME, TEST_TYPE)
+    expressions = ("unittest1", "unittest2")
+    condition = SwitchCondition(
+        *(Branch(expression, branch_task) for expression in expressions)
+    )
+    expected_branches = [
+        {"condition": expression, "nextNode": branch_task.code}
+        for expression in expressions
+    ]
+    condition.set_define_attr()
+    assert condition.next_node == ""
+    assert condition.depend_task_list == expected_branches
+
+
+def test_switch_condition_set_define_attr_mix_branch_and_default():
+    """Test set both :class:`Branch` and :class:`Default` to attribute on :class:`SwitchCondition`."""
+    shared_task = Task(TEST_NAME, TEST_TYPE)
+    expressions = ("unittest1", "unittest2")
+    branches = [Branch(expression, shared_task) for expression in expressions]
+    condition = SwitchCondition(*branches, Default(shared_task))
+    expected_branches = [
+        {"condition": expression, "nextNode": shared_task.code}
+        for expression in expressions
+    ]
+
+    condition.set_define_attr()
+    assert condition.next_node == shared_task.code
+    assert condition.depend_task_list == expected_branches
+
+
+def test_switch_condition_get_define_default():
+    """With only a :class:`Default`, :func:`get_define` has an empty `dependTaskList`."""
+    default_task = Task(TEST_NAME, TEST_TYPE)
+    condition = SwitchCondition(Default(default_task))
+    define = condition.get_define()
+    assert define == {
+        "dependTaskList": [],
+        "nextNode": default_task.code,
+    }
+
+
+def test_switch_condition_get_define_branch():
+    """With only :class:`Branch`, :func:`get_define` lists them and leaves `nextNode` empty."""
+    branch_task = Task(TEST_NAME, TEST_TYPE)
+    first = Branch("unittest1", branch_task)
+    second = Branch("unittest2", branch_task)
+    condition = SwitchCondition(first, second)
+    define = condition.get_define()
+    assert define == {
+        "dependTaskList": [
+            {"condition": "unittest1", "nextNode": branch_task.code},
+            {"condition": "unittest2", "nextNode": branch_task.code},
+        ],
+        "nextNode": "",
+    }
+
+
+def test_switch_condition_get_define_mix_branch_and_default():
+    """Both kinds given: :func:`get_define` lists each :class:`Branch` and the default `nextNode`."""
+    shared_task = Task(TEST_NAME, TEST_TYPE)
+    first = Branch("unittest1", shared_task)
+    second = Branch("unittest2", shared_task)
+    condition = SwitchCondition(first, second, Default(shared_task))
+    define = condition.get_define()
+    assert define == {
+        "dependTaskList": [
+            {"condition": "unittest1", "nextNode": shared_task.code},
+            {"condition": "unittest2", "nextNode": shared_task.code},
+        ],
+        "nextNode": shared_task.code,
+    }
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_switch_get_define(mock_task_code_version):
+    """Check the complete definition returned by :func:`get_define` of a switch task."""
+    name = "test_switch_get_define"
+    child = Task(name=TEST_NAME, task_type=TEST_TYPE)
+    switch_condition = SwitchCondition(
+        Branch(condition="${var1} > 1", task=child),
+        Branch(condition="${var1} <= 1", task=child),
+        Default(child),
+    )
+    switch = Switch(name, condition=switch_condition)
+
+    expect = {
+        "code": 123,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SWITCH",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+            "switchResult": {
+                "dependTaskList": [
+                    {"condition": "${var1} > 1", "nextNode": child.code},
+                    {"condition": "${var1} <= 1", "nextNode": child.code},
+                ],
+                "nextNode": child.code,
+            },
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+
+    assert switch.get_define() == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_switch_set_dep_workflow(mock_task_code_version):
+    """Test the dependence a switch task sets on its branch tasks at workflow level."""
+    with ProcessDefinition(name="test-switch-set-dep-workflow") as pd:
+        parent = Task(name="parent", task_type=TEST_TYPE)
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name=TEST_NAME, condition=switch_condition)
+        parent >> switch
+
+        # All four tasks must be registered in the workflow
+        expected_tasks = [parent, switch, switch_child_1, switch_child_2]
+        assert len(pd.tasks) == 4
+        assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+            expected_tasks, key=lambda t: t.name
+        )
+        # Explicit dependence declared with `parent >> switch`
+        assert parent._downstream_task_codes == {switch.code}
+        assert switch._upstream_task_codes == {parent.code}
+
+        # Dependence declared by the switch task itself from its `condition`
+        assert switch._downstream_task_codes == {
+            switch_child_1.code,
+            switch_child_2.code,
+        }
+        assert all(
+            child._upstream_task_codes == {switch.code}
+            for child in (switch_child_1, switch_child_2)
+        )