You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/13 09:06:15 UTC

[dolphinscheduler-sdk-python] branch main updated: [chore] Change workflow instance name from pd (#42)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 70afdfc  [chore] Change workflow instance name from pd (#42)
70afdfc is described below

commit 70afdfca754adda4ab67038e8f80d0d2a98752bc
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Tue Dec 13 17:06:09 2022 +0800

    [chore] Change workflow instance name from pd (#42)
    
    Change instance name from pd to workflow
---
 docs/source/concept.rst                            |  22 ++--
 src/pydolphinscheduler/core/workflow.py            |   4 +-
 src/pydolphinscheduler/core/yaml_workflow.py       |   8 +-
 .../examples/bulk_create_example.py                |   6 +-
 .../examples/multi_resources_example.py            |   4 +-
 .../examples/task_condition_example.py             |   4 +-
 .../examples/task_datax_example.py                 |   4 +-
 .../examples/task_dependent_example.py             |   8 +-
 .../examples/task_dvc_example.py                   |   4 +-
 .../examples/task_flink_example.py                 |   4 +-
 .../examples/task_kubernetes_example.py            |   4 +-
 .../examples/task_map_reduce_example.py            |   4 +-
 .../examples/task_mlflow_example.py                |   4 +-
 .../examples/task_openmldb_example.py              |   4 +-
 .../examples/task_pytorch_example.py               |   4 +-
 .../examples/task_sagemaker_example.py             |   4 +-
 .../examples/task_spark_example.py                 |   4 +-
 .../examples/task_switch_example.py                |   4 +-
 src/pydolphinscheduler/examples/tutorial.py        |   4 +-
 .../examples/tutorial_decorator.py                 |   4 +-
 tests/core/test_workflow.py                        | 118 +++++++++++----------
 tests/integration/test_process_definition.py       |   4 +-
 tests/tasks/test_condition.py                      |   6 +-
 tests/tasks/test_func_wrap.py                      |  42 ++++----
 tests/tasks/test_switch.py                         |   6 +-
 25 files changed, 143 insertions(+), 141 deletions(-)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 3b2f3d0..6048ed4 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -31,11 +31,11 @@ Workflow could be initialized in normal assign statement or in context manger.
 .. code-block:: python
 
    # Initialization with assign statement
-   pd = Workflow(name="my first workflow")
+   workflow = Workflow(name="my first workflow")
 
    # Or context manger 
-   with Workflow(name="my first workflow") as pd:
-       pd.submit()
+   with Workflow(name="my first workflow") as workflow:
+       workflow.submit()
 
 Workflow is the main object communicate between *PyDolphinScheduler* and DolphinScheduler daemon.
 After workflow and task is be declared, you could use `submit` and `run` notify server your definition.
@@ -46,10 +46,10 @@ But if you want to run the workflow after you submit it, you could use attribute
 .. code-block:: python
 
    # Just submit definition, without run it
-   pd.submit()
+   workflow.submit()
    
    # Both submit and run definition
-   pd.run()
+   workflow.run()
 
 Schedule
 ~~~~~~~~
@@ -84,7 +84,7 @@ Tenant is the user who run task command in machine or in virtual machine. it cou
 .. code-block:: python
 
    # 
-   pd = Workflow(name="workflow tenant", tenant="tenant_exists")
+   workflow = Workflow(name="workflow tenant", tenant="tenant_exists")
 
 .. note::
 
@@ -114,7 +114,7 @@ Parameter ``execution type`` can be set in
 
   .. code-block:: python
 
-     pd = Workflow(
+     workflow = Workflow(
          name="workflow_name",
          execution_type="parallel"
      )
@@ -173,11 +173,11 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
 .. code-block:: python
 
    # Normal assign, have to explicit declaration and pass `Workflow` instance to task
-   pd = Workflow(name="my first workflow")
-   shell_task = Shell(name="shell", command="echo shell task", workflow=pd)
+   workflow = Workflow(name="my first workflow")
+   shell_task = Shell(name="shell", command="echo shell task", workflow=workflow)
 
-   # Context manger, `Workflow` instance pd would implicit declaration to task
-   with Workflow(name="my first workflow") as pd:
+   # Context manger, `Workflow` instance workflow would implicit declaration to task
+   with Workflow(name="my first workflow") as workflow:
        shell_task = Shell(name="shell", command="echo shell task",
 
 With both `Workflow`_, `Tasks`_  and `Tasks Dependence`_, we could build a workflow with multiple tasks.
diff --git a/src/pydolphinscheduler/core/workflow.py b/src/pydolphinscheduler/core/workflow.py
index 02906fb..f972a8f 100644
--- a/src/pydolphinscheduler/core/workflow.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -37,9 +37,9 @@ class WorkflowContext:
     _context_managed_workflow: Optional["Workflow"] = None
 
     @classmethod
-    def set(cls, pd: "Workflow") -> None:
+    def set(cls, workflow: "Workflow") -> None:
         """Set attribute self._context_managed_workflow."""
-        cls._context_managed_workflow = pd
+        cls._context_managed_workflow = workflow
 
     @classmethod
     def get(cls) -> Optional["Workflow"]:
diff --git a/src/pydolphinscheduler/core/yaml_workflow.py b/src/pydolphinscheduler/core/yaml_workflow.py
index 5401112..507a518 100644
--- a/src/pydolphinscheduler/core/yaml_workflow.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -146,7 +146,7 @@ class YamlWorkflow(YamlParser):
 
         workflow_name = workflow_params["name"]
         logger.info(f"Create workflow: {workflow_name}")
-        with Workflow(**workflow_params) as pd:
+        with Workflow(**workflow_params) as workflow:
 
             # save dependencies between tasks
             dependencies = {}
@@ -170,11 +170,11 @@ class YamlWorkflow(YamlParser):
                     upstream_task = name2task[upstream_task_name]
                     upstream_task >> downstream_task
 
-            pd.submit()
+            workflow.submit()
             # if set is_run, run the workflow after submit
             if is_run:
-                logger.info(f"run workflow: {pd}")
-                pd.run()
+                logger.info(f"run workflow: {workflow}")
+                workflow.run()
 
         return workflow_name
 
diff --git a/src/pydolphinscheduler/examples/bulk_create_example.py b/src/pydolphinscheduler/examples/bulk_create_example.py
index 9b89f73..229811c 100644
--- a/src/pydolphinscheduler/examples/bulk_create_example.py
+++ b/src/pydolphinscheduler/examples/bulk_create_example.py
@@ -41,7 +41,7 @@ IS_CHAIN = True
 for wf in range(0, NUM_WORKFLOWS):
     workflow_name = f"workflow:{wf}"
 
-    with Workflow(name=workflow_name, tenant=TENANT) as pd:
+    with Workflow(name=workflow_name, tenant=TENANT) as workflow:
         for t in range(0, NUM_TASKS):
             task_name = f"task:{t}-{workflow_name}"
             command = f"echo This is task {task_name}"
@@ -49,7 +49,7 @@ for wf in range(0, NUM_WORKFLOWS):
 
             if IS_CHAIN and t > 0:
                 pre_task_name = f"task:{t-1}-{workflow_name}"
-                pd.get_one_task_by_name(pre_task_name) >> task
+                workflow.get_one_task_by_name(pre_task_name) >> task
 
         # We just submit workflow and task definition without set schedule time or run it manually
-        pd.submit()
+        workflow.submit()
diff --git a/src/pydolphinscheduler/examples/multi_resources_example.py b/src/pydolphinscheduler/examples/multi_resources_example.py
index dd06bd0..978a357 100644
--- a/src/pydolphinscheduler/examples/multi_resources_example.py
+++ b/src/pydolphinscheduler/examples/multi_resources_example.py
@@ -72,7 +72,7 @@ with Workflow(
         Resource(name=main, content="from dependence import now\nprint(now)"),
     ],
     # [end create_new_resources]
-) as pd:
+) as workflow:
     # [start use_exists_resources]
     task_use_resource = Shell(
         name="use-resource",
@@ -84,5 +84,5 @@ with Workflow(
     )
     # [end use_exists_resources]
 
-    pd.run()
+    workflow.run()
 # [end workflow]
diff --git a/src/pydolphinscheduler/examples/task_condition_example.py b/src/pydolphinscheduler/examples/task_condition_example.py
index ea8ca3a..585bc76 100644
--- a/src/pydolphinscheduler/examples/task_condition_example.py
+++ b/src/pydolphinscheduler/examples/task_condition_example.py
@@ -35,7 +35,7 @@ from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
 from pydolphinscheduler.tasks.shell import Shell
 
-with Workflow(name="task_condition_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_condition_example", tenant="tenant_exists") as workflow:
     pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
     pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
     pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
@@ -55,5 +55,5 @@ with Workflow(name="task_condition_example", tenant="tenant_exists") as pd:
         success_task=success_branch,
         failed_task=fail_branch,
     )
-    pd.submit()
+    workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py
index aa4d00a..a1422ef 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
@@ -75,7 +75,7 @@ JSON_TEMPLATE = {
 with Workflow(
     name="task_datax_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
     # You have to make sure data source named `first_mysql` and `second_mysql` exists
@@ -91,5 +91,5 @@ with Workflow(
     # You can custom json_template of datax to sync data. This task create a new
     # datax job same as task1, transfer record from `first_mysql` to `second_mysql`
     task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py
index 93c607c..648bb5c 100644
--- a/src/pydolphinscheduler/examples/task_dependent_example.py
+++ b/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -43,16 +43,16 @@ from pydolphinscheduler.tasks.shell import Shell
 with Workflow(
     name="task_dependent_external",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     task_1 = Shell(name="task_1", command="echo task 1")
     task_2 = Shell(name="task_2", command="echo task 2")
     task_3 = Shell(name="task_3", command="echo task 3")
-    pd.submit()
+    workflow.submit()
 
 with Workflow(
     name="task_dependent_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     task = Dependent(
         name="task_dependent",
         dependence=And(
@@ -70,5 +70,5 @@ with Workflow(
             )
         ),
     )
-    pd.submit()
+    workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_dvc_example.py b/src/pydolphinscheduler/examples/task_dvc_example.py
index 8d1976a..98e03f1 100644
--- a/src/pydolphinscheduler/examples/task_dvc_example.py
+++ b/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -26,7 +26,7 @@ repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
 with Workflow(
     name="task_dvc_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
     upload_task = DVCUpload(
         name="upload_data",
@@ -47,6 +47,6 @@ with Workflow(
 
     init_task >> upload_task >> download_task
 
-    pd.run()
+    workflow.run()
 
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_flink_example.py b/src/pydolphinscheduler/examples/task_flink_example.py
index a00ba7b..e5084b4 100644
--- a/src/pydolphinscheduler/examples/task_flink_example.py
+++ b/src/pydolphinscheduler/examples/task_flink_example.py
@@ -21,7 +21,7 @@
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
 
-with Workflow(name="task_flink_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_flink_example", tenant="tenant_exists") as workflow:
     task = Flink(
         name="task_flink",
         main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
@@ -29,5 +29,5 @@ with Workflow(name="task_flink_example", tenant="tenant_exists") as pd:
         program_type=ProgramType.JAVA,
         deploy_mode=DeployMode.LOCAL,
     )
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_kubernetes_example.py b/src/pydolphinscheduler/examples/task_kubernetes_example.py
index d1c807c..3f2ee0d 100644
--- a/src/pydolphinscheduler/examples/task_kubernetes_example.py
+++ b/src/pydolphinscheduler/examples/task_kubernetes_example.py
@@ -24,7 +24,7 @@ from pydolphinscheduler.tasks.kubernetes import Kubernetes
 with Workflow(
     name="task_kubernetes_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     task_k8s = Kubernetes(
         name="task_k8s",
         image="ds-dev",
@@ -32,5 +32,5 @@ with Workflow(
         min_cpu_cores=2.0,
         min_memory_space=10.0,
     )
-    pd.submit()
+    workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_map_reduce_example.py b/src/pydolphinscheduler/examples/task_map_reduce_example.py
index 117c503..70a6b2b 100644
--- a/src/pydolphinscheduler/examples/task_map_reduce_example.py
+++ b/src/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -22,7 +22,7 @@ from pydolphinscheduler.core.engine import ProgramType
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.map_reduce import MR
 
-with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as workflow:
     task = MR(
         name="task_mr",
         main_class="wordcount",
@@ -30,5 +30,5 @@ with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as pd:
         program_type=ProgramType.JAVA,
         main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
     )
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_mlflow_example.py b/src/pydolphinscheduler/examples/task_mlflow_example.py
index f0d51a2..7030bcf 100644
--- a/src/pydolphinscheduler/examples/task_mlflow_example.py
+++ b/src/pydolphinscheduler/examples/task_mlflow_example.py
@@ -32,7 +32,7 @@ mlflow_tracking_uri = "http://127.0.0.1:5000"
 with Workflow(
     name="task_mlflow_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
 
     # run custom mlflow project to train model
     train_custom = MLFlowProjectsCustom(
@@ -88,6 +88,6 @@ with Workflow(
 
     train_basic_algorithm >> deploy_mlflow
 
-    pd.submit()
+    workflow.submit()
 
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_openmldb_example.py b/src/pydolphinscheduler/examples/task_openmldb_example.py
index 86454b7..a8186e7 100644
--- a/src/pydolphinscheduler/examples/task_openmldb_example.py
+++ b/src/pydolphinscheduler/examples/task_openmldb_example.py
@@ -30,7 +30,7 @@ INTO TABLE talkingdata OPTIONS(mode='overwrite');
 with Workflow(
     name="task_openmldb_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     task_openmldb = OpenMLDB(
         name="task_openmldb",
         zookeeper="127.0.0.1:2181",
@@ -39,5 +39,5 @@ with Workflow(
         sql=sql,
     )
 
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 8aa7ea9..8cb3a2d 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -24,7 +24,7 @@ from pydolphinscheduler.tasks.pytorch import Pytorch
 with Workflow(
     name="task_pytorch_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
 
     # run project with existing environment
     task_existing_env = Pytorch(
@@ -58,5 +58,5 @@ with Workflow(
         requirements="requirements.txt",
     )
 
-    pd.submit()
+    workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_sagemaker_example.py b/src/pydolphinscheduler/examples/task_sagemaker_example.py
index f14ceb5..cbebfa7 100644
--- a/src/pydolphinscheduler/examples/task_sagemaker_example.py
+++ b/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -36,11 +36,11 @@ sagemaker_request_data = {
 with Workflow(
     name="task_sagemaker_example",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     task_sagemaker = SageMaker(
         name="task_sagemaker",
         sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
     )
 
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_spark_example.py b/src/pydolphinscheduler/examples/task_spark_example.py
index 142f7d5..77ec4ac 100644
--- a/src/pydolphinscheduler/examples/task_spark_example.py
+++ b/src/pydolphinscheduler/examples/task_spark_example.py
@@ -21,7 +21,7 @@
 from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
 
-with Workflow(name="task_spark_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_spark_example", tenant="tenant_exists") as workflow:
     task = Spark(
         name="task_spark",
         main_class="org.apache.spark.examples.SparkPi",
@@ -29,5 +29,5 @@ with Workflow(name="task_spark_example", tenant="tenant_exists") as pd:
         program_type=ProgramType.JAVA,
         deploy_mode=DeployMode.LOCAL,
     )
-    pd.run()
+    workflow.run()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_switch_example.py b/src/pydolphinscheduler/examples/task_switch_example.py
index d573342..b5c60f0 100644
--- a/src/pydolphinscheduler/examples/task_switch_example.py
+++ b/src/pydolphinscheduler/examples/task_switch_example.py
@@ -36,7 +36,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
 
 with Workflow(
     name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
-) as pd:
+) as workflow:
     parent = Shell(name="parent", command="echo parent")
     switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
     switch_child_2 = Shell(name="switch_child_2", command="echo switch_child_2")
@@ -47,5 +47,5 @@ with Workflow(
 
     switch = Switch(name="switch", condition=switch_condition)
     parent >> switch
-    pd.submit()
+    workflow.submit()
 # [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py
index cb6d47f..10243fd 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -46,7 +46,7 @@ with Workflow(
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     # [end workflow_declare]
     # [start task_declare]
     task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
@@ -63,6 +63,6 @@ with Workflow(
     # [end task_relation_declare]
 
     # [start submit_or_run]
-    pd.run()
+    workflow.run()
     # [end submit_or_run]
 # [end tutorial]
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py
index 9740af7..9e584a6 100644
--- a/src/pydolphinscheduler/examples/tutorial_decorator.py
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -75,7 +75,7 @@ with Workflow(
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
     tenant="tenant_exists",
-) as pd:
+) as workflow:
     # [end workflow_declare]
 
     # [start task_relation_declare]
@@ -86,6 +86,6 @@ with Workflow(
     # [end task_relation_declare]
 
     # [start submit_or_run]
-    pd.run()
+    workflow.run()
     # [end submit_or_run]
 # [end tutorial]
diff --git a/tests/core/test_workflow.py b/tests/core/test_workflow.py
index e5c8601..43f1ddd 100644
--- a/tests/core/test_workflow.py
+++ b/tests/core/test_workflow.py
@@ -40,8 +40,10 @@ TEST_TASK_TYPE = "test-task-type"
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
 def test_workflow_key_attr(func):
     """Test workflow have specific functions or attributes."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        assert hasattr(pd, func), f"Workflow instance don't have attribute `{func}`"
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        assert hasattr(
+            workflow, func
+        ), f"Workflow instance don't have attribute `{func}`"
 
 
 @pytest.mark.parametrize(
@@ -71,10 +73,10 @@ def test_workflow_key_attr(func):
 )
 def test_workflow_default_value(name, value):
     """Test workflow default attributes."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        assert getattr(pd, name) == value, (
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        assert getattr(workflow, name) == value, (
             f"Workflow instance attribute `{name}` not with "
-            f"except default value `{getattr(pd, name)}`"
+            f"except default value `{getattr(workflow, name)}`"
         )
 
 
@@ -100,10 +102,10 @@ def test_workflow_default_value(name, value):
 )
 def test_set_attr(name, cls, expect):
     """Test workflow set attributes which get with same type."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        setattr(pd, name, expect)
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        setattr(workflow, name, expect)
         assert (
-            getattr(pd, name) == expect
+            getattr(workflow, name) == expect
         ), f"Workflow set attribute `{name}` do not work expect"
 
 
@@ -116,9 +118,9 @@ def test_set_attr(name, cls, expect):
 )
 def test_set_release_state(value, expect):
     """Test workflow set release_state attributes."""
-    with Workflow(TEST_WORKFLOW_NAME, release_state=value) as pd:
+    with Workflow(TEST_WORKFLOW_NAME, release_state=value) as workflow:
         assert (
-            getattr(pd, "release_state") == expect
+            getattr(workflow, "release_state") == expect
         ), "Workflow set attribute release_state do not return expect value."
 
 
@@ -134,12 +136,12 @@ def test_set_release_state(value, expect):
 )
 def test_set_release_state_error(value):
     """Test workflow set release_state attributes with error."""
-    pd = Workflow(TEST_WORKFLOW_NAME, release_state=value)
+    workflow = Workflow(TEST_WORKFLOW_NAME, release_state=value)
     with pytest.raises(
         PyDSParamException,
         match="Parameter release_state only support `online` or `offline` but get.*",
     ):
-        pd.release_state
+        workflow.release_state
 
 
 @pytest.mark.parametrize(
@@ -153,10 +155,10 @@ def test_set_release_state_error(value):
 )
 def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
     """Test workflow set attributes which get with different type."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        setattr(pd, set_attr, set_val)
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        setattr(workflow, set_attr, set_val)
         assert get_val == getattr(
-            pd, get_attr
+            workflow, get_attr
         ), f"Set attribute {set_attr} can not get back with {get_val}."
 
 
@@ -174,8 +176,8 @@ def test__parse_datetime(val, expect):
 
     Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
     """
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        assert expect == pd._parse_datetime(
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        assert expect == workflow._parse_datetime(
             val
         ), f"Function _parse_datetime with unexpect value by {val}."
 
@@ -190,9 +192,9 @@ def test__parse_datetime(val, expect):
 )
 def test__parse_datetime_not_support_type(val: Any):
     """Test workflow function _parse_datetime not support type error."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
         with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
-            pd._parse_datetime(val)
+            workflow._parse_datetime(val)
 
 
 @pytest.mark.parametrize(
@@ -272,8 +274,8 @@ def test_execute_type_not_support_type(val: str):
 )
 def test_property_param_json(param, expect):
     """Test Workflow's property param_json."""
-    pd = Workflow(TEST_WORKFLOW_NAME, param=param)
-    assert pd.param_json == expect
+    workflow = Workflow(TEST_WORKFLOW_NAME, param=param)
+    assert workflow.param_json == expect
 
 
 @patch(
@@ -282,7 +284,7 @@ def test_property_param_json(param, expect):
 )
 def test__pre_submit_check_switch_without_param(mock_code_version):
     """Test :func:`_pre_submit_check` if workflow with switch but without attribute param."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
         parent = Task(name="parent", task_type=TEST_TASK_TYPE)
         switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
         switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
@@ -298,7 +300,7 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
             match="Parameter param or at least one local_param of task must "
             "be provider if task Switch in workflow.",
         ):
-            pd._pre_submit_check()
+            workflow._pre_submit_check()
 
 
 @patch(
@@ -307,7 +309,7 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
 )
 def test__pre_submit_check_switch_with_local_params(mock_code_version):
     """Test :func:`_pre_submit_check` if workflow with switch with local params of task."""
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
         parent = Task(
             name="parent",
             task_type=TEST_TASK_TYPE,
@@ -324,7 +326,7 @@ def test__pre_submit_check_switch_with_local_params(mock_code_version):
 
         switch = Switch(name="switch", condition=switch_condition)
         parent >> switch
-        pd._pre_submit_check()
+        workflow._pre_submit_check()
 
 
 def test_workflow_get_define_without_task():
@@ -346,45 +348,45 @@ def test_workflow_get_define_without_task():
         "taskRelationJson": [{}],
         "resourceList": [],
     }
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
-        assert pd.get_define() == expect
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
+        assert workflow.get_define() == expect
 
 
 def test_workflow_simple_context_manager():
     """Test simple create workflow in workflow context manager mode."""
     expect_tasks_num = 5
-    with Workflow(TEST_WORKFLOW_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as workflow:
         for i in range(expect_tasks_num):
             curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
             # Set deps task i as i-1 parent
             if i > 0:
-                pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+                pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
                 curr_task.set_upstream(pre_task)
-        assert len(pd.tasks) == expect_tasks_num
+        assert len(workflow.tasks) == expect_tasks_num
 
         # Test if task workflow same as origin one
-        task: Task = pd.get_one_task_by_name("task-0")
-        assert pd is task.workflow
+        task: Task = workflow.get_one_task_by_name("task-0")
+        assert workflow is task.workflow
 
         # Test if all tasks with expect deps
         for i in range(expect_tasks_num):
-            task: Task = pd.get_one_task_by_name(f"task-{i}")
+            task: Task = workflow.get_one_task_by_name(f"task-{i}")
             if i == 0:
                 assert task._upstream_task_codes == set()
                 assert task._downstream_task_codes == {
-                    pd.get_one_task_by_name("task-1").code
+                    workflow.get_one_task_by_name("task-1").code
                 }
             elif i == expect_tasks_num - 1:
                 assert task._upstream_task_codes == {
-                    pd.get_one_task_by_name(f"task-{i - 1}").code
+                    workflow.get_one_task_by_name(f"task-{i - 1}").code
                 }
                 assert task._downstream_task_codes == set()
             else:
                 assert task._upstream_task_codes == {
-                    pd.get_one_task_by_name(f"task-{i - 1}").code
+                    workflow.get_one_task_by_name(f"task-{i - 1}").code
                 }
                 assert task._downstream_task_codes == {
-                    pd.get_one_task_by_name(f"task-{i + 1}").code
+                    workflow.get_one_task_by_name(f"task-{i + 1}").code
                 }
 
 
@@ -399,38 +401,38 @@ def test_deprecated_workflow_simple_context_manager():
         assert issubclass(w[-1].category, DeprecationWarning)
         assert "deprecated" in str(w[-1].message)
 
-        with ProcessDefinition(TEST_WORKFLOW_NAME) as pd:
+        with ProcessDefinition(TEST_WORKFLOW_NAME) as workflow:
             for i in range(expect_tasks_num):
                 curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
                 # Set deps task i as i-1 parent
                 if i > 0:
-                    pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+                    pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
                     curr_task.set_upstream(pre_task)
-            assert len(pd.tasks) == expect_tasks_num
+            assert len(workflow.tasks) == expect_tasks_num
 
             # Test if task workflow same as origin one
-            task: Task = pd.get_one_task_by_name("task-0")
-            assert pd is task.workflow
+            task: Task = workflow.get_one_task_by_name("task-0")
+            assert workflow is task.workflow
 
             # Test if all tasks with expect deps
             for i in range(expect_tasks_num):
-                task: Task = pd.get_one_task_by_name(f"task-{i}")
+                task: Task = workflow.get_one_task_by_name(f"task-{i}")
                 if i == 0:
                     assert task._upstream_task_codes == set()
                     assert task._downstream_task_codes == {
-                        pd.get_one_task_by_name("task-1").code
+                        workflow.get_one_task_by_name("task-1").code
                     }
                 elif i == expect_tasks_num - 1:
                     assert task._upstream_task_codes == {
-                        pd.get_one_task_by_name(f"task-{i - 1}").code
+                        workflow.get_one_task_by_name(f"task-{i - 1}").code
                     }
                     assert task._downstream_task_codes == set()
                 else:
                     assert task._upstream_task_codes == {
-                        pd.get_one_task_by_name(f"task-{i - 1}").code
+                        workflow.get_one_task_by_name(f"task-{i - 1}").code
                     }
                     assert task._downstream_task_codes == {
-                        pd.get_one_task_by_name(f"task-{i + 1}").code
+                        workflow.get_one_task_by_name(f"task-{i + 1}").code
                     }
 
 
@@ -441,19 +443,19 @@ def test_workflow_simple_separate():
     test_workflow_simple_context_manager.
     """
     expect_tasks_num = 5
-    pd = Workflow(TEST_WORKFLOW_NAME)
+    workflow = Workflow(TEST_WORKFLOW_NAME)
     for i in range(expect_tasks_num):
         curr_task = Task(
             name=f"task-{i}",
             task_type=f"type-{i}",
-            workflow=pd,
+            workflow=workflow,
         )
         # Set deps task i as i-1 parent
         if i > 0:
-            pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+            pre_task = workflow.get_one_task_by_name(f"task-{i - 1}")
             curr_task.set_upstream(pre_task)
-    assert len(pd.tasks) == expect_tasks_num
-    assert all(["task-" in task.name for task in pd.task_list])
+    assert len(workflow.tasks) == expect_tasks_num
+    assert all(["task-" in task.name for task in workflow.task_list])
 
 
 @pytest.mark.parametrize(
@@ -467,8 +469,8 @@ def test_set_workflow_user_attr(user_attrs):
     default_value = {
         "tenant": configuration.WORKFLOW_TENANT,
     }
-    with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as pd:
-        user = pd.user
+    with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as workflow:
+        user = workflow.user
         for attr in default_value:
             # Get assigned attribute if we specific, else get default value
             except_attr = (
@@ -486,8 +488,8 @@ def test_schedule_json_none_schedule():
     with Workflow(
         TEST_WORKFLOW_NAME,
         schedule=None,
-    ) as pd:
-        assert pd.schedule_json is None
+    ) as workflow:
+        assert workflow.schedule_json is None
 
 
 # We freeze time here, because we test start_time with None, and if will get datetime.datetime.now. If we do
@@ -561,5 +563,5 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
         start_time=start_time,
         end_time=end_time,
         timezone=configuration.WORKFLOW_TIME_ZONE,
-    ) as pd:
-        assert pd.schedule_json == expect
+    ) as workflow:
+        assert workflow.schedule_json == expect
diff --git a/tests/integration/test_process_definition.py b/tests/integration/test_process_definition.py
index 1ea0051..010ca76 100644
--- a/tests/integration/test_process_definition.py
+++ b/tests/integration/test_process_definition.py
@@ -45,6 +45,6 @@ def test_change_workflow_attr(pre: Dict, post: Dict):
     """Test whether workflow success when specific attribute change."""
     assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
     for attrs in [pre, post]:
-        with Workflow(name=WORKFLOW_NAME, **attrs) as pd:
+        with Workflow(name=WORKFLOW_NAME, **attrs) as workflow:
             Shell(name=TASK_NAME, command="echo 1")
-            pd.submit()
+            workflow.submit()
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 700f418..3381783 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -401,7 +401,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
 )
 def test_condition_set_dep_workflow(mock_task_code_version):
     """Test task condition set dependence in workflow level."""
-    with Workflow(name="test-condition-set-dep-workflow") as pd:
+    with Workflow(name="test-condition-set-dep-workflow") as workflow:
         pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
         pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
         pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
@@ -423,8 +423,8 @@ def test_condition_set_dep_workflow(mock_task_code_version):
         )
 
         # General tasks test
-        assert len(pd.tasks) == 6
-        assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+        assert len(workflow.tasks) == 6
+        assert sorted(workflow.task_list, key=lambda t: t.name) == sorted(
             [
                 pre_task_1,
                 pre_task_2,
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
index 8c94c8e..aa0da26 100644
--- a/tests/tasks/test_func_wrap.py
+++ b/tests/tasks/test_func_wrap.py
@@ -41,13 +41,13 @@ def test_single_task_outside(mock_code):
     def foo():
         print(TASK_NAME)
 
-    with Workflow(WORKFLOW_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as workflow:
         foo()
 
-    assert pd is not None and pd.name == WORKFLOW_NAME
-    assert len(pd.tasks) == 1
+    assert workflow is not None and workflow.name == WORKFLOW_NAME
+    assert len(workflow.tasks) == 1
 
-    pd_task = pd.tasks[12345]
+    pd_task = workflow.tasks[12345]
     assert pd_task.name == "foo"
     assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
 
@@ -57,7 +57,7 @@ def test_single_task_outside(mock_code):
 )
 def test_single_task_inside(mock_code):
     """Test single decorator task which inside workflow."""
-    with Workflow(WORKFLOW_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as workflow:
 
         @task
         def foo():
@@ -65,10 +65,10 @@ def test_single_task_inside(mock_code):
 
         foo()
 
-    assert pd is not None and pd.name == WORKFLOW_NAME
-    assert len(pd.tasks) == 1
+    assert workflow is not None and workflow.name == WORKFLOW_NAME
+    assert len(workflow.tasks) == 1
 
-    pd_task = pd.tasks[12345]
+    pd_task = workflow.tasks[12345]
     assert pd_task.name == "foo"
     assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
 
@@ -84,7 +84,7 @@ def test_addition_decorator_error(mock_code):
     def foo():
         print(TASK_NAME)
 
-    with Workflow(WORKFLOW_NAME) as pd:  # noqa: F841
+    with Workflow(WORKFLOW_NAME) as workflow:  # noqa: F841
         with pytest.raises(
             PyDSParamException, match="Do no support other decorators for.*"
         ):
@@ -106,18 +106,18 @@ def test_multiple_tasks_outside(mock_code):
     def bar():
         print(TASK_NAME)
 
-    with Workflow(WORKFLOW_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as workflow:
         foo = foo()
         bar = bar()
 
         foo >> bar
 
-    assert pd is not None and pd.name == WORKFLOW_NAME
-    assert len(pd.tasks) == 2
+    assert workflow is not None and workflow.name == WORKFLOW_NAME
+    assert len(workflow.tasks) == 2
 
-    task_foo = pd.get_one_task_by_name("foo")
-    task_bar = pd.get_one_task_by_name("bar")
-    assert set(pd.task_list) == {task_foo, task_bar}
+    task_foo = workflow.get_one_task_by_name("foo")
+    task_bar = workflow.get_one_task_by_name("bar")
+    assert set(workflow.task_list) == {task_foo, task_bar}
     assert (
         task_foo is not None
         and task_foo._upstream_task_codes == set()
@@ -136,7 +136,7 @@ def test_multiple_tasks_outside(mock_code):
 )
 def test_multiple_tasks_inside(mock_code):
     """Test multiple decorator tasks which inside workflow."""
-    with Workflow(WORKFLOW_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as workflow:
 
         @task
         def foo():
@@ -151,12 +151,12 @@ def test_multiple_tasks_inside(mock_code):
 
         foo >> bar
 
-    assert pd is not None and pd.name == WORKFLOW_NAME
-    assert len(pd.tasks) == 2
+    assert workflow is not None and workflow.name == WORKFLOW_NAME
+    assert len(workflow.tasks) == 2
 
-    task_foo = pd.get_one_task_by_name("foo")
-    task_bar = pd.get_one_task_by_name("bar")
-    assert set(pd.task_list) == {task_foo, task_bar}
+    task_foo = workflow.get_one_task_by_name("foo")
+    task_bar = workflow.get_one_task_by_name("bar")
+    assert set(workflow.task_list) == {task_foo, task_bar}
     assert (
         task_foo is not None
         and task_foo._upstream_task_codes == set()
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index 37c3b44..677ba41 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -266,7 +266,7 @@ def test_switch_get_define(mock_task_code_version):
 )
 def test_switch_set_dep_workflow(mock_task_code_version):
     """Test task switch set dependence in workflow level."""
-    with Workflow(name="test-switch-set-dep-workflow") as pd:
+    with Workflow(name="test-switch-set-dep-workflow") as workflow:
         parent = Task(name="parent", task_type=TEST_TYPE)
         switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
         switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
@@ -278,8 +278,8 @@ def test_switch_set_dep_workflow(mock_task_code_version):
         switch = Switch(name=TEST_NAME, condition=switch_condition)
         parent >> switch
         # General tasks test
-        assert len(pd.tasks) == 4
-        assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
+        assert len(workflow.tasks) == 4
+        assert sorted(workflow.task_list, key=lambda t: t.name) == sorted(
             [parent, switch, switch_child_1, switch_child_2], key=lambda t: t.name
         )
         # Task dep test