You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2021/12/28 03:36:18 UTC

[dolphinscheduler] branch 2.0.2-prepare updated: [python] Fix task condition set wrong deps (#7650) (#7655)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new d23b344  [python] Fix task condition set wrong deps (#7650) (#7655)
d23b344 is described below

commit d23b3444bb0514f77c821be6dcdf105c9267e60e
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Tue Dec 28 11:34:25 2021 +0800

    [python] Fix task condition set wrong deps (#7650) (#7655)
    
    After #7505 was merged, we could use the condition task type,
    but its dependencies were set in the wrong direction: all
    the condition operators should be upstream of the
    current task instead of downstream.
    
    fix: #7649
---
 .../examples/task_conditions_example.py            | 29 +++++++-------
 .../src/pydolphinscheduler/tasks/condition.py      |  8 ++--
 .../tests/tasks/test_condition.py                  | 45 ++++++++++++----------
 3 files changed, 44 insertions(+), 38 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
index a7723f3..8b66b79 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
@@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
 condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
 automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
 like:
-                         --> condition_success_1
-                       /
-parent -> conditions ->  --> condition_success_2
-                       \
-                         --> condition_fail
+pre_task_success_1 ->
+                      \
+pre_task_success_2 ->  --> conditions -> end
+                      /
+pre_task_fail      ->
 .
 """
 
@@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
 from pydolphinscheduler.tasks.shell import Shell
 
 with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
-    parent = Shell(name="parent", command="echo parent")
-    condition_success_1 = Shell(
-        name="condition_success_1", command="echo condition_success_1"
+    condition_pre_task_1 = Shell(
+        name="pre_task_success_1", command="echo pre_task_success_1"
     )
-    condition_success_2 = Shell(
-        name="condition_success_2", command="echo condition_success_2"
+    condition_pre_task_2 = Shell(
+        name="pre_task_success_2", command="echo pre_task_success_2"
     )
-    condition_fail = Shell(name="condition_fail", command="echo condition_fail")
+    condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
     cond_operator = And(
         And(
-            SUCCESS(condition_success_1, condition_success_2),
-            FAILURE(condition_fail),
+            SUCCESS(condition_pre_task_1, condition_pre_task_2),
+            FAILURE(condition_pre_task_3),
         ),
     )
 
+    end = Shell(name="end", command="echo parent")
+
     condition = Conditions(name="conditions", condition=cond_operator)
-    parent >> condition
+    condition >> end
     pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index 2278f49..905a41b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -164,13 +164,13 @@ class Conditions(Task):
         self._set_dep()
 
     def _set_dep(self) -> None:
-        """Set downstream according to parameter `condition`."""
-        downstream = []
+        """Set upstream according to parameter `condition`."""
+        upstream = []
         for cond in self.condition.args:
             if isinstance(cond, ConditionOperator):
                 for status in cond.args:
-                    downstream.extend(list(status.tasks))
-        self.set_downstream(downstream)
+                    upstream.extend(list(status.tasks))
+        self.set_upstream(upstream)
 
     @property
     def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 5647d5b..9933c4f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
 def test_condition_set_dep_workflow(mock_task_code_version):
     """Test task condition set dependence in workflow level."""
     with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
-        parent = Task(name="parent", task_type=TEST_TYPE)
-        condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE)
-        condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE)
-        condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
+        condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
+        condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
+        condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
         cond_operator = And(
             And(
-                SUCCESS(condition_success_1, condition_success_2),
-                FAILURE(condition_fail),
+                SUCCESS(condition_pre_task_1, condition_pre_task_2),
+                FAILURE(condition_pre_task_3),
             ),
         )
+        end = Task(name="end", task_type=TEST_TYPE)
+
+        condition = Conditions(name="conditions", condition=cond_operator)
+        condition >> end
 
-        condition = Conditions(name=TEST_NAME, condition=cond_operator)
-        parent >> condition
         # General tasks test
         assert len(pd.tasks) == 5
         assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
             [
-                parent,
                 condition,
-                condition_success_1,
-                condition_success_2,
-                condition_fail,
+                condition_pre_task_1,
+                condition_pre_task_2,
+                condition_pre_task_3,
+                end,
             ],
             key=lambda t: t.name,
         )
         # Task dep test
-        assert parent._downstream_task_codes == {condition.code}
-        assert condition._upstream_task_codes == {parent.code}
+        assert end._upstream_task_codes == {condition.code}
+        assert condition._downstream_task_codes == {end.code}
 
         # Condition task dep after ProcessDefinition function get_define called
-        assert condition._downstream_task_codes == {
-            condition_success_1.code,
-            condition_success_2.code,
-            condition_fail.code,
+        assert condition._upstream_task_codes == {
+            condition_pre_task_1.code,
+            condition_pre_task_2.code,
+            condition_pre_task_3.code,
         }
         assert all(
             [
-                child._upstream_task_codes == {condition.code}
-                for child in [condition_success_1, condition_success_2, condition_fail]
+                child._downstream_task_codes == {condition.code}
+                for child in [
+                    condition_pre_task_1,
+                    condition_pre_task_2,
+                    condition_pre_task_3,
+                ]
             ]
         )