You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/05/16 12:18:33 UTC
[dolphinscheduler] 15/15: [python] Fix pre_submit_check error when local params in tasks (#10032)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 3.0.0-beta-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 1b4036e2ce55ae62575a6dc601cccecf3c1f832c
Author: 陈家名 <13...@163.com>
AuthorDate: Mon May 16 19:51:41 2022 +0800
[python] Fix pre_submit_check error when local params in tasks (#10032)
(cherry picked from commit a2cd2a99e65c3a85bdebfc37e1465e98b2d4431a)
---
.../pydolphinscheduler/core/process_definition.py | 6 +++--
.../src/pydolphinscheduler/tasks/switch.py | 6 ++++-
.../tests/core/test_process_definition.py | 29 +++++++++++++++++++++-
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 649e2ce51a..dbf2c41795 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -350,14 +350,16 @@ class ProcessDefinition(Base):
This method should be called before process definition submit to java gateway
For now, we have below checker:
- * `self.param` should be set if task `switch` in this workflow.
+ * `self.param` or at least one local param of task should be set if task `switch` in this workflow.
"""
if (
any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
and self.param is None
+ and all([len(task.local_params) == 0 for task in self.tasks.values()])
):
raise PyDSParamException(
- "Parameter param must be provider if task Switch in process definition."
+ "Parameter param or at least one local_param of task must "
+ "be provider if task Switch in process definition."
)
def submit(self) -> int:
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
index 28032f88e7..0c9a2b82b3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
@@ -129,7 +129,11 @@ class SwitchCondition(Base):
class Switch(Task):
- """Task switch object, declare behavior for switch task to dolphinscheduler."""
+ """Task switch object, declare behavior for switch task to dolphinscheduler.
+
+ Param of process definition or at least one local param of task must be set
+ if task `switch` in this workflow.
+ """
def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
super().__init__(name, TaskType.SWITCH, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 88028f72c7..36e1cb035e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -240,11 +240,38 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
parent >> switch
with pytest.raises(
PyDSParamException,
- match="Parameter param must be provider if task Switch in process definition.",
+ match="Parameter param or at least one local_param of task must "
+ "be provider if task Switch in process definition.",
):
pd._pre_submit_check()
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test__pre_submit_check_switch_with_local_params(mock_code_version):
+ """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
+ with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+ parent = Task(
+ name="parent",
+ task_type=TEST_TASK_TYPE,
+ local_params=[
+ {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
+ ],
+ )
+ switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+ switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+ switch_condition = SwitchCondition(
+ Branch(condition="${var} > 1", task=switch_child_1),
+ Default(task=switch_child_2),
+ )
+
+ switch = Switch(name="switch", condition=switch_condition)
+ parent >> switch
+ pd._pre_submit_check()
+
+
def test_process_definition_get_define_without_task():
"""Test process definition function get_define without task."""
expect = {