You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/05/16 11:51:52 UTC

[dolphinscheduler] branch dev updated: [python] Fix pre_submit_check error when local params in tasks (#10032)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a2cd2a99e6 [python] Fix pre_submit_check error when local params in tasks (#10032)
a2cd2a99e6 is described below

commit a2cd2a99e65c3a85bdebfc37e1465e98b2d4431a
Author: 陈家名 <13...@163.com>
AuthorDate: Mon May 16 19:51:41 2022 +0800

    [python] Fix pre_submit_check error when local params in tasks (#10032)
---
 .../pydolphinscheduler/core/process_definition.py  |  6 +++--
 .../src/pydolphinscheduler/tasks/switch.py         |  6 ++++-
 .../tests/core/test_process_definition.py          | 29 +++++++++++++++++++++-
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 649e2ce51a..dbf2c41795 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -350,14 +350,16 @@ class ProcessDefinition(Base):
 
         This method should be called before process definition submit to java gateway
         For now, we have below checker:
-        * `self.param` should be set if task `switch` in this workflow.
+        * `self.param` or at least one local param of a task should be set if a `switch` task is in this workflow.
         """
         if (
             any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
             and self.param is None
+            and all([len(task.local_params) == 0 for task in self.tasks.values()])
         ):
             raise PyDSParamException(
-                "Parameter param must be provider if task Switch in process definition."
+                "Parameter param or at least one local_param of task must "
+                "be provider if task Switch in process definition."
             )
 
     def submit(self) -> int:
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
index 28032f88e7..0c9a2b82b3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/switch.py
@@ -129,7 +129,11 @@ class SwitchCondition(Base):
 
 
 class Switch(Task):
-    """Task switch object, declare behavior for switch task to dolphinscheduler."""
+    """Task switch object, declare behavior for switch task to dolphinscheduler.
+
+    Param of process definition or at least one local param of a task must be set
+    if a `switch` task is in this workflow.
+    """
 
     def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
         super().__init__(name, TaskType.SWITCH, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 88028f72c7..36e1cb035e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -240,11 +240,38 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
         parent >> switch
         with pytest.raises(
             PyDSParamException,
-            match="Parameter param must be provider if task Switch in process definition.",
+            match="Parameter param or at least one local_param of task must "
+            "be provider if task Switch in process definition.",
         ):
             pd._pre_submit_check()
 
 
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_with_local_params(mock_code_version):
+    """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        parent = Task(
+            name="parent",
+            task_type=TEST_TASK_TYPE,
+            local_params=[
+                {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
+            ],
+        )
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {