You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2021/12/22 03:46:43 UTC
[dolphinscheduler] branch dev updated: [python] Add task switch (#7531)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 946a0c7 [python] Add task switch (#7531)
946a0c7 is described below
commit 946a0c7c5768506e7ca92a21e7aed6ad5aa60871
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Dec 22 11:46:34 2021 +0800
[python] Add task switch (#7531)
* [python] Add task switch
close: #6928
* Fix code style
---
.../examples/task_switch_example.py | 51 ++++
.../src/pydolphinscheduler/constants.py | 1 +
.../src/pydolphinscheduler/tasks/switch.py | 158 +++++++++++
.../pydolphinscheduler/tests/tasks/test_switch.py | 300 +++++++++++++++++++++
4 files changed, 510 insertions(+)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
new file mode 100644
index 0000000..418d569
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+An example workflow for task switch.
+
+This example will create four tasks in a single workflow, with three shell tasks and one switch task. Task switch
+has one upstream which we declare explicitly with syntax `parent >> switch`, and two downstreams whose dependence
+is set automatically by the switch task by passing parameter `condition`. The graph of this workflow looks like:
+ --> switch_child_1
+ /
+parent -> switch ->
+ \
+ --> switch_child_2
+.
+"""
+
+from tasks.switch import Branch, Default, Switch, SwitchCondition
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+
+with ProcessDefinition(
+ name="task_dependent_external",
+ tenant="tenant_exists",
+) as pd:
+ parent = Shell(name="parent", command="echo parent")
+ switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
+ switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
+ switch_condition = SwitchCondition(
+ Branch(condition="${var} > 1", task=switch_child_1),
+ Default(task=switch_child_2),
+ )
+
+ switch = Switch(name="switch", condition=switch_condition)
+ parent >> switch
+ pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 65ab6ca..940c749 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -76,6 +76,7 @@ class TaskType(str):
DATAX = "DATAX"
DEPENDENT = "DEPENDENT"
CONDITIONS = "CONDITIONS"
+ SWITCH = "SWITCH"
class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
new file mode 100644
index 0000000..28032f8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Switch."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.base import Base
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+
+
+class SwitchBranch(Base):
+ """Base class of ConditionBranch of task switch.
+
+ It is a parent class for :class:`Branch` and :class:`Default`.
+ """
+
+ _DEFINE_ATTR = {
+ "next_node",
+ }
+
+ def __init__(self, task: Task, exp: Optional[str] = None):
+ super().__init__(f"Switch.{self.__class__.__name__.upper()}")
+ self.task = task
+ self.exp = exp
+
+ @property
+ def next_node(self) -> str:
+ """Get task switch property next_node, which returns the code of this branch's task."""
+ return self.task.code
+
+ @property
+ def condition(self) -> Optional[str]:
+ """Get task switch property condition."""
+ return self.exp
+
+ def get_define(self, camel_attr: bool = True) -> Dict:
+ """Get :class:`SwitchBranch` definition attributes to communicate to the Java gateway server."""
+ if self.condition:
+ self._DEFINE_ATTR.add("condition")
+ return super().get_define()
+
+
+class Branch(SwitchBranch):
+ """Common condition branch for switch task.
+
+ If the condition of a :class:`Branch` matches, this :class:`Branch`'s task is set as the downstream of task
+ switch. If no condition branch matches, :class:`Default`'s task is set as the task switch downstream.
+ """
+
+ def __init__(self, condition: str, task: Task):
+ super().__init__(task, condition)
+
+
+class Default(SwitchBranch):
+ """Class default branch for switch task.
+
+ If none of the :class:`Branch` conditions match, task switch would run the task in :class:`Default`
+ and set :class:`Default`'s task as switch downstream. Please notice that each switch condition
+ could only have one single :class:`Default`.
+ """
+
+ def __init__(self, task: Task):
+ super().__init__(task)
+
+
+class SwitchCondition(Base):
+ """Set switch condition of given parameter."""
+
+ _DEFINE_ATTR = {
+ "depend_task_list",
+ }
+
+ def __init__(self, *args):
+ super().__init__(self.__class__.__name__)
+ self.args = args
+
+ def set_define_attr(self) -> None:
+ """Set attribute to function :func:`get_define`.
+
+ It collects the definition of each :class:`Branch` into `depend_task_list` and stores the :class:`Default` task code in `next_node`.
+ """
+ result = []
+ num_branch_default = 0
+ for condition in self.args:
+ if isinstance(condition, SwitchBranch):
+ if num_branch_default < 1:
+ if isinstance(condition, Default):
+ self._DEFINE_ATTR.add("next_node")
+ setattr(self, "next_node", condition.next_node)
+ num_branch_default += 1
+ elif isinstance(condition, Branch):
+ result.append(condition.get_define())
+ else:
+ raise PyDSParamException(
+ "Task Switch's parameter only support exactly one default branch."
+ )
+ else:
+ raise PyDSParamException(
+ "Task Switch's parameter only support SwitchBranch but got %s.",
+ type(condition),
+ )
+ # Handle switch default branch, default value is `""` if not provided.
+ if num_branch_default == 0:
+ self._DEFINE_ATTR.add("next_node")
+ setattr(self, "next_node", "")
+ setattr(self, "depend_task_list", result)
+
+ def get_define(self, camel_attr=True) -> Dict:
+ """Override Base.get_define to get the switch condition specific definition."""
+ self.set_define_attr()
+ return super().get_define()
+
+
+class Switch(Task):
+ """Task switch object, declare behavior for switch task to dolphinscheduler."""
+
+ def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
+ super().__init__(name, TaskType.SWITCH, *args, **kwargs)
+ self.condition = condition
+ # Set condition tasks as current task downstream
+ self._set_dep()
+
+ def _set_dep(self) -> None:
+ """Set downstream according to parameter `condition`."""
+ downstream = []
+ for condition in self.condition.args:
+ if isinstance(condition, SwitchBranch):
+ downstream.append(condition.task)
+ self.set_downstream(downstream)
+
+ @property
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+ """Override Task.task_params for switch task.
+
+ Switch task has a special attribute `switchResult`, while for most other tasks
+ this attribute is None and an empty dict `{}` is used as the default value. We do not use the class
+ attribute `_task_custom_attr`, in order to avoid attribute overriding.
+ """
+ params = super().task_params
+ params["switchResult"] = self.condition.get_define()
+ return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
new file mode 100644
index 0000000..1f6ff5b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_switch.py
@@ -0,0 +1,300 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task switch."""
+
+from typing import Optional, Tuple
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.switch import (
+ Branch,
+ Default,
+ Switch,
+ SwitchBranch,
+ SwitchCondition,
+)
+from tests.testing.task import Task
+
+TEST_NAME = "test-task"
+TEST_TYPE = "test-type"
+
+
+def task_switch_arg_wrapper(obj, task: Task, exp: Optional[str] = None) -> SwitchBranch:
+ """Wrap task switch and its subclass."""
+ if obj is Default:
+ return obj(task)
+ elif obj is Branch:
+ return obj(exp, task)
+ else:
+ return obj(task, exp)
+
+
+@pytest.mark.parametrize(
+ "obj",
+ [
+ SwitchBranch,
+ Branch,
+ Default,
+ ],
+)
+def test_switch_branch_attr_next_node(obj: SwitchBranch):
+ """Test get attribute from class switch branch."""
+ task = Task(name=TEST_NAME, task_type=TEST_TYPE)
+ switch_branch = task_switch_arg_wrapper(obj, task=task, exp="unittest")
+ assert switch_branch.next_node == task.code
+
+
+@pytest.mark.parametrize(
+ "obj",
+ [
+ SwitchBranch,
+ Default,
+ ],
+)
+def test_switch_branch_get_define_without_condition(obj: SwitchBranch):
+ """Test function :func:`get_define` with None value of attribute condition from class switch branch."""
+ task = Task(name=TEST_NAME, task_type=TEST_TYPE)
+ expect = {"nextNode": task.code}
+ switch_branch = task_switch_arg_wrapper(obj, task=task)
+ assert switch_branch.get_define() == expect
+
+
+@pytest.mark.parametrize(
+ "obj",
+ [
+ SwitchBranch,
+ Branch,
+ ],
+)
+def test_switch_branch_get_define_condition(obj: SwitchBranch):
+ """Test function :func:`get_define` with specific attribute condition from class switch branch."""
+ task = Task(name=TEST_NAME, task_type=TEST_TYPE)
+ exp = "${var} == 1"
+ expect = {
+ "nextNode": task.code,
+ "condition": exp,
+ }
+ switch_branch = task_switch_arg_wrapper(obj, task=task, exp=exp)
+ assert switch_branch.get_define() == expect
+
+
+@pytest.mark.parametrize(
+ "args, msg",
+ [
+ (
+ (1,),
+ ".*?parameter only support SwitchBranch but got.*?",
+ ),
+ (
+ (Default(Task(TEST_NAME, TEST_TYPE)), 2),
+ ".*?parameter only support SwitchBranch but got.*?",
+ ),
+ (
+ (Default(Task(TEST_NAME, TEST_TYPE)), Default(Task(TEST_NAME, TEST_TYPE))),
+ ".*?parameter only support exactly one default branch",
+ ),
+ (
+ (
+ Branch(condition="unittest", task=Task(TEST_NAME, TEST_TYPE)),
+ Default(Task(TEST_NAME, TEST_TYPE)),
+ Default(Task(TEST_NAME, TEST_TYPE)),
+ ),
+ ".*?parameter only support exactly one default branch",
+ ),
+ ],
+)
+def test_switch_condition_set_define_attr_error(args: Tuple, msg: str):
+ """Test error case on :class:`SwitchCondition`."""
+ switch_condition = SwitchCondition(*args)
+ with pytest.raises(PyDSParamException, match=msg):
+ switch_condition.set_define_attr()
+
+
+def test_switch_condition_set_define_attr_default():
+ """Test set :class:`Default` to attribute on :class:`SwitchCondition`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(Default(task))
+ switch_condition.set_define_attr()
+ assert getattr(switch_condition, "next_node") == task.code
+ assert getattr(switch_condition, "depend_task_list") == []
+
+
+def test_switch_condition_set_define_attr_branch():
+ """Test set :class:`Branch` to attribute on :class:`SwitchCondition`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch("unittest1", task), Branch("unittest2", task)
+ )
+ expect = [
+ {"condition": "unittest1", "nextNode": task.code},
+ {"condition": "unittest2", "nextNode": task.code},
+ ]
+
+ switch_condition.set_define_attr()
+ assert getattr(switch_condition, "next_node") == ""
+ assert getattr(switch_condition, "depend_task_list") == expect
+
+
+def test_switch_condition_set_define_attr_mix_branch_and_default():
+ """Test set both :class:`Branch` and :class:`Default` to attribute on :class:`SwitchCondition`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch("unittest1", task), Branch("unittest2", task), Default(task)
+ )
+ expect = [
+ {"condition": "unittest1", "nextNode": task.code},
+ {"condition": "unittest2", "nextNode": task.code},
+ ]
+
+ switch_condition.set_define_attr()
+ assert getattr(switch_condition, "next_node") == task.code
+ assert getattr(switch_condition, "depend_task_list") == expect
+
+
+def test_switch_condition_get_define_default():
+ """Test function :func:`get_define` with :class:`Default` in :class:`SwitchCondition`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(Default(task))
+ expect = {
+ "dependTaskList": [],
+ "nextNode": task.code,
+ }
+ assert switch_condition.get_define() == expect
+
+
+def test_switch_condition_get_define_branch():
+ """Test function :func:`get_define` with :class:`Branch` in :class:`SwitchCondition`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch("unittest1", task), Branch("unittest2", task)
+ )
+ expect = {
+ "dependTaskList": [
+ {"condition": "unittest1", "nextNode": task.code},
+ {"condition": "unittest2", "nextNode": task.code},
+ ],
+ "nextNode": "",
+ }
+ assert switch_condition.get_define() == expect
+
+
+def test_switch_condition_get_define_mix_branch_and_default():
+ """Test function :func:`get_define` with both :class:`Branch` and :class:`Default`."""
+ task = Task(TEST_NAME, TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch("unittest1", task), Branch("unittest2", task), Default(task)
+ )
+ expect = {
+ "dependTaskList": [
+ {"condition": "unittest1", "nextNode": task.code},
+ {"condition": "unittest2", "nextNode": task.code},
+ ],
+ "nextNode": task.code,
+ }
+ assert switch_condition.get_define() == expect
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_switch_get_define(mock_task_code_version):
+ """Test task switch :func:`get_define`."""
+ task = Task(name=TEST_NAME, task_type=TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch(condition="${var1} > 1", task=task),
+ Branch(condition="${var1} <= 1", task=task),
+ Default(task),
+ )
+
+ name = "test_switch_get_define"
+ expect = {
+ "code": 123,
+ "name": name,
+ "version": 1,
+ "description": None,
+ "delayTime": 0,
+ "taskType": "SWITCH",
+ "taskParams": {
+ "resourceList": [],
+ "localParams": [],
+ "dependence": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ "waitStartTimeout": {},
+ "switchResult": {
+ "dependTaskList": [
+ {"condition": "${var1} > 1", "nextNode": task.code},
+ {"condition": "${var1} <= 1", "nextNode": task.code},
+ ],
+ "nextNode": task.code,
+ },
+ },
+ "flag": "YES",
+ "taskPriority": "MEDIUM",
+ "workerGroup": "default",
+ "failRetryTimes": 0,
+ "failRetryInterval": 1,
+ "timeoutFlag": "CLOSE",
+ "timeoutNotifyStrategy": None,
+ "timeout": 0,
+ }
+
+ task = Switch(name, condition=switch_condition)
+ assert task.get_define() == expect
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_switch_set_dep_workflow(mock_task_code_version):
+ """Test task switch set dependence in workflow level."""
+ with ProcessDefinition(name="test-switch-set-dep-workflow") as pd:
+ parent = Task(name="parent", task_type=TEST_TYPE)
+ switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
+ switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
+ switch_condition = SwitchCondition(
+ Branch(condition="${var} > 1", task=switch_child_1),
+ Default(task=switch_child_2),
+ )
+
+ switch = Switch(name=TEST_NAME, condition=switch_condition)
+ parent >> switch
+ # General tasks test
+ assert len(pd.tasks) == 4
+ assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+ [parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name
+ )
+ # Task dep test
+ assert parent._downstream_task_codes == {switch.code}
+ assert switch._upstream_task_codes == {parent.code}
+
+ # Switch task dep after ProcessDefinition function get_define called
+ assert switch._downstream_task_codes == {
+ switch_child_1.code,
+ switch_child_2.code,
+ }
+ assert all(
+ [
+ child._upstream_task_codes == {switch.code}
+ for child in [switch_child_1, switch_child_2]
+ ]
+ )