You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2021/12/28 03:36:18 UTC
[dolphinscheduler] branch 2.0.2-prepare updated: [python] Fix task condition set wrong deps (#7650) (#7655)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new d23b344 [python] Fix task condition set wrong deps (#7650) (#7655)
d23b344 is described below
commit d23b3444bb0514f77c821be6dcdf105c9267e60e
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Tue Dec 28 11:34:25 2021 +0800
[python] Fix task condition set wrong deps (#7650) (#7655)
After #7505 was merged, we could use the condition task type,
but the dependencies were set in the wrong direction: all
the tasks referenced by the condition operators should be upstream
of the current task instead of downstream.
fix: #7649
---
.../examples/task_conditions_example.py | 29 +++++++-------
.../src/pydolphinscheduler/tasks/condition.py | 8 ++--
.../tests/tasks/test_condition.py | 45 ++++++++++++----------
3 files changed, 44 insertions(+), 38 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
index a7723f3..8b66b79 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
@@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
like:
- --> condition_success_1
- /
-parent -> conditions -> --> condition_success_2
- \
- --> condition_fail
+pre_task_success_1 ->
+ \
+pre_task_success_2 -> --> conditions -> end
+ /
+pre_task_fail ->
.
"""
@@ -35,21 +35,22 @@ from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
- parent = Shell(name="parent", command="echo parent")
- condition_success_1 = Shell(
- name="condition_success_1", command="echo condition_success_1"
+ condition_pre_task_1 = Shell(
+ name="pre_task_success_1", command="echo pre_task_success_1"
)
- condition_success_2 = Shell(
- name="condition_success_2", command="echo condition_success_2"
+ condition_pre_task_2 = Shell(
+ name="pre_task_success_2", command="echo pre_task_success_2"
)
- condition_fail = Shell(name="condition_fail", command="echo condition_fail")
+ condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
cond_operator = And(
And(
- SUCCESS(condition_success_1, condition_success_2),
- FAILURE(condition_fail),
+ SUCCESS(condition_pre_task_1, condition_pre_task_2),
+ FAILURE(condition_pre_task_3),
),
)
+ end = Shell(name="end", command="echo parent")
+
condition = Conditions(name="conditions", condition=cond_operator)
- parent >> condition
+ condition >> end
pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index 2278f49..905a41b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -164,13 +164,13 @@ class Conditions(Task):
self._set_dep()
def _set_dep(self) -> None:
- """Set downstream according to parameter `condition`."""
- downstream = []
+ """Set upstream according to parameter `condition`."""
+ upstream = []
for cond in self.condition.args:
if isinstance(cond, ConditionOperator):
for status in cond.args:
- downstream.extend(list(status.tasks))
- self.set_downstream(downstream)
+ upstream.extend(list(status.tasks))
+ self.set_upstream(upstream)
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 5647d5b..9933c4f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
- parent = Task(name="parent", task_type=TEST_TYPE)
- condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE)
- condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE)
- condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
+ condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
+ condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
+ condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
cond_operator = And(
And(
- SUCCESS(condition_success_1, condition_success_2),
- FAILURE(condition_fail),
+ SUCCESS(condition_pre_task_1, condition_pre_task_2),
+ FAILURE(condition_pre_task_3),
),
)
+ end = Task(name="end", task_type=TEST_TYPE)
+
+ condition = Conditions(name="conditions", condition=cond_operator)
+ condition >> end
- condition = Conditions(name=TEST_NAME, condition=cond_operator)
- parent >> condition
# General tasks test
assert len(pd.tasks) == 5
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
[
- parent,
condition,
- condition_success_1,
- condition_success_2,
- condition_fail,
+ condition_pre_task_1,
+ condition_pre_task_2,
+ condition_pre_task_3,
+ end,
],
key=lambda t: t.name,
)
# Task dep test
- assert parent._downstream_task_codes == {condition.code}
- assert condition._upstream_task_codes == {parent.code}
+ assert end._upstream_task_codes == {condition.code}
+ assert condition._downstream_task_codes == {end.code}
# Condition task dep after ProcessDefinition function get_define called
- assert condition._downstream_task_codes == {
- condition_success_1.code,
- condition_success_2.code,
- condition_fail.code,
+ assert condition._upstream_task_codes == {
+ condition_pre_task_1.code,
+ condition_pre_task_2.code,
+ condition_pre_task_3.code,
}
assert all(
[
- child._upstream_task_codes == {condition.code}
- for child in [condition_success_1, condition_success_2, condition_fail]
+ child._downstream_task_codes == {condition.code}
+ for child in [
+ condition_pre_task_1,
+ condition_pre_task_2,
+ condition_pre_task_3,
+ ]
]
)