Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/13 09:06:15 UTC
[dolphinscheduler-sdk-python] branch main updated: [chore] Change workflow instance name from pd (#42)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 70afdfc [chore] Change workflow instance name from pd (#42)
70afdfc is described below
commit 70afdfca754adda4ab67038e8f80d0d2a98752bc
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Tue Dec 13 17:06:09 2022 +0800
[chore] Change workflow instance name from pd (#42)
Change instance name from pd to workflow
---
docs/source/concept.rst | 22 ++--
src/pydolphinscheduler/core/workflow.py | 4 +-
src/pydolphinscheduler/core/yaml_workflow.py | 8 +-
.../examples/bulk_create_example.py | 6 +-
.../examples/multi_resources_example.py | 4 +-
.../examples/task_condition_example.py | 4 +-
.../examples/task_datax_example.py | 4 +-
.../examples/task_dependent_example.py | 8 +-
.../examples/task_dvc_example.py | 4 +-
.../examples/task_flink_example.py | 4 +-
.../examples/task_kubernetes_example.py | 4 +-
.../examples/task_map_reduce_example.py | 4 +-
.../examples/task_mlflow_example.py | 4 +-
.../examples/task_openmldb_example.py | 4 +-
.../examples/task_pytorch_example.py | 4 +-
.../examples/task_sagemaker_example.py | 4 +-
.../examples/task_spark_example.py | 4 +-
.../examples/task_switch_example.py | 4 +-
src/pydolphinscheduler/examples/tutorial.py | 4 +-
.../examples/tutorial_decorator.py | 4 +-
tests/core/test_workflow.py | 118 +++++++++++----------
tests/integration/test_process_definition.py | 4 +-
tests/tasks/test_condition.py | 6 +-
tests/tasks/test_func_wrap.py | 42 ++++----
tests/tasks/test_switch.py | 6 +-
25 files changed, 143 insertions(+), 141 deletions(-)
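The whole change is a mechanical variable rename; no behavior changes. For anyone applying the same rename to downstream code, a minimal before/after sketch (note that submit() talks to a DolphinScheduler gateway, so actually running this needs a reachable server):

    from pydolphinscheduler.core.workflow import Workflow

    # Before this commit: the instance name shadowed the common pandas alias `pd`
    with Workflow(name="my first workflow") as pd:
        pd.submit()

    # After this commit: the instance is named for what it is
    with Workflow(name="my first workflow") as workflow:
        workflow.submit()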
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 3b2f3d0..6048ed4 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -31,11 +31,11 @@ Workflow could be initialized in a normal assignment statement or in a context manager.
.. code-block:: python
# Initialization with an assignment statement
- pd = Workflow(name="my first workflow")
+ workflow = Workflow(name="my first workflow")
# Or context manager
- with Workflow(name="my first workflow") as pd:
- pd.submit()
+ with Workflow(name="my first workflow") as workflow:
+ workflow.submit()
Workflow is the main object communicating between *PyDolphinScheduler* and the DolphinScheduler daemon.
After the workflow and its tasks are declared, you could use `submit` and `run` to notify the server of your definition.
@@ -46,10 +46,10 @@ But if you want to run the workflow after you submit it, you could use attribute
.. code-block:: python
# Just submit the definition, without running it
- pd.submit()
+ workflow.submit()
# Both submit and run the definition
- pd.run()
+ workflow.run()
Schedule
~~~~~~~~
@@ -84,7 +84,7 @@ Tenant is the user who runs task commands in a machine or virtual machine. It cou
.. code-block:: python
#
- pd = Workflow(name="workflow tenant", tenant="tenant_exists")
+ workflow = Workflow(name="workflow tenant", tenant="tenant_exists")
.. note::
@@ -114,7 +114,7 @@ Parameter ``execution type`` can be set in
.. code-block:: python
- pd = Workflow(
+ workflow = Workflow(
name="workflow_name",
execution_type="parallel"
)
@@ -173,11 +173,11 @@ decide the workflow of a task. You could set `workflow` in both normal assignment or in co
.. code-block:: python
# Normal assignment: explicitly declare and pass the `Workflow` instance to the task
- pd = Workflow(name="my first workflow")
- shell_task = Shell(name="shell", command="echo shell task", workflow=pd)
+ workflow = Workflow(name="my first workflow")
+ shell_task = Shell(name="shell", command="echo shell task", workflow=workflow)
- # Context manger, `Workflow` instance pd would implicit declaration to task
- with Workflow(name="my first workflow") as pd:
+ # Context manager: the `Workflow` instance is implicitly attached to the task
+ with Workflow(name="my first workflow") as workflow:
shell_task = Shell(name="shell", command="echo shell task",
With `Workflow`_, `Tasks`_ and `Tasks Dependence`_ together, we could build a workflow with multiple tasks.
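Taken together, the renamed snippets above compose into one short definition. A minimal end-to-end sketch, assuming a reachable API server and an existing tenant named `tenant_exists`:

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell

    with Workflow(name="concept_demo", tenant="tenant_exists") as workflow:
        # The task binds to the surrounding workflow implicitly via the context manager
        shell_task = Shell(name="shell", command="echo shell task")
        workflow.submit()  # or workflow.run() to also trigger an execution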
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index 02906fb..f972a8f 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -37,9 +37,9 @@ class WorkflowContext:
_context_managed_workflow: Optional["Workflow"] = None
@classmethod
- def set(cls, pd: "Workflow") -> None:
+ def set(cls, workflow: "Workflow") -> None:
"""Set attribute self._context_managed_workflow."""
- cls._context_managed_workflow = pd
+ cls._context_managed_workflow = workflow
@classmethod
def get(cls) -> Optional["Workflow"]:
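The `WorkflowContext` hunk above is the class-level registry that lets tasks find the workflow opened by a `with` block. A self-contained sketch of the same pattern, with illustrative names (`Context`, `Flow`, and the `delete` method are assumptions, not the library's API):

    from typing import Optional

    class Context:
        """Class-level slot holding the flow currently inside a `with` block."""

        _current: Optional["Flow"] = None

        @classmethod
        def set(cls, flow: "Flow") -> None:
            cls._current = flow

        @classmethod
        def get(cls) -> Optional["Flow"]:
            return cls._current

        @classmethod
        def delete(cls) -> None:
            cls._current = None

    class Flow:
        def __enter__(self) -> "Flow":
            Context.set(self)  # objects created inside the block can look us up
            return self

        def __exit__(self, exc_type, exc_val, exc_tb) -> None:
            Context.delete()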
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py b/src/pydolphinscheduler/core/yaml_workflow.py
index 5401112..507a518 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -146,7 +146,7 @@ class YamlWorkflow(YamlParser):
workflow_name = workflow_params["name"]
logger.info(f"Create workflow: {workflow_name}")
- with Workflow(**workflow_params) as pd:
+ with Workflow(**workflow_params) as workflow:
# save dependencies between tasks
dependencies = {}
@@ -170,11 +170,11 @@ class YamlWorkflow(YamlParser):
upstream_task = name2task[upstream_task_name]
upstream_task >> downstream_task
- pd.submit()
+ workflow.submit()
# if is_run is set, run the workflow after submitting
if is_run:
- logger.info(f"run workflow: {pd}")
- pd.run()
+ logger.info(f"run workflow: {workflow}")
+ workflow.run()
return workflow_name
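The loop elided from this hunk wires tasks together by name before submitting. The core of that pattern, extracted as a stand-alone sketch (the task names and the `dependencies` mapping here are placeholders):

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell

    with Workflow(name="yaml_style_wiring") as workflow:
        # name -> task table, as the YAML parser builds while creating tasks
        name2task = {
            name: Shell(name=name, command=f"echo {name}")
            for name in ("extract", "transform", "load")
        }
        # recorded dependencies: {downstream_name: [upstream_names]}
        dependencies = {"transform": ["extract"], "load": ["transform"]}
        for downstream_name, upstream_names in dependencies.items():
            for upstream_name in upstream_names:
                name2task[upstream_name] >> name2task[downstream_name]
        workflow.submit()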
diff --git a/src/pydolphinscheduler/examples/bulk_create_example.py b/src/pydolphinscheduler/examples/bulk_create_example.py
index 9b89f73..229811c 100644
--- a/src/pydolphinscheduler/examples/bulk_create_example.py
+++ b/src/pydolphinscheduler/examples/bulk_create_example.py
@@ -41,7 +41,7 @@ IS_CHAIN = True
for wf in range(0, NUM_WORKFLOWS):
workflow_name = f"workflow:{wf}"
- with Workflow(name=workflow_name, tenant=TENANT) as pd:
+ with Workflow(name=workflow_name, tenant=TENANT) as workflow:
for t in range(0, NUM_TASKS):
task_name = f"task:{t}-{workflow_name}"
command = f"echo This is task {task_name}"
@@ -49,7 +49,7 @@ for wf in range(0, NUM_WORKFLOWS):
if IS_CHAIN and t > 0:
pre_task_name = f"task:{t-1}-{workflow_name}"
- pd.get_one_task_by_name(pre_task_name) >> task
+ workflow.get_one_task_by_name(pre_task_name) >> task
# We just submit the workflow and task definitions without setting a schedule time or running them manually
- pd.submit()
+ workflow.submit()
diff --git a/src/pydolphinscheduler/examples/multi_resources_example.py b/src/pydolphinscheduler/examples/multi_resources_example.py
index dd06bd0..978a357 100644
--- a/src/pydolphinscheduler/examples/multi_resources_example.py
+++ b/src/pydolphinscheduler/examples/multi_resources_example.py
@@ -72,7 +72,7 @@ with Workflow(
Resource(name=main, content="from dependence import now\nprint(now)"),
],
# [end create_new_resources]
-) as pd:
+) as workflow:
# [start use_exists_resources]
task_use_resource = Shell(
name="use-resource",
@@ -84,5 +84,5 @@ with Workflow(
)
# [end use_exists_resources]
- pd.run()
+ workflow.run()
# [end workflow]
diff --git a/src/pydolphinscheduler/examples/task_condition_example.py b/src/pydolphinscheduler/examples/task_condition_example.py
index ea8ca3a..585bc76 100644
--- a/src/pydolphinscheduler/examples/task_condition_example.py
+++ b/src/pydolphinscheduler/examples/task_condition_example.py
@@ -35,7 +35,7 @@ from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
from pydolphinscheduler.tasks.shell import Shell
-with Workflow(name="task_condition_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_condition_example", tenant="tenant_exists") as workflow:
pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
@@ -55,5 +55,5 @@ with Workflow(name="task_condition_example", tenant="tenant_exists") as pd:
success_task=success_branch,
failed_task=fail_branch,
)
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
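For readers skimming the truncated hunk: the elided middle of this example builds a boolean tree over the upstream tasks and hands it to `Condition`. A sketch of the shape, continuing the names declared above (the exact operator tree is an assumption, not a copy of the shipped example):

    cond_operator = And(
        SUCCESS(pre_task_1, pre_task_2),  # both succeeded ...
        FAILURE(pre_task_3),              # ... and this one failed
    )
    condition = Condition(
        name="condition",
        condition=cond_operator,
        success_task=success_branch,
        failed_task=fail_branch,
    )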
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py
index aa4d00a..a1422ef 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -75,7 +75,7 @@ JSON_TEMPLATE = {
with Workflow(
name="task_datax_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
# This task synchronizes the data in `t_ds_project`
# of `first_mysql` database to `target_project` of `second_mysql` database.
# You have to make sure data sources named `first_mysql` and `second_mysql` exist
@@ -91,5 +91,5 @@ with Workflow(
# You can customize the json_template of datax to sync data. This task creates a new
# datax job same as task1, transferring records from `first_mysql` to `second_mysql`
task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py
index 93c607c..648bb5c 100644
--- a/src/pydolphinscheduler/examples/task_dependent_example.py
+++ b/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -43,16 +43,16 @@ from pydolphinscheduler.tasks.shell import Shell
with Workflow(
name="task_dependent_external",
tenant="tenant_exists",
-) as pd:
+) as workflow:
task_1 = Shell(name="task_1", command="echo task 1")
task_2 = Shell(name="task_2", command="echo task 2")
task_3 = Shell(name="task_3", command="echo task 3")
- pd.submit()
+ workflow.submit()
with Workflow(
name="task_dependent_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
task = Dependent(
name="task_dependent",
dependence=And(
@@ -70,5 +70,5 @@ with Workflow(
)
),
)
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
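The dependence tree elided from the second hunk is what points `task_dependent_example` at tasks submitted by `task_dependent_external`. A hypothetical sketch of one leaf (the `DependentItem` import path and parameter names are assumptions; the shipped example file spells out the real signature):

    from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem

    task = Dependent(
        name="task_dependent",
        dependence=And(
            DependentItem(
                project_name="pydolphin",                          # assumed
                process_definition_name="task_dependent_external",
                dependent_task_name="task_1",
            )
        ),
    )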
diff --git a/src/pydolphinscheduler/examples/task_dvc_example.py b/src/pydolphinscheduler/examples/task_dvc_example.py
index 8d1976a..98e03f1 100644
--- a/src/pydolphinscheduler/examples/task_dvc_example.py
+++ b/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -26,7 +26,7 @@ repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
with Workflow(
name="task_dvc_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
upload_task = DVCUpload(
name="upload_data",
@@ -47,6 +47,6 @@ with Workflow(
init_task >> upload_task >> download_task
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_flink_example.py b/src/pydolphinscheduler/examples/task_flink_example.py
index a00ba7b..e5084b4 100644
--- a/src/pydolphinscheduler/examples/task_flink_example.py
+++ b/src/pydolphinscheduler/examples/task_flink_example.py
@@ -21,7 +21,7 @@
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
-with Workflow(name="task_flink_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_flink_example", tenant="tenant_exists") as workflow:
task = Flink(
name="task_flink",
main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
@@ -29,5 +29,5 @@ with Workflow(name="task_flink_example", tenant="tenant_exists") as pd:
program_type=ProgramType.JAVA,
deploy_mode=DeployMode.LOCAL,
)
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_kubernetes_example.py b/src/pydolphinscheduler/examples/task_kubernetes_example.py
index d1c807c..3f2ee0d 100644
--- a/src/pydolphinscheduler/examples/task_kubernetes_example.py
+++ b/src/pydolphinscheduler/examples/task_kubernetes_example.py
@@ -24,7 +24,7 @@ from pydolphinscheduler.tasks.kubernetes import Kubernetes
with Workflow(
name="task_kubernetes_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
task_k8s = Kubernetes(
name="task_k8s",
image="ds-dev",
@@ -32,5 +32,5 @@ with Workflow(
min_cpu_cores=2.0,
min_memory_space=10.0,
)
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_map_reduce_example.py b/src/pydolphinscheduler/examples/task_map_reduce_example.py
index 117c503..70a6b2b 100644
--- a/src/pydolphinscheduler/examples/task_map_reduce_example.py
+++ b/src/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -22,7 +22,7 @@ from pydolphinscheduler.core.engine import ProgramType
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.map_reduce import MR
-with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as workflow:
task = MR(
name="task_mr",
main_class="wordcount",
@@ -30,5 +30,5 @@ with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as pd:
program_type=ProgramType.JAVA,
main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
)
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_mlflow_example.py b/src/pydolphinscheduler/examples/task_mlflow_example.py
index f0d51a2..7030bcf 100644
--- a/src/pydolphinscheduler/examples/task_mlflow_example.py
+++ b/src/pydolphinscheduler/examples/task_mlflow_example.py
@@ -32,7 +32,7 @@ mlflow_tracking_uri = "http://127.0.0.1:5000"
with Workflow(
name="task_mlflow_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
# run custom mlflow project to train model
train_custom = MLFlowProjectsCustom(
@@ -88,6 +88,6 @@ with Workflow(
train_basic_algorithm >> deploy_mlflow
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_openmldb_example.py b/src/pydolphinscheduler/examples/task_openmldb_example.py
index 86454b7..a8186e7 100644
--- a/src/pydolphinscheduler/examples/task_openmldb_example.py
+++ b/src/pydolphinscheduler/examples/task_openmldb_example.py
@@ -30,7 +30,7 @@ INTO TABLE talkingdata OPTIONS(mode='overwrite');
with Workflow(
name="task_openmldb_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
task_openmldb = OpenMLDB(
name="task_openmldb",
zookeeper="127.0.0.1:2181",
@@ -39,5 +39,5 @@ with Workflow(
sql=sql,
)
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 8aa7ea9..8cb3a2d 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -24,7 +24,7 @@ from pydolphinscheduler.tasks.pytorch import Pytorch
with Workflow(
name="task_pytorch_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
# run project with existing environment
task_existing_env = Pytorch(
@@ -58,5 +58,5 @@ with Workflow(
requirements="requirements.txt",
)
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_sagemaker_example.py b/src/pydolphinscheduler/examples/task_sagemaker_example.py
index f14ceb5..cbebfa7 100644
--- a/src/pydolphinscheduler/examples/task_sagemaker_example.py
+++ b/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -36,11 +36,11 @@ sagemaker_request_data = {
with Workflow(
name="task_sagemaker_example",
tenant="tenant_exists",
-) as pd:
+) as workflow:
task_sagemaker = SageMaker(
name="task_sagemaker",
sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
)
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_spark_example.py b/src/pydolphinscheduler/examples/task_spark_example.py
index 142f7d5..77ec4ac 100644
--- a/src/pydolphinscheduler/examples/task_spark_example.py
+++ b/src/pydolphinscheduler/examples/task_spark_example.py
@@ -21,7 +21,7 @@
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
-with Workflow(name="task_spark_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_spark_example", tenant="tenant_exists") as workflow:
task = Spark(
name="task_spark",
main_class="org.apache.spark.examples.SparkPi",
@@ -29,5 +29,5 @@ with Workflow(name="task_spark_example", tenant="tenant_exists") as pd:
program_type=ProgramType.JAVA,
deploy_mode=DeployMode.LOCAL,
)
- pd.run()
+ workflow.run()
# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_switch_example.py b/src/pydolphinscheduler/examples/task_switch_example.py
index d573342..b5c60f0 100644
--- a/src/pydolphinscheduler/examples/task_switch_example.py
+++ b/src/pydolphinscheduler/examples/task_switch_example.py
@@ -36,7 +36,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
with Workflow(
name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
-) as pd:
+) as workflow:
parent = Shell(name="parent", command="echo parent")
switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
@@ -47,5 +47,5 @@ with Workflow(
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
- pd.submit()
+ workflow.submit()
# [end workflow_declare]
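The `switch_condition` elided from this hunk branches on the workflow-level `param={"var": "1"}` declared above. A sketch of its shape, reusing the task names from the example (the condition string syntax is an assumption):

    switch_condition = SwitchCondition(
        Branch(condition="${var} > 1", task=switch_child_1),  # taken when var > 1
        Default(task=switch_child_2),                         # fallback branch
    )
    switch = Switch(name="switch", condition=switch_condition)
    parent >> switch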
diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py
index cb6d47f..10243fd 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -46,7 +46,7 @@ with Workflow(
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
-) as pd:
+) as workflow:
# [end workflow_declare]
# [start task_declare]
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
@@ -63,6 +63,6 @@ with Workflow(
# [end task_relation_declare]
# [start submit_or_run]
- pd.run()
+ workflow.run()
# [end submit_or_run]
# [end tutorial]
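The `[start task_relation_declare]` section elided from this hunk fans `task_parent` out and joins the branches back. A sketch of that section, with the child task names assumed from the tutorial file:

    task_child_one = Shell(name="task_child_one", command="echo child one")
    task_child_two = Shell(name="task_child_two", command="echo child two")
    task_union = Shell(name="task_union", command="echo union")

    task_group = [task_child_one, task_child_two]
    task_parent.set_downstream(task_group)  # parent runs before both children
    task_union << task_group                # union waits for both children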
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py
index 9740af7..9e584a6 100644
--- a/src/pydolphinscheduler/examples/tutorial_decorator.py
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -75,7 +75,7 @@ with Workflow(
schedule="0 0 0 * * ? *",
start_time="2021-01-01",
tenant="tenant_exists",
-) as pd:
+) as workflow:
# [end workflow_declare]
# [start task_relation_declare]
@@ -86,6 +86,6 @@ with Workflow(
# [end task_relation_declare]
# [start submit_or_run]
- pd.run()
+ workflow.run()
# [end submit_or_run]
# [end tutorial]
diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py
index e5c8601..43f1ddd 100644
--- a/tests/core/test_workflow.py
+++ b/tests/core/test_workflow.py
@@ -40,8 +40,10 @@ TEST_TASK_TYPE = "test-task-type"
@pytest.mark.parametrize("func", ["run", "submit", "start"])
def test_workflow_key_attr(func):
"""Test workflow have specific functions or attributes."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- assert hasattr(pd, func), f"Workflow instance don't have attribute `{func}`"
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ assert hasattr(
+ workflow, func
+ ), f"Workflow instance don't have attribute `{func}`"
@pytest.mark.parametrize(
@@ -71,10 +73,10 @@ def test_workflow_key_attr(func):
)
def test_workflow_default_value(name, value):
"""Test workflow default attributes."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- assert getattr(pd, name) == value, (
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ assert getattr(workflow, name) == value, (
f"Workflow instance attribute `{name}` not with "
- f"except default value `{getattr(pd, name)}`"
+ f"except default value `{getattr(workflow, name)}`"
)
@@ -100,10 +102,10 @@ def test_workflow_default_value(name, value):
)
def test_set_attr(name, cls, expect):
"""Test workflow set attributes which get with same type."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- setattr(pd, name, expect)
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ setattr(workflow, name, expect)
assert (
- getattr(pd, name) == expect
+ getattr(workflow, name) == expect
), f"Workflow set attribute `{name}` do not work expect"
@@ -116,9 +118,9 @@ def test_set_attr(name, cls, expect):
)
def test_set_release_state(value, expect):
"""Test workflow set release_state attributes."""
- with Workflow(TEST_WORKFLOW_NAME, release_state=value) as pd:
+ with Workflow(TEST_WORKFLOW_NAME, release_state=value) as workflow:
assert (
- getattr(pd, "release_state") == expect
+ getattr(workflow, "release_state") == expect
), "Workflow set attribute release_state do not return expect value."
@@ -134,12 +136,12 @@ def test_set_release_state(value, expect):
)
def test_set_release_state_error(value):
"""Test workflow set release_state attributes with error."""
- pd = Workflow(TEST_WORKFLOW_NAME, release_state=value)
+ workflow = Workflow(TEST_WORKFLOW_NAME, release_state=value)
with pytest.raises(
PyDSParamException,
match="Parameter release_state only support `online` or `offline` but get.*",
):
- pd.release_state
+ workflow.release_state
@pytest.mark.parametrize(
@@ -153,10 +155,10 @@ def test_set_release_state_error(value):
)
def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
"""Test workflow set attributes which get with different type."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- setattr(pd, set_attr, set_val)
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ setattr(workflow, set_attr, set_val)
assert get_val == getattr(
- pd, get_attr
+ workflow, get_attr
), f"Set attribute {set_attr} can not get back with {get_val}."
@@ -174,8 +176,8 @@ def test__parse_datetime(val, expect):
Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
"""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- assert expect == pd._parse_datetime(
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ assert expect == workflow._parse_datetime(
val
), f"Function _parse_datetime with unexpect value by {val}."
@@ -190,9 +192,9 @@ def test__parse_datetime(val, expect):
)
def test__parse_datetime_not_support_type(val: Any):
"""Test workflow function _parse_datetime not support type error."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
- pd._parse_datetime(val)
+ workflow._parse_datetime(val)
@pytest.mark.parametrize(
@@ -272,8 +274,8 @@ def test_execute_type_not_support_type(val: str):
)
def test_property_param_json(param, expect):
"""Test Workflow's property param_json."""
- pd = Workflow(TEST_WORKFLOW_NAME, param=param)
- assert pd.param_json == expect
+ workflow = Workflow(TEST_WORKFLOW_NAME, param=param)
+ assert workflow.param_json == expect
@patch(
@@ -282,7 +284,7 @@ def test_property_param_json(param, expect):
)
def test__pre_submit_check_switch_without_param(mock_code_version):
"""Test :func:`_pre_submit_check` if workflow with switch but without attribute param."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
parent = Task(name="parent", task_type=TEST_TASK_TYPE)
switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
@@ -298,7 +300,7 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
match="Parameter param or at least one local_param of task must "
"be provider if task Switch in workflow.",
):
- pd._pre_submit_check()
+ workflow._pre_submit_check()
@patch(
@@ -307,7 +309,7 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
)
def test__pre_submit_check_switch_with_local_params(mock_code_version):
"""Test :func:`_pre_submit_check` if workflow with switch with local params of task."""
- with Workflow(TEST_WORKFLOW_NAME) as pd:
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
parent = Task(
name="parent",
task_type=TEST_TASK_TYPE,
@@ -324,7 +326,7 @@ def test__pre_submit_check_switch_with_local_params(mock_code_version):
switch = Switch(name="switch", condition=switch_condition)
parent >> switch
- pd._pre_submit_check()
+ workflow._pre_submit_check()
def test_workflow_get_define_without_task():
@@ -346,45 +348,45 @@ def test_workflow_get_define_without_task():
"taskRelationJson": [{}],
"resourceList": [],
}
- with Workflow(TEST_WORKFLOW_NAME) as pd:
- assert pd.get_define() == expect
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
+ assert workflow.get_define() == expect
def test_workflow_simple_context_manager():
"""Test simple create workflow in workflow context manager mode."""
expect_tasks_num = 5
- with Workflow(TEST_WORKFLOW_NAME) as pd:
+ with Workflow(TEST_WORKFLOW_NAME) as workflow:
for i in range(expect_tasks_num):
curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
# Set deps task i as i-1 parent
if i > 0:
- pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+ pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
- assert len(pd.tasks) == expect_tasks_num
+ assert len(workflow.tasks) == expect_tasks_num
# Test if task workflow same as origin one
- task: Task = pd.get_one_task_by_name("task-0")
- assert pd is task.workflow
+ task: Task = workflow.get_one_task_by_name("task-0")
+ assert workflow is task.workflow
# Test if all tasks with expect deps
for i in range(expect_tasks_num):
- task: Task = pd.get_one_task_by_name(f"task-{i}")
+ task: Task = workflow.get_one_task_by_name(f"task-{i}")
if i == 0:
assert task._upstream_task_codes == set()
assert task._downstream_task_codes == {
- pd.get_one_task_by_name("task-1").code
+ workflow.get_one_task_by_name("task-1").code
}
elif i == expect_tasks_num - 1:
assert task._upstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i - 1}").code
+ workflow.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == set()
else:
assert task._upstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i - 1}").code
+ workflow.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i + 1}").code
+ workflow.get_one_task_by_name(f"task-{i + 1}").code
}
@@ -399,38 +401,38 @@ def test_deprecated_workflow_simple_context_manager():
assert issubclass(w[-1].category, DeprecationWarning)
assert "deprecated" in str(w[-1].message)
- with ProcessDefinition(TEST_WORKFLOW_NAME) as pd:
+ with ProcessDefinition(TEST_WORKFLOW_NAME) as workflow:
for i in range(expect_tasks_num):
curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
# Set deps task i as i-1 parent
if i > 0:
- pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+ pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
- assert len(pd.tasks) == expect_tasks_num
+ assert len(workflow.tasks) == expect_tasks_num
# Test if task workflow same as origin one
- task: Task = pd.get_one_task_by_name("task-0")
- assert pd is task.workflow
+ task: Task = workflow.get_one_task_by_name("task-0")
+ assert workflow is task.workflow
# Test if all tasks with expect deps
for i in range(expect_tasks_num):
- task: Task = pd.get_one_task_by_name(f"task-{i}")
+ task: Task = workflow.get_one_task_by_name(f"task-{i}")
if i == 0:
assert task._upstream_task_codes == set()
assert task._downstream_task_codes == {
- pd.get_one_task_by_name("task-1").code
+ workflow.get_one_task_by_name("task-1").code
}
elif i == expect_tasks_num - 1:
assert task._upstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i - 1}").code
+ workflow.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == set()
else:
assert task._upstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i - 1}").code
+ workflow.get_one_task_by_name(f"task-{i - 1}").code
}
assert task._downstream_task_codes == {
- pd.get_one_task_by_name(f"task-{i + 1}").code
+ workflow.get_one_task_by_name(f"task-{i + 1}").code
}
@@ -441,19 +443,19 @@ def test_workflow_simple_separate():
test_workflow_simple_context_manager.
"""
expect_tasks_num = 5
- pd = Workflow(TEST_WORKFLOW_NAME)
+ workflow = Workflow(TEST_WORKFLOW_NAME)
for i in range(expect_tasks_num):
curr_task = Task(
name=f"task-{i}",
task_type=f"type-{i}",
- workflow=pd,
+ workflow=workflow,
)
# Set deps task i as i-1 parent
if i > 0:
- pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+ pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
curr_task.set_upstream(pre_task)
- assert len(pd.tasks) == expect_tasks_num
- assert all(["task-" in task.name for task in pd.task_list])
+ assert len(workflow.tasks) == expect_tasks_num
+ assert all(["task-" in task.name for task in workflow.task_list])
@pytest.mark.parametrize(
@@ -467,8 +469,8 @@ def test_set_workflow_user_attr(user_attrs):
default_value = {
"tenant": configuration.WORKFLOW_TENANT,
}
- with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as pd:
- user = pd.user
+ with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as workflow:
+ user = workflow.user
for attr in default_value:
# Get assigned attribute if we specific, else get default value
except_attr = (
@@ -486,8 +488,8 @@ def test_schedule_json_none_schedule():
with Workflow(
TEST_WORKFLOW_NAME,
schedule=None,
- ) as pd:
- assert pd.schedule_json is None
+ ) as workflow:
+ assert workflow.schedule_json is None
# We freeze time here, because we test start_time with None, and it will get datetime.datetime.now. If we do
@@ -561,5 +563,5 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
start_time=start_time,
end_time=end_time,
timezone=configuration.WORKFLOW_TIME_ZONE,
- ) as pd:
- assert pd.schedule_json == expect
+ ) as workflow:
+ assert workflow.schedule_json == expect
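The deprecated-alias test above asserts that `ProcessDefinition` still works but emits a `DeprecationWarning`. A sketch for downstream users who want to surface those call sites (the alias's import path is an assumption):

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from pydolphinscheduler.core.workflow import ProcessDefinition  # path assumed
        ProcessDefinition("legacy-name")
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)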
diff --git a/tests/integration/test_process_definition.py b/tests/integration/test_process_definition.py
index 1ea0051..010ca76 100644
--- a/tests/integration/test_process_definition.py
+++ b/tests/integration/test_process_definition.py
@@ -45,6 +45,6 @@ def test_change_workflow_attr(pre: Dict, post: Dict):
"""Test whether workflow success when specific attribute change."""
assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
for attrs in [pre, post]:
- with Workflow(name=WORKFLOW_NAME, **attrs) as pd:
+ with Workflow(name=WORKFLOW_NAME, **attrs) as workflow:
Shell(name=TASK_NAME, command="echo 1")
- pd.submit()
+ workflow.submit()
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 700f418..3381783 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -401,7 +401,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
)
def test_condition_set_dep_workflow(mock_task_code_version):
"""Test task condition set dependence in workflow level."""
- with Workflow(name="test-condition-set-dep-workflow") as pd:
+ with Workflow(name="test-condition-set-dep-workflow") as workflow:
pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
@@ -423,8 +423,8 @@ def test_condition_set_dep_workflow(mock_task_code_version):
)
# General tasks test
- assert len(pd.tasks) == 6
- assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+ assert len(workflow.tasks) == 6
+ assert sorted(workflow.task_list, key=lambda t: t.name) == sorted(
[
pre_task_1,
pre_task_2,
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
index 8c94c8e..aa0da26 100644
--- a/tests/tasks/test_func_wrap.py
+++ b/tests/tasks/test_func_wrap.py
@@ -41,13 +41,13 @@ def test_single_task_outside(mock_code):
def foo():
print(TASK_NAME)
- with Workflow(WORKFLOW_NAME) as pd:
+ with Workflow(WORKFLOW_NAME) as workflow:
foo()
- assert pd is not None and pd.name == WORKFLOW_NAME
- assert len(pd.tasks) == 1
+ assert workflow is not None and workflow.name == WORKFLOW_NAME
+ assert len(workflow.tasks) == 1
- pd_task = pd.tasks[12345]
+ pd_task = workflow.tasks[12345]
assert pd_task.name == "foo"
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
@@ -57,7 +57,7 @@ def test_single_task_outside(mock_code):
)
def test_single_task_inside(mock_code):
"""Test single decorator task which inside workflow."""
- with Workflow(WORKFLOW_NAME) as pd:
+ with Workflow(WORKFLOW_NAME) as workflow:
@task
def foo():
@@ -65,10 +65,10 @@ def test_single_task_inside(mock_code):
foo()
- assert pd is not None and pd.name == WORKFLOW_NAME
- assert len(pd.tasks) == 1
+ assert workflow is not None and workflow.name == WORKFLOW_NAME
+ assert len(workflow.tasks) == 1
- pd_task = pd.tasks[12345]
+ pd_task = workflow.tasks[12345]
assert pd_task.name == "foo"
assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"
@@ -84,7 +84,7 @@ def test_addition_decorator_error(mock_code):
def foo():
print(TASK_NAME)
- with Workflow(WORKFLOW_NAME) as pd: # noqa: F841
+ with Workflow(WORKFLOW_NAME) as workflow: # noqa: F841
with pytest.raises(
PyDSParamException, match="Do no support other decorators for.*"
):
@@ -106,18 +106,18 @@ def test_multiple_tasks_outside(mock_code):
def bar():
print(TASK_NAME)
- with Workflow(WORKFLOW_NAME) as pd:
+ with Workflow(WORKFLOW_NAME) as workflow:
foo = foo()
bar = bar()
foo >> bar
- assert pd is not None and pd.name == WORKFLOW_NAME
- assert len(pd.tasks) == 2
+ assert workflow is not None and workflow.name == WORKFLOW_NAME
+ assert len(workflow.tasks) == 2
- task_foo = pd.get_one_task_by_name("foo")
- task_bar = pd.get_one_task_by_name("bar")
- assert set(pd.task_list) == {task_foo, task_bar}
+ task_foo = workflow.get_one_task_by_name("foo")
+ task_bar = workflow.get_one_task_by_name("bar")
+ assert set(workflow.task_list) == {task_foo, task_bar}
assert (
task_foo is not None
and task_foo._upstream_task_codes == set()
@@ -136,7 +136,7 @@ def test_multiple_tasks_outside(mock_code):
)
def test_multiple_tasks_inside(mock_code):
"""Test multiple decorator tasks which inside workflow."""
- with Workflow(WORKFLOW_NAME) as pd:
+ with Workflow(WORKFLOW_NAME) as workflow:
@task
def foo():
@@ -151,12 +151,12 @@ def test_multiple_tasks_inside(mock_code):
foo >> bar
- assert pd is not None and pd.name == WORKFLOW_NAME
- assert len(pd.tasks) == 2
+ assert workflow is not None and workflow.name == WORKFLOW_NAME
+ assert len(workflow.tasks) == 2
- task_foo = pd.get_one_task_by_name("foo")
- task_bar = pd.get_one_task_by_name("bar")
- assert set(pd.task_list) == {task_foo, task_bar}
+ task_foo = workflow.get_one_task_by_name("foo")
+ task_bar = workflow.get_one_task_by_name("bar")
+ assert set(workflow.task_list) == {task_foo, task_bar}
assert (
task_foo is not None
and task_foo._upstream_task_codes == set()
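The func_wrap tests exercise the `@task` decorator, which turns a plain function into a task whose `raw_script` is the function source plus a trailing call (visible in the assertions above). A minimal usage sketch (decorator import path assumed from the test module layout):

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.func_wrap import task

    @task
    def greet():
        print("hello from a decorated task")

    with Workflow("decorator_demo") as workflow:
        greet()  # registers a task named "greet" on the surrounding workflow
        workflow.submit()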
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index 37c3b44..677ba41 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -266,7 +266,7 @@ def test_switch_get_define(mock_task_code_version):
)
def test_switch_set_dep_workflow(mock_task_code_version):
"""Test task switch set dependence in workflow level."""
- with Workflow(name="test-switch-set-dep-workflow") as pd:
+ with Workflow(name="test-switch-set-dep-workflow") as workflow:
parent = Task(name="parent", task_type=TEST_TYPE)
switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
@@ -278,8 +278,8 @@ def test_switch_set_dep_workflow(mock_task_code_version):
switch = Switch(name=TEST_NAME, condition=switch_condition)
parent >> switch
# General tasks test
- assert len(pd.tasks) == 4
- assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+ assert len(workflow.tasks) == 4
+ assert sorted(workflow.task_list, key=lambda t: t.name) == sorted(
[parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name
)
# Task dep test