Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/16 13:34:06 UTC

[dolphinscheduler-sdk-python] branch main updated: [chore] Change class name from process definition to workflow (#26)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new afdd923  [chore] Change class name from process definition to workflow (#26)
afdd923 is described below

commit afdd923995f4947ec4a9ae8f1b6ff7e1964e5209
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 16 21:34:01 2022 +0800

    [chore] Change class name from process definition to workflow (#26)
    
    we should change the class name from `process definition` to `workflow`
    
    fix: #22
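    
    For example, migrating user code looks roughly like the sketch below
    (illustrative only, not part of this commit; the deprecated names keep
    working, with a DeprecationWarning, until they are removed in 4.1.0):
    
        # before: deprecated module, class, and parameter names
        from pydolphinscheduler.core.process_definition import ProcessDefinition
        from pydolphinscheduler.tasks.sub_process import SubProcess
    
        with ProcessDefinition(name="example") as pd:
            SubProcess(name="sub", process_definition_name="child")
    
        # after: the new workflow-based names
        from pydolphinscheduler.core.workflow import Workflow
        from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
    
        with Workflow(name="example") as wf:
            SubWorkflow(name="sub", workflow_name="child")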
---
 .github/workflows/ci.yaml                          |   2 +
 .gitignore                                         |   1 +
 DEVELOP.md                                         |   4 +-
 README.md                                          |   2 +-
 UPDATING.md                                        |  12 +-
 docs/source/concept.rst                            |  50 +--
 docs/source/tasks/index.rst                        |   2 +-
 .../tasks/{sub_process.rst => sub_workflow.rst}    |  10 +-
 docs/source/tutorial.rst                           |  58 +--
 examples/yaml_define/Condition.yaml                |   2 +-
 examples/yaml_define/DataX.yaml                    |   2 +-
 examples/yaml_define/Dependent.yaml                |  18 +-
 examples/yaml_define/Dependent_External.yaml       |   2 +-
 examples/yaml_define/Dvc.yaml                      |   2 +-
 examples/yaml_define/Flink.yaml                    |   2 +-
 examples/yaml_define/Http.yaml                     |   2 +-
 examples/yaml_define/Kubernetes.yaml               |   2 +-
 examples/yaml_define/MapReduce.yaml                |   2 +-
 examples/yaml_define/MoreConfiguration.yaml        |   2 +-
 examples/yaml_define/OpenMLDB.yaml                 |   2 +-
 examples/yaml_define/Procedure.yaml                |   2 +-
 examples/yaml_define/Python.yaml                   |   2 +-
 examples/yaml_define/Pytorch.yaml                  |   2 +-
 examples/yaml_define/Sagemaker.yaml                |   2 +-
 examples/yaml_define/Shell.yaml                    |   2 +-
 examples/yaml_define/Spark.yaml                    |   2 +-
 examples/yaml_define/Sql.yaml                      |   2 +-
 .../{SubProcess.yaml => SubWorkflow.yaml}          |   4 +-
 examples/yaml_define/Switch.yaml                   |   2 +-
 examples/yaml_define/example_sub_workflow.yaml     |   2 +-
 examples/yaml_define/mlflow.yaml                   |   2 +-
 examples/yaml_define/tutorial.yaml                 |   2 +-
 src/pydolphinscheduler/cli/commands.py             |   6 +-
 src/pydolphinscheduler/constants.py                |   2 +-
 src/pydolphinscheduler/core/__init__.py            |   4 +-
 src/pydolphinscheduler/core/process_definition.py  | 450 +--------------------
 src/pydolphinscheduler/core/task.py                |  74 ++--
 .../core/{process_definition.py => workflow.py}    |  90 ++---
 .../{yaml_process_define.py => yaml_workflow.py}   |  78 ++--
 .../examples/bulk_create_example.py                |   4 +-
 .../examples/task_condition_example.py             |   4 +-
 .../examples/task_datax_example.py                 |   4 +-
 .../examples/task_dependent_example.py             |  10 +-
 .../examples/task_dvc_example.py                   |   4 +-
 .../examples/task_flink_example.py                 |   4 +-
 .../examples/task_kubernetes_example.py            |   4 +-
 .../examples/task_map_reduce_example.py            |   4 +-
 .../examples/task_mlflow_example.py                |   4 +-
 .../examples/task_openmldb_example.py              |   4 +-
 .../examples/task_pytorch_example.py               |   4 +-
 .../examples/task_sagemaker_example.py             |   4 +-
 .../examples/task_spark_example.py                 |   4 +-
 .../examples/task_switch_example.py                |   4 +-
 src/pydolphinscheduler/examples/tutorial.py        |   6 +-
 .../examples/tutorial_decorator.py                 |   6 +-
 .../examples/tutorial_resource_plugin.py           |  10 +-
 src/pydolphinscheduler/exceptions.py               |   4 +-
 src/pydolphinscheduler/java_gateway.py             |  34 +-
 src/pydolphinscheduler/tasks/__init__.py           |   4 +-
 src/pydolphinscheduler/tasks/dependent.py          |  29 +-
 src/pydolphinscheduler/tasks/sub_process.py        |  46 +--
 src/pydolphinscheduler/tasks/sub_workflow.py       |  56 +++
 src/pydolphinscheduler/tasks/switch.py             |   2 +-
 tests/core/test_task.py                            |  20 +-
 ...test_process_definition.py => test_workflow.py} | 164 +++++---
 ...aml_process_define.py => test_yaml_workflow.py} |  22 +-
 tests/example/test_example.py                      |  53 ++-
 tests/integration/test_process_definition.py       |  14 +-
 tests/tasks/test_condition.py                      |   8 +-
 tests/tasks/test_dependent.py                      |  84 ++--
 tests/tasks/test_func_wrap.py                      |  30 +-
 tests/tasks/test_sub_process.py                    | 115 ------
 tests/tasks/test_sub_workflow.py                   | 170 ++++++++
 tests/tasks/test_switch.py                         |   6 +-
 tests/test_docs.py                                 |   3 +-
 tests/testing/constants.py                         |   2 +-
 76 files changed, 812 insertions(+), 1047 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 96e0db7..684cd3d 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -157,6 +157,8 @@ jobs:
           repository: apache/dolphinscheduler
           path: dolphinscheduler
           submodules: true
+          # Temporary add to make https://github.com/apache/dolphinscheduler-sdk-python/issues/12 work
+          # ref: refs/pull/12918/head
       - name: Cache local Maven repository
         uses: actions/cache@v3
         with:
diff --git a/.gitignore b/.gitignore
index 907b2b1..d20c274 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
 # Cache
 __pycache__/
 .tox/
+.pytest_cache/
 
 # Build
 build/
diff --git a/DEVELOP.md b/DEVELOP.md
index 2d1a80c..0409b2f 100644
--- a/DEVELOP.md
+++ b/DEVELOP.md
@@ -46,9 +46,9 @@ define by code, user usually do not care user, tenant, or queue exists or not. A
 a new workflow is created by the code of his/her definition. So we have some **side objects** in the
 `pydolphinscheduler/side` directory; they only check whether an object exists, and create it if it does not.
 
-### Process Definition
+### Workflow
 
-pydolphinscheduler workflow object name, process definition is also same name as Java object(maybe would be change to
+pydolphinscheduler workflow object name; workflow is also the same name as the Java object (it may be changed to
 a simpler word later).
 
 ### Tasks
diff --git a/README.md b/README.md
index c00700a..3a269e3 100644
--- a/README.md
+++ b/README.md
@@ -78,7 +78,7 @@ python ./tutorial.py
 > tenant value in `example/tutorial.py`. For now the value is `tenant_exists`; please change it to a username that
 > exists in your environment.
 
-After command execute, you could see a new project with single process definition named *tutorial* in the
+After the command executes, you could see a new project with a single workflow named *tutorial* in the
 [UI-project list](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html).
 
 ## Develop
diff --git a/UPDATING.md b/UPDATING.md
index b298c3b..a1d9c99 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -22,10 +22,18 @@ under the License.
 Updating tries to document non-backward compatible updates, notifying users of the detailed changes in pydolphinscheduler.
 It started after version 2.0.5 was released.
 
-## dev
+## Main
 
-* Remove parameter ``task_location`` in process definition and Java Gateway service ([#11681](https://github.com/apache/dolphinscheduler/pull/11681))
 * Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
+* Change class name from process definition to workflow ([#26](https://github.com/apache/dolphinscheduler-sdk-python/pull/26))
+  * Deprecated class `ProcessDefinition` in favor of `Workflow`
+  * Deprecated class `SubProcess` in favor of `SubWorkflow`, and changed its parameter name from `process_definition_name` to `workflow_name`
+  * Deprecated the `Dependent` parameter `process_definition_name` in favor of `workflow_name`
+  * All of the above deprecations will be removed in version 4.1.0
+
+## 3.1.0
+
+* Remove parameter ``task_location`` in process definition and Java Gateway service ([#11681](https://github.com/apache/dolphinscheduler/pull/11681))
 
 ## 3.0.0
 
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 9db389b..f5bc367 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -20,25 +20,25 @@ Concepts
 
 In this section, you will learn the core concepts of *PyDolphinScheduler*.
 
-Process Definition
-------------------
+Workflow
+--------
 
-Process definition describe the whole things except `tasks`_ and `tasks dependence`_, which including
+Workflow describes everything except `tasks`_ and `tasks dependence`_, including the
 name, schedule interval, and schedule start and end time.
 
-Process definition could be initialized in normal assign statement or in context manger.
+Workflow can be initialized in a normal assignment statement or in a context manager.
 
 .. code-block:: python
 
    # Initialization with an assignment statement
-   pd = ProcessDefinition(name="my first process definition")
+   pd = Workflow(name="my first workflow")
 
    # Or context manager
-   with ProcessDefinition(name="my first process definition") as pd:
+   with Workflow(name="my first workflow") as pd:
        pd.submit()
 
-Process definition is the main object communicate between *PyDolphinScheduler* and DolphinScheduler daemon.
-After process definition and task is be declared, you could use `submit` and `run` notify server your definition.
+Workflow is the main object for communication between *PyDolphinScheduler* and the DolphinScheduler daemon.
+After the workflow and tasks are declared, you can use `submit` and `run` to notify the server of your definition.
 
 If you just want to submit your definition and create the workflow without running it, you should use the attribute `submit`.
 But if you want to run the workflow after you submit it, you should use the attribute `run`.
@@ -84,7 +84,7 @@ Tenant is the user who run task command in machine or in virtual machine. it cou
 .. code-block:: python
 
    # 
-   pd = ProcessDefinition(name="process definition tenant", tenant="tenant_exists")
+   pd = Workflow(name="workflow tenant", tenant="tenant_exists")
 
 .. note::
 
@@ -93,9 +93,9 @@ Tenant is the user who run task command in machine or in virtual machine. it cou
 Execution Type
 ~~~~~~~~~~~~~~
 
-Decision which behavior to run when process definition have multiple instances. when process definition
+Decides which behavior to use when a workflow has multiple instances. When the workflow
 schedule interval is too short, it may cause multiple instances to run at the same time. We can use this
-parameter to control the behavior about how to run those process definition instances. Currently we
+parameter to control how those workflow instances run. Currently, we
 have four execution types:
 
 * ``parallel`` (default value): it means all instances will be allowed to run even though the previous
@@ -105,7 +105,7 @@ have four execution type:
 * ``serial_discard``: it means all the instances will be discarded (abandoned) if the previous instance
   is not finished.
 * ``serial_priority``: it means all the instances will wait for the previous instance to finish,
-  and all the waiting instances will be executed base on process definition priority order.
+  and all the waiting instances will be executed based on workflow priority order.
 
 Parameter ``execution type`` can be set in
 
@@ -114,8 +114,8 @@ Parameter ``execution type`` can be set in
 
   .. code-block:: python
 
-     pd = ProcessDefinition(
-         name="process-definition",
+     pd = Workflow(
+         name="workflow_name",
          execution_type="parallel"
      )
 
@@ -141,7 +141,7 @@ If you want to see all type of tasks, you could see :doc:`tasks/index`.
 Tasks Dependence
 ~~~~~~~~~~~~~~~~
 
-You could define many tasks in on single `Process Definition`_. If all those task is in parallel processing,
+You could define many tasks in one single `Workflow`_. If all those tasks run in parallel,
 then you could leave them alone without adding any additional information. But if some tasks should
 not run until their upstream tasks in the workflow are done, we should set task dependence on them. There are
 two main ways to set task dependence, and both of them are easy: the bitwise operators `>>` and `<<`, or the task attribute 
@@ -164,23 +164,23 @@ have two mainly way and both of them is easy. You could use bitwise operator `>>
    # for some tasks have same dependence.
    task1 >> [task2, task3]
 
-Task With Process Definition
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Task With Workflow
+~~~~~~~~~~~~~~~~~~
 
-In most of data orchestration cases, you should assigned attribute `process_definition` to task instance to
-decide workflow of task. You could set `process_definition` in both normal assign or in context manger mode
+In most data orchestration cases, you should assign the attribute `workflow` to a task instance to
+decide which workflow the task belongs to. You could set `workflow` in both normal assignment and context manager mode
 
 .. code-block:: python
 
-   # Normal assign, have to explicit declaration and pass `ProcessDefinition` instance to task
-   pd = ProcessDefinition(name="my first process definition")
-   shell_task = Shell(name="shell", command="echo shell task", process_definition=pd)
+   # Normal assignment: explicitly declare and pass the `Workflow` instance to the task
+   pd = Workflow(name="my first workflow")
+   shell_task = Shell(name="shell", command="echo shell task", workflow=pd)
 
-   # Context manger, `ProcessDefinition` instance pd would implicit declaration to task
-   with ProcessDefinition(name="my first process definition") as pd:
+   # Context manager: the `Workflow` instance pd is implicitly attached to the task
+   with Workflow(name="my first workflow") as pd:
        shell_task = Shell(name="shell", command="echo shell task")
 
-With both `Process Definition`_, `Tasks`_  and `Tasks Dependence`_, we could build a workflow with multiple tasks.
+With `Workflow`_, `Tasks`_, and `Tasks Dependence`_ together, we could build a workflow with multiple tasks.
 
 Authentication Token
 --------------------
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index dba3503..fa2e73a 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -40,7 +40,7 @@ In this section
    kubernetes
 
    datax
-   sub_process
+   sub_workflow
 
    sagemaker
    mlflow
diff --git a/docs/source/tasks/sub_process.rst b/docs/source/tasks/sub_workflow.rst
similarity index 85%
rename from docs/source/tasks/sub_process.rst
rename to docs/source/tasks/sub_workflow.rst
index 894dd0f..026131a 100644
--- a/docs/source/tasks/sub_process.rst
+++ b/docs/source/tasks/sub_workflow.rst
@@ -15,22 +15,22 @@
    specific language governing permissions and limitations
    under the License.
 
-Sub Process
-===========
+Sub Workflow
+============
 
-.. automodule:: pydolphinscheduler.tasks.sub_process
+.. automodule:: pydolphinscheduler.tasks.sub_workflow
 
 
 YAML file example
 -----------------
 
-.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml
+.. literalinclude:: ../../../examples/yaml_define/SubWorkflow.yaml
    :start-after: # under the License.
    :language: yaml
 
 
 
-example_subprocess.yaml:
+example_sub_workflow.yaml:
 
 .. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml
    :start-after: # under the License.
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 16e0d35..695c945 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -37,7 +37,7 @@ There are two types of tutorials: traditional and task decorator.
   versatility than the traditional way because it only supports Python functions and has no built-in tasks
   support. But it is helpful if your workflow is all built with Python or if you already have some Python
   workflow code and want to migrate it to pydolphinscheduler.
-- **YAML File**: We can use pydolphinscheduler CLI to create process using YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`. 
+- **YAML File**: We can use the pydolphinscheduler CLI to create a workflow from a YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`.
   We can find more YAML file examples in `examples/yaml_define <https://github.com/apache/dolphinscheduler-sdk-python/tree/main/examples/yaml_define>`_
 
 .. tab:: Tradition
@@ -72,7 +72,7 @@ First of all, we should import the necessary module which we would use later jus
       :start-after: [start package_import]
       :end-before: [end package_import]
 
-   In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+   In the traditional tutorial we import :class:`pydolphinscheduler.core.workflow.Workflow` and
    :class:`pydolphinscheduler.tasks.shell.Shell`.
 
    If you want to use other task types, you could click and :doc:`see all tasks we support <tasks/index>`
@@ -84,16 +84,16 @@ First of all, we should import the necessary module which we would use later jus
       :start-after: [start package_import]
       :end-before: [end package_import]
 
+   In the task decorator tutorial we import :class:`pydolphinscheduler.core.workflow.Workflow` and
+   In task decorator tutorial we import :class:`pydolphinscheduler.core.workflow.Workflow` and
    :func:`pydolphinscheduler.tasks.func_wrap.task`.
 
-Process Definition Declaration
-------------------------------
+Workflow Declaration
+--------------------
 
-We should instantiate :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
-import them from `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow).
-We define the name of :code:`ProcessDefinition`, using `Python context manager`_ and it **the only required argument**
-for `ProcessDefinition`. Besides, we also declare three arguments named :code:`schedule` and :code:`start_time`
+We should instantiate a :class:`pydolphinscheduler.core.workflow.Workflow` object after we
+import it from `import necessary module`_. Here we declare basic arguments for the workflow.
+We define the name of the :code:`Workflow` using a `Python context manager`_, and it is **the only required argument**
+for `Workflow`. Besides, we also declare the arguments :code:`schedule` and :code:`start_time`,
 which set the workflow schedule interval and schedule start time, and the argument :code:`tenant`, which defines the tenant
 that will run this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
 *PyDolphinScheduler* :doc:`concept` for more information.
@@ -116,12 +116,12 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
 
    .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
       :start-after: # under the License.
-      :end-before: # Define the tasks under the workflow
+      :end-before: # Define the tasks within the workflow
       :language: yaml
 
-We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
-if you are interested in it. For all arguments of object process definition, you could find in the
-:class:`pydolphinscheduler.core.process_definition` API documentation.
+We could find more detail about :code:`Workflow` in :ref:`concept about workflow <concept:workflow>`
+if you are interested in it. All arguments of the workflow object can be found in the
+:class:`pydolphinscheduler.core.workflow` API documentation.
 
 Task Declaration
 ----------------
@@ -144,7 +144,7 @@ Task Declaration
 
 We declare four tasks to show how to create tasks, and all of them are created by the task decorator
    using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
+   :code:`@task` to an existing Python function, and then use it inside :class:`pydolphinscheduler.core.workflow`
+   :code:`@task` to existing Python function, and then use them inside :class:`pydolphinscheduler.core.workflow`
 
    .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
       :dedent: 0
@@ -157,13 +157,13 @@ Task Declaration
 .. tab:: YAML File
 
    .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
-      :start-after: # Define the tasks under the workflow 
+      :start-after: # Define the tasks within the workflow 
       :language: yaml
 
 Setting Task Dependence
 -----------------------
 
-After we declare both process definition and task, we have four tasks that are independent and will be running
+After we declare both the workflow and tasks, we have four tasks that are independent and will run
 in parallel. If you want one task to start only after some other task is finished, you have to set dependence on
 those tasks.
 
@@ -193,7 +193,7 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
    We can use :code:`deps:[]` to set task dependence
 
    .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
-      :start-after: # Define the tasks under the workflow 
+      :start-after: # Define the tasks within the workflow 
       :language: yaml
 
 .. note::
@@ -210,7 +210,7 @@ After that, we finish our workflow definition, with four tasks and task dependen
 local, we should let the DolphinScheduler daemon know the definition of the workflow. So the last thing we
 have to do is submit the workflow to the DolphinScheduler daemon.
 
-Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
+Fortunately, we have a convenient method to submit the workflow via the `Workflow` attribute :code:`run`, which
 will create the workflow definition as well as the workflow schedule.
 
 .. tab:: Tradition
@@ -245,24 +245,24 @@ At last, we could execute this workflow code in your terminal like other Python
 
    If you do not start your DolphinScheduler API server, you could find how to start it in
    :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute
-   :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
-   the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
+   :code:`submit` on the `Workflow` object, which just submits the workflow to the daemon but does not set
+   the workflow schedule information. For more detail, you could see :ref:`concept:workflow`.
 
 DAG Graph After Tutorial Run
 ----------------------------
 
 After we run the tutorial code, you could log in to the DolphinScheduler web UI and see the
-`DolphinScheduler project page`_. They is a new process definition be created by *PyDolphinScheduler* and it
+`DolphinScheduler project page`_. There is a new workflow created by *PyDolphinScheduler*, and it is
 named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like below:
 
 .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
    :language: text
    :lines: 24-28
 
-Create Process Using YAML File
-------------------------------
+Create Workflow Using YAML File
+-------------------------------
 
-We can use pydolphinscheduler CLI to create process using YAML file
+We can use the pydolphinscheduler CLI to create a workflow from a YAML file
 
 .. code-block:: bash
 
@@ -271,7 +271,7 @@ We can use pydolphinscheduler CLI to create process using YAML file
 We can use the following four special grammars to define workflows more flexibly.
 
 - :code:`$FILE{"file_name"}`: Read the contents of the file (:code:`file_name`) and insert them at that location.
-- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined using YAML file (:code:`other_workflow.yaml`) and replace the process name in this location.
+- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another workflow defined in a YAML file (:code:`other_workflow.yaml`) and replace it with that workflow's name at this location.
 - :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and insert it at that location.
 - :code:`${CONFIG.key_name}`: Read the configuration value of the key (:code:`key_name`) and insert it at that location.
 
@@ -290,7 +290,7 @@ For exmaples, our file directory structure is as follows:
        ├── Dependent.yaml
        ├── example_datax.json
        ├── example_sql.sql
-       ├── example_subprocess.yaml
+       ├── example_sub_workflow.yaml
        ├── Flink.yaml
        ├── Http.yaml
        ├── MapReduce.yaml
@@ -300,17 +300,17 @@ For exmaples, our file directory structure is as follows:
        ├── Shell.yaml
        ├── Spark.yaml
        ├── Sql.yaml
-       ├── SubProcess.yaml
+       ├── SubWorkflow.yaml
        └── Switch.yaml
 
 After we run
 
 .. code-block:: bash
 
-   pydolphinscheduler yaml -file yaml_define/SubProcess.yaml
+   pydolphinscheduler yaml -f yaml_define/SubWorkflow.yaml
 
 
-the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_subprocess.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
+the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_sub_workflow.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
 
 Furthermore, this feature supports recursion all the way down.
 
diff --git a/examples/yaml_define/Condition.yaml b/examples/yaml_define/Condition.yaml
index c65b8c7..2416323 100644
--- a/examples/yaml_define/Condition.yaml
+++ b/examples/yaml_define/Condition.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Condition"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
   - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
diff --git a/examples/yaml_define/DataX.yaml b/examples/yaml_define/DataX.yaml
index 00ecd54..acf7aee 100644
--- a/examples/yaml_define/DataX.yaml
+++ b/examples/yaml_define/DataX.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "DataX"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: DataX
diff --git a/examples/yaml_define/Dependent.yaml b/examples/yaml_define/Dependent.yaml
index d69fac0..289cea2 100644
--- a/examples/yaml_define/Dependent.yaml
+++ b/examples/yaml_define/Dependent.yaml
@@ -18,7 +18,7 @@
 workflow:
   name: "Dependent"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: dependent
     task_type: Dependent
@@ -28,22 +28,22 @@ tasks:
       - op: or
         groups:
           - project_name: pydolphin
-            process_definition_name: task_dependent_external
+            workflow_name: task_dependent_external
             dependent_task_name: task_1
 
           - project_name: pydolphin
-            process_definition_name: task_dependent_external
+            workflow_name: task_dependent_external
             dependent_task_name: task_2
 
       - op: and
         groups:
           - project_name: pydolphin
-            process_definition_name: task_dependent_external
+            workflow_name: task_dependent_external
             dependent_task_name: task_1
             dependent_date: LAST_WEDNESDAY 
 
           - project_name: pydolphin
-            process_definition_name: task_dependent_external
+            workflow_name: task_dependent_external
             dependent_task_name: task_2
             dependent_date: last24Hours 
 
@@ -57,20 +57,20 @@ tasks:
         # we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from dependent_external.yaml and set the value to that workflow name
         groups:
           - project_name: ${CONFIG.WORKFLOW_PROJECT} 
-            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
             dependent_task_name: task_1
 
           - project_name: ${CONFIG.WORKFLOW_PROJECT} 
-            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
             dependent_task_name: task_2
       - op: and
         groups:
           - project_name: ${CONFIG.WORKFLOW_PROJECT} 
-            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
             dependent_task_name: task_1
             dependent_date: LAST_WEDNESDAY 
 
           - project_name: ${CONFIG.WORKFLOW_PROJECT} 
-            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            workflow_name: $WORKFLOW{"Dependent_External.yaml"} 
             dependent_task_name: task_2
             dependent_date: last24Hours 
diff --git a/examples/yaml_define/Dependent_External.yaml b/examples/yaml_define/Dependent_External.yaml
index 577ff6a..77ba180 100644
--- a/examples/yaml_define/Dependent_External.yaml
+++ b/examples/yaml_define/Dependent_External.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "task_dependent_external"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
   - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
diff --git a/examples/yaml_define/Dvc.yaml b/examples/yaml_define/Dvc.yaml
index a6ec18c..9a5aa12 100644
--- a/examples/yaml_define/Dvc.yaml
+++ b/examples/yaml_define/Dvc.yaml
@@ -23,7 +23,7 @@ workflow:
   name: "DVC"
   release_state: "offline"
 
-# Define the tasks under the process
+# Define the tasks within the workflow
 tasks:
   - name: init_dvc 
     task_type: DVCInit
diff --git a/examples/yaml_define/Flink.yaml b/examples/yaml_define/Flink.yaml
index 2449d43..a044d16 100644
--- a/examples/yaml_define/Flink.yaml
+++ b/examples/yaml_define/Flink.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Flink"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: Flink
diff --git a/examples/yaml_define/Http.yaml b/examples/yaml_define/Http.yaml
index 1483aeb..839788b 100644
--- a/examples/yaml_define/Http.yaml
+++ b/examples/yaml_define/Http.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Http"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: Http
diff --git a/examples/yaml_define/Kubernetes.yaml b/examples/yaml_define/Kubernetes.yaml
index 3197931..1128ca7 100644
--- a/examples/yaml_define/Kubernetes.yaml
+++ b/examples/yaml_define/Kubernetes.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "kubernetes"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: kubernetes
     task_type: K8S
diff --git a/examples/yaml_define/MapReduce.yaml b/examples/yaml_define/MapReduce.yaml
index e1a2b57..a8a83cd 100644
--- a/examples/yaml_define/MapReduce.yaml
+++ b/examples/yaml_define/MapReduce.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "MapReduce"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: MR
diff --git a/examples/yaml_define/MoreConfiguration.yaml b/examples/yaml_define/MoreConfiguration.yaml
index 258aa33..6fb4357 100644
--- a/examples/yaml_define/MoreConfiguration.yaml
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -21,7 +21,7 @@ workflow:
   param:
     n: 1
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: shell_0
     task_type: Shell
diff --git a/examples/yaml_define/OpenMLDB.yaml b/examples/yaml_define/OpenMLDB.yaml
index b455cb0..99db726 100644
--- a/examples/yaml_define/OpenMLDB.yaml
+++ b/examples/yaml_define/OpenMLDB.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "OpenMLDB"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: OpenMLDB
     task_type: OpenMLDB
diff --git a/examples/yaml_define/Procedure.yaml b/examples/yaml_define/Procedure.yaml
index 829a961..ecbdf92 100644
--- a/examples/yaml_define/Procedure.yaml
+++ b/examples/yaml_define/Procedure.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Procedure"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: Procedure
diff --git a/examples/yaml_define/Python.yaml b/examples/yaml_define/Python.yaml
index 728b5c9..cd05d11 100644
--- a/examples/yaml_define/Python.yaml
+++ b/examples/yaml_define/Python.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Python"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: python
     task_type: Python
diff --git a/examples/yaml_define/Pytorch.yaml b/examples/yaml_define/Pytorch.yaml
index 8706824..2bc9228 100644
--- a/examples/yaml_define/Pytorch.yaml
+++ b/examples/yaml_define/Pytorch.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Pytorch"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
 
   # run project with existing environment
diff --git a/examples/yaml_define/Sagemaker.yaml b/examples/yaml_define/Sagemaker.yaml
index 9f77a3c..5610e89 100644
--- a/examples/yaml_define/Sagemaker.yaml
+++ b/examples/yaml_define/Sagemaker.yaml
@@ -20,7 +20,7 @@ workflow:
   name: "Sagemaker"
   release_state: "offline"
 
-# Define the tasks under the process
+# Define the tasks within the workflow
 tasks:
   - name: sagemaker
     task_type: Sagemaker
diff --git a/examples/yaml_define/Shell.yaml b/examples/yaml_define/Shell.yaml
index fdbe126..7096833 100644
--- a/examples/yaml_define/Shell.yaml
+++ b/examples/yaml_define/Shell.yaml
@@ -21,7 +21,7 @@ workflow:
   release_state: "offline"
   run: true
 
-# Define the tasks under the process
+# Define the tasks within the workflow
 tasks:
   - name: task_parent
     task_type: Shell
diff --git a/examples/yaml_define/Spark.yaml b/examples/yaml_define/Spark.yaml
index e45514b..d0fc9bb 100644
--- a/examples/yaml_define/Spark.yaml
+++ b/examples/yaml_define/Spark.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Spark"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task
     task_type: Spark
diff --git a/examples/yaml_define/Sql.yaml b/examples/yaml_define/Sql.yaml
index c3c7e88..e2cd560 100644
--- a/examples/yaml_define/Sql.yaml
+++ b/examples/yaml_define/Sql.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "Sql"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task_base
     task_type: Sql
diff --git a/examples/yaml_define/SubProcess.yaml b/examples/yaml_define/SubWorkflow.yaml
similarity index 91%
rename from examples/yaml_define/SubProcess.yaml
rename to examples/yaml_define/SubWorkflow.yaml
index 0ea7549..5479038 100644
--- a/examples/yaml_define/SubProcess.yaml
+++ b/examples/yaml_define/SubWorkflow.yaml
@@ -21,7 +21,7 @@ workflow:
 
 tasks:
   - name: example_workflow
-    task_type: SubProcess
-    process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}
+    task_type: SubWorkflow
+    workflow_name: $WORKFLOW{"example_sub_workflow.yaml"}
 
   - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
diff --git a/examples/yaml_define/Switch.yaml b/examples/yaml_define/Switch.yaml
index 33ed688..08d7e0b 100644
--- a/examples/yaml_define/Switch.yaml
+++ b/examples/yaml_define/Switch.yaml
@@ -21,7 +21,7 @@ workflow:
   param:
     var: 1
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: switch_child_1
     task_type: Shell
diff --git a/examples/yaml_define/example_sub_workflow.yaml b/examples/yaml_define/example_sub_workflow.yaml
index af3a863..2e43d99 100644
--- a/examples/yaml_define/example_sub_workflow.yaml
+++ b/examples/yaml_define/example_sub_workflow.yaml
@@ -19,7 +19,7 @@
 workflow:
   name: "example_workflow_for_sub_workflow"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
   - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
diff --git a/examples/yaml_define/mlflow.yaml b/examples/yaml_define/mlflow.yaml
index 45e5672..ce0c4dd 100644
--- a/examples/yaml_define/mlflow.yaml
+++ b/examples/yaml_define/mlflow.yaml
@@ -23,7 +23,7 @@ mlflow_tracking_uri: &mlflow_tracking_uri "http://127.0.0.1:5000"
 workflow:
   name: "MLflow"
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: train_xgboost_native
     task_type: MLFlowProjectsCustom 
diff --git a/examples/yaml_define/tutorial.yaml b/examples/yaml_define/tutorial.yaml
index 104a8c3..40d456b 100644
--- a/examples/yaml_define/tutorial.yaml
+++ b/examples/yaml_define/tutorial.yaml
@@ -24,7 +24,7 @@ workflow:
   release_state: "offline"
   run: true
 
-# Define the tasks under the workflow
+# Define the tasks within the workflow
 tasks:
   - name: task_parent
     task_type: Shell
diff --git a/src/pydolphinscheduler/cli/commands.py b/src/pydolphinscheduler/cli/commands.py
index 8d923f1..6a09326 100644
--- a/src/pydolphinscheduler/cli/commands.py
+++ b/src/pydolphinscheduler/cli/commands.py
@@ -26,7 +26,7 @@ from pydolphinscheduler.configuration import (
     init_config_file,
     set_single_config,
 )
-from pydolphinscheduler.core.yaml_process_define import create_process_definition
+from pydolphinscheduler.core.yaml_workflow import create_workflow
 
 version_option_val = ["major", "minor", "micro"]
 
@@ -102,5 +102,5 @@ def config(getter, setter, init) -> None:
     type=click.Path(exists=True),
 )
 def yaml(yaml_file) -> None:
-    """Create process definition using YAML file."""
-    create_process_definition(yaml_file)
+    """Create workflow using YAML file."""
+    create_workflow(yaml_file)
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index 0e19577..e916ed2 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -48,7 +48,7 @@ class TaskType(str):
     HTTP = "HTTP"
     PYTHON = "PYTHON"
     SQL = "SQL"
-    SUB_PROCESS = "SUB_PROCESS"
+    SUB_WORKFLOW = "SUB_PROCESS"
     PROCEDURE = "PROCEDURE"
     DATAX = "DATAX"
     DEPENDENT = "DEPENDENT"
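
Note that only the Python-side attribute is renamed in the hunk above: the wire
value stays "SUB_PROCESS", presumably because the DolphinScheduler server still
identifies the task type by that string. A minimal sketch of the consequence
(the assertion is illustrative, not part of this change):

    from pydolphinscheduler.constants import TaskType

    # SUB_WORKFLOW is still the string "SUB_PROCESS", so payloads sent to
    # the Java gateway are unchanged by the rename.
    assert TaskType.SUB_WORKFLOW == "SUB_PROCESS"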
diff --git a/src/pydolphinscheduler/core/__init__.py b/src/pydolphinscheduler/core/__init__.py
index b997c3e..a23c768 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/core/__init__.py
@@ -19,12 +19,12 @@
 
 from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.engine import Engine
-from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.workflow import Workflow
 
 __all__ = [
     "Database",
     "Engine",
-    "ProcessDefinition",
+    "Workflow",
     "Task",
 ]
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 3c4c46a..22e7158 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -15,447 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Module process definition, core class for workflow define."""
+"""This module is deprecated. Please use `pydolphinscheduler.core.workflow.Workflow`."""
 
-import json
-from datetime import datetime
-from typing import Any, Dict, List, Optional, Set
+import warnings
 
-from pydolphinscheduler import configuration
-from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.resource import Resource
-from pydolphinscheduler.core.resource_plugin import ResourcePlugin
-from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
-from pydolphinscheduler.java_gateway import gateway
-from pydolphinscheduler.models import Base, Project, Tenant, User
-from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
+from pydolphinscheduler.core.workflow import Workflow as ProcessDefinition  # noqa
 
-
-class ProcessDefinitionContext:
-    """Class process definition context, use when task get process definition from context expression."""
-
-    _context_managed_process_definition: Optional["ProcessDefinition"] = None
-
-    @classmethod
-    def set(cls, pd: "ProcessDefinition") -> None:
-        """Set attribute self._context_managed_process_definition."""
-        cls._context_managed_process_definition = pd
-
-    @classmethod
-    def get(cls) -> Optional["ProcessDefinition"]:
-        """Get attribute self._context_managed_process_definition."""
-        return cls._context_managed_process_definition
-
-    @classmethod
-    def delete(cls) -> None:
-        """Delete attribute self._context_managed_process_definition."""
-        cls._context_managed_process_definition = None
-
-
-class ProcessDefinition(Base):
-    """process definition object, will define process definition attribute, task, relation.
-
-    TODO: maybe we should rename this class, currently use DS object name.
-
-    :param execution_type: Decision which behavior to run when process definition have multiple instances.
-        when process definition schedule interval is too short, it may cause multiple instances run at the
-        same time. We can use this parameter to control the behavior about how to run those process definition
-        instances. Currently we have four execution type:
-
-          * ``PARALLEL``: Default value, all instances will allow to run even though the previous
-            instance is not finished.
-          * ``SERIAL_WAIT``: All instance will wait for the previous instance to finish, and all
-            the waiting instances will be executed base on scheduling order.
-          * ``SERIAL_DISCARD``: All instances will be discard(abandon) if the previous instance is not
-            finished.
-          * ``SERIAL_PRIORITY``: means the all instance will wait for the previous instance to finish, and
-            all the waiting instances will be executed base on process definition priority order.
-
-    :param user: The user for current process definition. Will create a new one if it do not exists. If your
-        parameter ``project`` already exists but project's create do not belongs to ``user``, will grant
-        ``project`` to ``user`` automatically.
-    :param project: The project for current process definition. You could see the workflow in this project
-        thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to
-        ``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
-        to ``user``, will grant `project` to ``user`` automatically.
-    :param resource_list: Resource files required by the current process definition.You can create and modify
-        resource files from this field. When the process definition is submitted, these resource files are
-        also submitted along with it.
-    """
-
-    # key attribute for identify ProcessDefinition object
-    _KEY_ATTR = {
-        "name",
-        "project",
-        "tenant",
-        "release_state",
-        "param",
-    }
-
-    _DEFINE_ATTR = {
-        "name",
-        "description",
-        "_project",
-        "_tenant",
-        "worker_group",
-        "warning_type",
-        "warning_group_id",
-        "execution_type",
-        "timeout",
-        "release_state",
-        "param",
-        "tasks",
-        "task_definition_json",
-        "task_relation_json",
-        "resource_list",
-    }
-
-    def __init__(
-        self,
-        name: str,
-        description: Optional[str] = None,
-        schedule: Optional[str] = None,
-        start_time: Optional[str] = None,
-        end_time: Optional[str] = None,
-        timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
-        user: Optional[str] = configuration.WORKFLOW_USER,
-        project: Optional[str] = configuration.WORKFLOW_PROJECT,
-        tenant: Optional[str] = configuration.WORKFLOW_TENANT,
-        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
-        warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
-        warning_group_id: Optional[int] = 0,
-        execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
-        timeout: Optional[int] = 0,
-        release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
-        param: Optional[Dict] = None,
-        resource_plugin: Optional[ResourcePlugin] = None,
-        resource_list: Optional[List[Resource]] = None,
-    ):
-        super().__init__(name, description)
-        self.schedule = schedule
-        self._start_time = start_time
-        self._end_time = end_time
-        self.timezone = timezone
-        self._user = user
-        self._project = project
-        self._tenant = tenant
-        self.worker_group = worker_group
-        self.warning_type = warning_type
-        if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
-            raise PyDSParamException(
-                "Parameter `warning_type` with unexpect value `%s`", warning_type
-            )
-        else:
-            self.warning_type = warning_type.strip().upper()
-        self.warning_group_id = warning_group_id
-        if execution_type is None or execution_type.strip().upper() not in (
-            "PARALLEL",
-            "SERIAL_WAIT",
-            "SERIAL_DISCARD",
-            "SERIAL_PRIORITY",
-        ):
-            raise PyDSParamException(
-                "Parameter `execution_type` with unexpect value `%s`", execution_type
-            )
-        else:
-            self._execution_type = execution_type
-        self.timeout = timeout
-        self._release_state = release_state
-        self.param = param
-        self.tasks: dict = {}
-        self.resource_plugin = resource_plugin
-        # TODO how to fix circle import
-        self._task_relations: set["TaskRelation"] = set()  # noqa: F821
-        self._process_definition_code = None
-        self.resource_list = resource_list or []
-
-    def __enter__(self) -> "ProcessDefinition":
-        ProcessDefinitionContext.set(self)
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
-        ProcessDefinitionContext.delete()
-
-    @property
-    def tenant(self) -> Tenant:
-        """Get attribute tenant."""
-        return Tenant(self._tenant)
-
-    @tenant.setter
-    def tenant(self, tenant: Tenant) -> None:
-        """Set attribute tenant."""
-        self._tenant = tenant.name
-
-    @property
-    def project(self) -> Project:
-        """Get attribute project."""
-        return Project(self._project)
-
-    @project.setter
-    def project(self, project: Project) -> None:
-        """Set attribute project."""
-        self._project = project.name
-
-    @property
-    def user(self) -> User:
-        """Get user object.
-
-        For now we just get from python models but not from java gateway models, so it may not correct.
-        """
-        return User(name=self._user, tenant=self._tenant)
-
-    @staticmethod
-    def _parse_datetime(val: Any) -> Any:
-        if val is None or isinstance(val, datetime):
-            return val
-        elif isinstance(val, str):
-            return conv_from_str(val)
-        else:
-            raise PyDSParamException("Do not support value type %s for now", type(val))
-
-    @property
-    def start_time(self) -> Any:
-        """Get attribute start_time."""
-        return self._parse_datetime(self._start_time)
-
-    @start_time.setter
-    def start_time(self, val) -> None:
-        """Set attribute start_time."""
-        self._start_time = val
-
-    @property
-    def end_time(self) -> Any:
-        """Get attribute end_time."""
-        return self._parse_datetime(self._end_time)
-
-    @end_time.setter
-    def end_time(self, val) -> None:
-        """Set attribute end_time."""
-        self._end_time = val
-
-    @property
-    def release_state(self) -> int:
-        """Get attribute release_state."""
-        rs_ref = {
-            "online": 1,
-            "offline": 0,
-        }
-        if self._release_state not in rs_ref:
-            raise PyDSParamException(
-                "Parameter release_state only support `online` or `offline` but get %",
-                self._release_state,
-            )
-        return rs_ref[self._release_state]
-
-    @release_state.setter
-    def release_state(self, val: str) -> None:
-        """Set attribute release_state."""
-        self._release_state = val.lower()
-
-    @property
-    def execution_type(self) -> str:
-        """Get attribute execution_type."""
-        return self._execution_type.upper()
-
-    @execution_type.setter
-    def execution_type(self, val: str) -> None:
-        """Set attribute execution_type."""
-        self._execution_type = val
-
-    @property
-    def param_json(self) -> Optional[List[Dict]]:
-        """Return param json base on self.param."""
-        # Handle empty dict and None value
-        if not self.param:
-            return []
-        return [
-            {
-                "prop": k,
-                "direct": "IN",
-                "type": "VARCHAR",
-                "value": v,
-            }
-            for k, v in self.param.items()
-        ]
-
-    @property
-    def task_definition_json(self) -> List[Dict]:
-        """Return all tasks definition in list of dict."""
-        if not self.tasks:
-            return [self.tasks]
-        else:
-            return [task.get_define() for task in self.tasks.values()]
-
-    @property
-    def task_relation_json(self) -> List[Dict]:
-        """Return all relation between tasks pair in list of dict."""
-        if not self.tasks:
-            return [self.tasks]
-        else:
-            self._handle_root_relation()
-            return [tr.get_define() for tr in self._task_relations]
-
-    @property
-    def schedule_json(self) -> Optional[Dict]:
-        """Get schedule parameter json object. This is requests from java gateway interface."""
-        if not self.schedule:
-            return None
-        else:
-            start_time = conv_to_schedule(
-                self.start_time if self.start_time else datetime.now()
-            )
-            end_time = conv_to_schedule(
-                self.end_time if self.end_time else MAX_DATETIME
-            )
-            return {
-                "startTime": start_time,
-                "endTime": end_time,
-                "crontab": self.schedule,
-                "timezoneId": self.timezone,
-            }
-
-    @property
-    def task_list(self) -> List["Task"]:  # noqa: F821
-        """Return list of tasks objects."""
-        return list(self.tasks.values())
-
-    def _handle_root_relation(self):
-        """Handle root task property :class:`pydolphinscheduler.core.task.TaskRelation`.
-
-        Root task in DAG do not have dominant upstream node, but we have to add an exactly default
-        upstream task with task_code equal to `0`. This is requests from java gateway interface.
-        """
-        from pydolphinscheduler.core.task import TaskRelation
-
-        post_relation_code = set()
-        for relation in self._task_relations:
-            post_relation_code.add(relation.post_task_code)
-        for task in self.task_list:
-            if task.code not in post_relation_code:
-                root_relation = TaskRelation(pre_task_code=0, post_task_code=task.code)
-                self._task_relations.add(root_relation)
-
-    def add_task(self, task: "Task") -> None:  # noqa: F821
-        """Add a single task to process definition."""
-        self.tasks[task.code] = task
-        task._process_definition = self
-
-    def add_tasks(self, tasks: List["Task"]) -> None:  # noqa: F821
-        """Add task sequence to process definition, it a wrapper of :func:`add_task`."""
-        for task in tasks:
-            self.add_task(task)
-
-    def get_task(self, code: str) -> "Task":  # noqa: F821
-        """Get task object from process definition by given code."""
-        if code not in self.tasks:
-            raise PyDSTaskNoFoundException(
-                "Task with code %s can not found in process definition %",
-                (code, self.name),
-            )
-        return self.tasks[code]
-
-    # TODO which tying should return in this case
-    def get_tasks_by_name(self, name: str) -> Set["Task"]:  # noqa: F821
-        """Get tasks object by given name, if will return all tasks with this name."""
-        find = set()
-        for task in self.tasks.values():
-            if task.name == name:
-                find.add(task)
-        return find
-
-    def get_one_task_by_name(self, name: str) -> "Task":  # noqa: F821
-        """Get exact one task from process definition by given name.
-
-        Function always return one task even though this process definition have more than one task with
-        this name.
-        """
-        tasks = self.get_tasks_by_name(name)
-        if not tasks:
-            raise PyDSTaskNoFoundException(f"Can not find task with name {name}.")
-        return tasks.pop()
-
-    def run(self):
-        """Submit and Start ProcessDefinition instance.
-
-        Shortcut for function :func:`submit` and function :func:`start`. Only support manual start workflow
-        for now, and schedule run will coming soon.
-        :return:
-        """
-        self.submit()
-        self.start()
-
-    def _ensure_side_model_exists(self):
-        """Ensure process definition models model exists.
-
-        For now, models object including :class:`pydolphinscheduler.models.project.Project`,
-        :class:`pydolphinscheduler.models.tenant.Tenant`, :class:`pydolphinscheduler.models.user.User`.
-        If these model not exists, would create default value in
-        :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
-        """
-        # TODO used metaclass for more pythonic
-        self.user.create_if_not_exists()
-        # Project model need User object exists
-        self.project.create_if_not_exists(self._user)
-
-    def _pre_submit_check(self):
-        """Check specific condition satisfy before.
-
-        This method should be called before process definition submit to java gateway
-        For now, we have below checker:
-        * `self.param` or at least one local param of task should be set if task `switch` in this workflow.
-        """
-        if (
-            any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
-            and self.param is None
-            and all([len(task.local_params) == 0 for task in self.tasks.values()])
-        ):
-            raise PyDSParamException(
-                "Parameter param or at least one local_param of task must "
-                "be provider if task Switch in process definition."
-            )
-
-    def submit(self) -> int:
-        """Submit ProcessDefinition instance to java gateway."""
-        self._ensure_side_model_exists()
-        self._pre_submit_check()
-
-        self._process_definition_code = gateway.create_or_update_process_definition(
-            self._user,
-            self._project,
-            self.name,
-            str(self.description) if self.description else "",
-            json.dumps(self.param_json),
-            self.warning_type,
-            self.warning_group_id,
-            self.execution_type,
-            self.timeout,
-            self.worker_group,
-            self._tenant,
-            self.release_state,
-            # TODO add serialization function
-            json.dumps(self.task_relation_json),
-            json.dumps(self.task_definition_json),
-            json.dumps(self.schedule_json) if self.schedule_json else None,
-            None,
-        )
-        if len(self.resource_list) > 0:
-            for res in self.resource_list:
-                res.user_name = self._user
-                res.create_or_update_resource()
-        return self._process_definition_code
-
-    def start(self) -> None:
-        """Create and start ProcessDefinition instance.
-
-        which post to `start-process-instance` to java gateway
-        """
-        gateway.exec_process_instance(
-            self._user,
-            self._project,
-            self.name,
-            "",
-            self.worker_group,
-            self.warning_type,
-            self.warning_group_id,
-            24 * 3600,
-        )
+warnings.warn(
+    "This module is deprecated and will be remove in 4.1.0. Please use"
+    "`pydolphinscheduler.core.workflow.Workflow` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
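
For migration reference, a minimal sketch of what the shim now does on import; it assumes a
fresh interpreter, since a module-level warning fires only on the first import of the module::

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # the legacy import path still works, but now emits the warning above
        import pydolphinscheduler.core.process_definition  # noqa: F401

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
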
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 1c08da9..cff2917 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -30,12 +30,9 @@ from pydolphinscheduler.constants import (
     TaskPriority,
     TaskTimeoutFlag,
 )
-from pydolphinscheduler.core.process_definition import (
-    ProcessDefinition,
-    ProcessDefinitionContext,
-)
 from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.core.resource_plugin import ResourcePlugin
+from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
 from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
 from pydolphinscheduler.java_gateway import gateway
 from pydolphinscheduler.models import Base
@@ -47,7 +44,7 @@ class TaskRelation(Base):
     """TaskRelation object, describe the relation of exactly two tasks."""
 
     # Add attr `_KEY_ATTR` to overwrite :func:`__eq__`; it makes the set
-    # `Task.process_definition._task_relations` work correctly.
+    # `Task.workflow._task_relations` work correctly.
     _KEY_ATTR = {
         "pre_task_code",
         "post_task_code",
@@ -135,7 +132,7 @@ class Task(Base):
         timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
         timeout_notify_strategy: Optional = None,
         timeout: Optional[int] = 0,
-        process_definition: Optional[ProcessDefinition] = None,
+        workflow: Optional[Workflow] = None,
         local_params: Optional[List] = None,
         resource_list: Optional[List] = None,
         dependence: Optional[Dict] = None,
@@ -156,25 +153,20 @@ class Task(Base):
         self.timeout_flag = timeout_flag
         self.timeout_notify_strategy = timeout_notify_strategy
         self.timeout = timeout
-        self._process_definition = None
-        self.process_definition: ProcessDefinition = (
-            process_definition or ProcessDefinitionContext.get()
-        )
+        self._workflow = None
+        self.workflow: Workflow = workflow or WorkflowContext.get()
         self._upstream_task_codes: Set[int] = set()
         self._downstream_task_codes: Set[int] = set()
         self._task_relation: Set[TaskRelation] = set()
-        # move attribute code and version after _process_definition and process_definition declare
+        # move attribute code and version after `_workflow` and `workflow` are declared
         self.code, self.version = self.gen_code_and_version()
-        # Add task to process definition, maybe we could put into property process_definition latter
+        # Add task to workflow, maybe we could put it into the property workflow later
 
-        if (
-            self.process_definition is not None
-            and self.code not in self.process_definition.tasks
-        ):
-            self.process_definition.add_task(self)
+        if self.workflow is not None and self.code not in self.workflow.tasks:
+            self.workflow.add_task(self)
         else:
             logger.warning(
-                "Task code %d already in process definition, prohibit re-add task.",
+                "Task code %d already in workflow, prohibit re-add task.",
                 self.code,
             )
 
@@ -188,14 +180,14 @@ class Task(Base):
         self.get_content()
 
     @property
-    def process_definition(self) -> Optional[ProcessDefinition]:
-        """Get attribute process_definition."""
-        return self._process_definition
+    def workflow(self) -> Optional[Workflow]:
+        """Get attribute workflow."""
+        return self._workflow
 
-    @process_definition.setter
-    def process_definition(self, process_definition: Optional[ProcessDefinition]):
-        """Set attribute process_definition."""
-        self._process_definition = process_definition
+    @workflow.setter
+    def workflow(self, workflow: Optional[Workflow]):
+        """Set attribute workflow."""
+        self._workflow = workflow
 
     @property
     def resource_list(self) -> List:
@@ -217,9 +209,9 @@ class Task(Base):
 
     @property
     def user_name(self) -> Optional[str]:
-        """Return user name of process definition."""
-        if self.process_definition:
-            return self.process_definition.user.name
+        """Return user name of workflow."""
+        if self.workflow:
+            return self.workflow.user.name
         else:
             raise PyDSParamException("`user_name` cannot be empty.")
 
@@ -257,11 +249,11 @@ class Task(Base):
         """Return the resource plug-in.
 
         according to parameter resource_plugin and parameter
-        process_definition.resource_plugin.
+        workflow.resource_plugin.
         """
         if self.resource_plugin is None:
-            if self.process_definition.resource_plugin is not None:
-                return self.process_definition.resource_plugin
+            if self.workflow.resource_plugin is not None:
+                return self.workflow.resource_plugin
             else:
                 raise PyResPluginException(
                     "The execution command of this task is a file, but the resource plugin is empty"
@@ -281,8 +273,8 @@ class Task(Base):
                 setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content)
             else:
                 if self.resource_plugin is not None or (
-                    self.process_definition is not None
-                    and self.process_definition.resource_plugin is not None
+                    self.workflow is not None
+                    and self.workflow.resource_plugin is not None
                 ):
                     index = _ext_attr.rfind(Symbol.POINT)
                     if index != -1:
@@ -333,24 +325,24 @@ class Task(Base):
                 self._upstream_task_codes.add(task.code)
                 task._downstream_task_codes.add(self.code)
 
-                if self._process_definition:
+                if self._workflow:
                     task_relation = TaskRelation(
                         pre_task_code=task.code,
                         post_task_code=self.code,
                         name=f"{task.name} {Delimiter.DIRECTION} {self.name}",
                     )
-                    self.process_definition._task_relations.add(task_relation)
+                    self.workflow._task_relations.add(task_relation)
             else:
                 self._downstream_task_codes.add(task.code)
                 task._upstream_task_codes.add(self.code)
 
-                if self._process_definition:
+                if self._workflow:
                     task_relation = TaskRelation(
                         pre_task_code=self.code,
                         post_task_code=task.code,
                         name=f"{self.name} {Delimiter.DIRECTION} {task.name}",
                     )
-                    self.process_definition._task_relations.add(task_relation)
+                    self.workflow._task_relations.add(task_relation)
 
     def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
         """Set parameter tasks as upstream to current task."""
@@ -360,17 +352,17 @@ class Task(Base):
         """Set parameter tasks as downstream to current task."""
         self._set_deps(tasks, upstream=False)
 
-    # TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
+    # TODO: task code should be generated in bulk when the workflow runs submit or start
     def gen_code_and_version(self) -> Tuple:
         """
         Generate task code and version from java gateway.
 
-        If task name do not exists in process definition before, if will generate new code and version id
+        If the task name does not exist in the workflow before, it will generate a new code and version id
         equal to 0 by java gateway; otherwise it will return the existing code and version.
         """
-        # TODO get code from specific project process definition and task name
+        # TODO get code from specific project workflow and task name
         result = gateway.get_code_and_version(
-            self.process_definition._project, self.process_definition.name, self.name
+            self.workflow._project, self.workflow.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
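
A minimal sketch of the renamed task wiring, mirroring the tutorial examples; the names
`relation_demo` and `tenant_exists` are placeholders, and creating tasks assumes a reachable
DolphinScheduler Python gateway, because `gen_code_and_version` is called in `__init__`::

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell

    with Workflow(name="relation_demo", tenant="tenant_exists") as workflow:
        extract = Shell(name="extract", command="echo extract")
        load = Shell(name="load", command="echo load")

        # the shift operator wires upstream/downstream and records a TaskRelation
        extract >> load

        # tasks created inside the block pick up the workflow from WorkflowContext
        assert load.workflow is workflow
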
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/workflow.py
similarity index 82%
copy from src/pydolphinscheduler/core/process_definition.py
copy to src/pydolphinscheduler/core/workflow.py
index 3c4c46a..db916d8 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/workflow.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Module process definition, core class for workflow define."""
+"""Module workflow, core class for workflow define."""
 
 import json
 from datetime import datetime
@@ -31,35 +31,35 @@ from pydolphinscheduler.models import Base, Project, Tenant, User
 from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
 
 
-class ProcessDefinitionContext:
-    """Class process definition context, use when task get process definition from context expression."""
+class WorkflowContext:
+    """Class workflow context, use when task get workflow from context expression."""
 
-    _context_managed_process_definition: Optional["ProcessDefinition"] = None
+    _context_managed_workflow: Optional["Workflow"] = None
 
     @classmethod
-    def set(cls, pd: "ProcessDefinition") -> None:
-        """Set attribute self._context_managed_process_definition."""
-        cls._context_managed_process_definition = pd
+    def set(cls, pd: "Workflow") -> None:
+        """Set attribute self._context_managed_workflow."""
+        cls._context_managed_workflow = pd
 
     @classmethod
-    def get(cls) -> Optional["ProcessDefinition"]:
-        """Get attribute self._context_managed_process_definition."""
-        return cls._context_managed_process_definition
+    def get(cls) -> Optional["Workflow"]:
+        """Get attribute self._context_managed_workflow."""
+        return cls._context_managed_workflow
 
     @classmethod
     def delete(cls) -> None:
-        """Delete attribute self._context_managed_process_definition."""
-        cls._context_managed_process_definition = None
+        """Delete attribute self._context_managed_workflow."""
+        cls._context_managed_workflow = None
 
 
-class ProcessDefinition(Base):
-    """process definition object, will define process definition attribute, task, relation.
+class Workflow(Base):
+    """Workflow object, will define workflow attribute, task, relation.
 
     TODO: maybe we should rename this class, currently use DS object name.
 
-    :param execution_type: Decision which behavior to run when process definition have multiple instances.
-        when process definition schedule interval is too short, it may cause multiple instances run at the
-        same time. We can use this parameter to control the behavior about how to run those process definition
+    :param execution_type: Decides which behavior to run when the workflow has multiple instances.
+        When the workflow schedule interval is too short, it may cause multiple instances to run at the
+        same time. We can use this parameter to control how to run those workflow
         instances. Currently we have four execution types:
 
           * ``PARALLEL``: Default value, all instances are allowed to run even though the previous
@@ -69,21 +69,21 @@ class ProcessDefinition(Base):
           * ``SERIAL_DISCARD``: All instances will be discarded (abandoned) if the previous instance is not
             finished.
           * ``SERIAL_PRIORITY``: means all instances will wait for the previous instance to finish, and
-            all the waiting instances will be executed base on process definition priority order.
+            all the waiting instances will be executed based on workflow priority order.
 
-    :param user: The user for current process definition. Will create a new one if it do not exists. If your
+    :param user: The user for current workflow. A new one will be created if it does not exist. If your
         parameter ``project`` already exists but the project's creator is not ``user``, it will grant
         ``project`` to ``user`` automatically.
-    :param project: The project for current process definition. You could see the workflow in this project
+    :param project: The project for current workflow. You can see the workflow in this project
         through the Web UI after :func:`submit` or :func:`run`. It will create a new project belonging
         to ``user`` if it does not exist. And when ``project`` exists but the project's creator is not
         ``user``, it will grant ``project`` to ``user`` automatically.
-    :param resource_list: Resource files required by the current process definition.You can create and modify
-        resource files from this field. When the process definition is submitted, these resource files are
+    :param resource_list: Resource files required by the current workflow. You can create and modify
+        resource files from this field. When the workflow is submitted, these resource files are
         also submitted along with it.
     """
 
-    # key attribute for identify ProcessDefinition object
+    # key attributes for identifying the Workflow object
     _KEY_ATTR = {
         "name",
         "project",
@@ -166,15 +166,15 @@ class ProcessDefinition(Base):
         self.resource_plugin = resource_plugin
         # TODO how to fix circular import
         self._task_relations: set["TaskRelation"] = set()  # noqa: F821
-        self._process_definition_code = None
+        self._workflow_code = None
         self.resource_list = resource_list or []
 
-    def __enter__(self) -> "ProcessDefinition":
-        ProcessDefinitionContext.set(self)
+    def __enter__(self) -> "Workflow":
+        WorkflowContext.set(self)
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb) -> None:
-        ProcessDefinitionContext.delete()
+        WorkflowContext.delete()
 
     @property
     def tenant(self) -> Tenant:
@@ -336,20 +336,20 @@ class ProcessDefinition(Base):
                 self._task_relations.add(root_relation)
 
     def add_task(self, task: "Task") -> None:  # noqa: F821
-        """Add a single task to process definition."""
+        """Add a single task to workflow."""
         self.tasks[task.code] = task
-        task._process_definition = self
+        task._workflow = self
 
     def add_tasks(self, tasks: List["Task"]) -> None:  # noqa: F821
-        """Add task sequence to process definition, it a wrapper of :func:`add_task`."""
+        """Add task sequence to workflow, it a wrapper of :func:`add_task`."""
         for task in tasks:
             self.add_task(task)
 
     def get_task(self, code: str) -> "Task":  # noqa: F821
-        """Get task object from process definition by given code."""
+        """Get task object from workflow by given code."""
         if code not in self.tasks:
             raise PyDSTaskNoFoundException(
-                "Task with code %s can not found in process definition %",
+                "Task with code %s can not found in workflow %",
                 (code, self.name),
             )
         return self.tasks[code]
@@ -364,9 +364,9 @@ class ProcessDefinition(Base):
         return find
 
     def get_one_task_by_name(self, name: str) -> "Task":  # noqa: F821
-        """Get exact one task from process definition by given name.
+        """Get exact one task from workflow by given name.
 
-        Function always return one task even though this process definition have more than one task with
+        Function always returns one task even if this workflow has more than one task with
         this name.
         """
         tasks = self.get_tasks_by_name(name)
@@ -375,7 +375,7 @@ class ProcessDefinition(Base):
         return tasks.pop()
 
     def run(self):
-        """Submit and Start ProcessDefinition instance.
+        """Submit and Start Workflow instance.
 
         Shortcut for function :func:`submit` and function :func:`start`. Only support manual start workflow
         for now, and schedule run will coming soon.
@@ -385,12 +385,12 @@ class ProcessDefinition(Base):
         self.start()
 
     def _ensure_side_model_exists(self):
-        """Ensure process definition models model exists.
+        """Ensure workflow models model exists.
 
         For now, models object including :class:`pydolphinscheduler.models.project.Project`,
         :class:`pydolphinscheduler.models.tenant.Tenant`, :class:`pydolphinscheduler.models.user.User`.
-        If these model not exists, would create default value in
-        :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
+        If these models do not exist, default values will be created according to
+        :class:`pydolphinscheduler.configuration`.
         """
         # TODO used metaclass for more pythonic
         self.user.create_if_not_exists()
@@ -400,7 +400,7 @@ class ProcessDefinition(Base):
     def _pre_submit_check(self):
         """Check specific condition satisfy before.
 
-        This method should be called before process definition submit to java gateway
+        This method should be called before the workflow is submitted to the java gateway.
         For now, we have the below checker:
         * `self.param` or at least one local param of task should be set if a `switch` task is in this workflow.
         """
@@ -411,15 +411,15 @@ class ProcessDefinition(Base):
         ):
             raise PyDSParamException(
                 "Parameter param or at least one local_param of task must "
-                "be provider if task Switch in process definition."
+                "be provider if task Switch in workflow."
             )
 
     def submit(self) -> int:
-        """Submit ProcessDefinition instance to java gateway."""
+        """Submit Workflow instance to java gateway."""
         self._ensure_side_model_exists()
         self._pre_submit_check()
 
-        self._process_definition_code = gateway.create_or_update_process_definition(
+        self._workflow_code = gateway.create_or_update_workflow(
             self._user,
             self._project,
             self.name,
@@ -442,14 +442,14 @@ class ProcessDefinition(Base):
             for res in self.resource_list:
                 res.user_name = self._user
                 res.create_or_update_resource()
-        return self._process_definition_code
+        return self._workflow_code
 
     def start(self) -> None:
-        """Create and start ProcessDefinition instance.
+        """Create and start Workflow instance.
 
         which posts to `start-process-instance` on the java gateway
         """
-        gateway.exec_process_instance(
+        gateway.exec_workflow_instance(
             self._user,
             self._project,
             self.name,
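
End to end, the renamed class is used the same way as before; a minimal sketch with placeholder
names, assuming the tenant exists on the server side and the Python gateway is reachable::

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell

    with Workflow(name="workflow_rename_demo", tenant="tenant_exists") as workflow:
        Shell(name="say_hello", command="echo hello")
        # submit() only registers the workflow; run() submits and then starts an instance
        workflow.run()
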
diff --git a/src/pydolphinscheduler/core/yaml_process_define.py b/src/pydolphinscheduler/core/yaml_workflow.py
similarity index 86%
rename from src/pydolphinscheduler/core/yaml_process_define.py
rename to src/pydolphinscheduler/core/yaml_workflow.py
index 0944925..5401112 100644
--- a/src/pydolphinscheduler/core/yaml_process_define.py
+++ b/src/pydolphinscheduler/core/yaml_workflow.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Parse YAML file to create process."""
+"""Parse YAML file to create workflow."""
 
 import logging
 import os
@@ -24,14 +24,14 @@ from pathlib import Path
 from typing import Any, Dict
 
 from pydolphinscheduler import configuration, tasks
-from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
 from pydolphinscheduler.utils.yaml_parser import YamlParser
 
 logger = logging.getLogger(__file__)
 
-KEY_PROCESS = "workflow"
+KEY_WORKFLOW = "workflow"
 KEY_TASK = "tasks"
 KEY_TASK_TYPE = "task_type"
 KEY_DEPS = "deps"
@@ -103,19 +103,19 @@ def get_task_cls(task_type) -> Task:
     return getattr(tasks, standard_name)
 
 
-class YamlProcess(YamlParser):
-    """Yaml parser for create process.
+class YamlWorkflow(YamlParser):
+    """Yaml parser for create workflow.
 
     :param yaml_file: yaml file path.
 
         examples1 ::
 
             parser = YamlParser(yaml_file=...)
-            parser.create_process_definition()
+            parser.create_workflow()
 
         examples2 ::
 
-            YamlParser(yaml_file=...).create_process_definition()
+            YamlParser(yaml_file=...).create_workflow()
 
     """
 
@@ -130,23 +130,23 @@ class YamlProcess(YamlParser):
             content = f.read()
 
         self._base_folder = Path(yaml_file).parent
-        content = self.prepare_refer_process(content)
+        content = self.prepare_refer_workflow(content)
         super().__init__(content)
 
-    def create_process_definition(self):
-        """Create process main function."""
-        # get process parameters with key "workflow"
-        process_params = self[KEY_PROCESS]
+    def create_workflow(self):
+        """Create workflow main function."""
+        # get workflow parameters with key "workflow"
+        workflow_params = self[KEY_WORKFLOW]
 
         # pop "run" parameter, used at the end
-        is_run = process_params.pop("run", False)
+        is_run = workflow_params.pop("run", False)
 
-        # use YamlProcess._parse_rules to parse special value of yaml file
-        process_params = self.parse_params(process_params)
+        # use YamlWorkflow._parse_rules to parse special value of yaml file
+        workflow_params = self.parse_params(workflow_params)
 
-        process_name = process_params["name"]
-        logger.info(f"Create Process: {process_name}")
-        with ProcessDefinition(**process_params) as pd:
+        workflow_name = workflow_params["name"]
+        logger.info(f"Create workflow: {workflow_name}")
+        with Workflow(**workflow_params) as pd:
 
             # save dependencies between tasks
             dependencies = {}
@@ -171,12 +171,12 @@ class YamlProcess(YamlParser):
                     upstream_task >> downstream_task
 
             pd.submit()
-            # if set is_run, run the process after submit
+            # if is_run is set, run the workflow after submit
             if is_run:
                 logger.info(f"run workflow: {pd}")
                 pd.run()
 
-        return process_name
+        return workflow_name
 
     def parse_params(self, params: Any):
         """Recursively resolves the parameter values.
@@ -206,19 +206,21 @@ class YamlProcess(YamlParser):
 
         The function operates params only when it encounters a string; other types continue recursively.
         """
-        process_name = cls(yaml_file).create_process_definition()
-        return process_name
+        workflow_name = cls(yaml_file).create_workflow()
+        return workflow_name
 
-    def prepare_refer_process(self, content):
-        """Allow YAML files to reference process derived from other YAML files."""
-        process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
-        for process_path in process_paths:
+    def prepare_refer_workflow(self, content):
+        """Allow YAML files to reference workflow derived from other YAML files."""
+        workflow_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
+        for workflow_path in workflow_paths:
             logger.info(
-                f"find special token {process_path}, load process form {process_path}"
+                f"find special token {workflow_path}, load workflow form {workflow_path}"
             )
-            possible_path = ParseTool.get_possible_path(process_path, self._base_folder)
-            process_name = YamlProcess.parse(possible_path)
-            content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name)
+            possible_path = ParseTool.get_possible_path(
+                workflow_path, self._base_folder
+            )
+            workflow_name = YamlWorkflow.parse(possible_path)
+            content = content.replace('$WORKFLOW{"%s"}' % workflow_path, workflow_name)
 
         return content
 
@@ -246,7 +248,7 @@ class YamlProcess(YamlParser):
 
         task_cls = get_task_cls(task_type)
 
-        # use YamlProcess._parse_rules to parse special value of yaml file
+        # use YamlWorkflow._parse_rules to parse special value of yaml file
         task_params = self.parse_params(task_params)
 
         if task_cls == tasks.Switch:
@@ -404,7 +406,7 @@ class YamlProcess(YamlParser):
             Or,
         )
 
-        def process_dependent_date(dependent_date):
+        def workflow_dependent_date(dependent_date):
             """Parse dependent date (Compatible with key and value of DependentDate)."""
             dependent_date_upper = dependent_date.upper()
             if hasattr(DependentDate, dependent_date_upper):
@@ -425,19 +427,19 @@ class YamlProcess(YamlParser):
             """Parse dependent item.
 
             project_name: pydolphin
-            process_definition_name: task_dependent_external
+            workflow_name: task_dependent_external
             dependent_task_name: task_1
             dependent_date: LAST_WEDNESDAY
             """
             project_name = source_items["project_name"]
-            process_definition_name = source_items["process_definition_name"]
+            workflow_name = source_items["workflow_name"]
             dependent_task_name = source_items["dependent_task_name"]
             dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
             dependent_item = DependentItem(
                 project_name=project_name,
-                process_definition_name=process_definition_name,
+                workflow_name=workflow_name,
                 dependent_task_name=dependent_task_name,
-                dependent_date=process_dependent_date(dependent_date),
+                dependent_date=workflow_dependent_date(dependent_date),
             )
 
             return dependent_item
@@ -461,6 +463,6 @@ class YamlProcess(YamlParser):
         return task
 
 
-def create_process_definition(yaml_file):
+def create_workflow(yaml_file):
     """CLI."""
-    YamlProcess.parse(yaml_file)
+    YamlWorkflow.parse(yaml_file)
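
A minimal sketch of the renamed YAML entry points, assuming it runs from the repository root so
the example file path resolves::

    from pydolphinscheduler.core.yaml_workflow import YamlWorkflow, create_workflow

    # classmethod style, returns the workflow name declared in the file
    name = YamlWorkflow.parse("examples/yaml_define/tutorial.yaml")

    # one-shot helper used by the CLI
    create_workflow("examples/yaml_define/tutorial.yaml")
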
diff --git a/src/pydolphinscheduler/examples/bulk_create_example.py b/src/pydolphinscheduler/examples/bulk_create_example.py
index 72bdb02..9b89f73 100644
--- a/src/pydolphinscheduler/examples/bulk_create_example.py
+++ b/src/pydolphinscheduler/examples/bulk_create_example.py
@@ -26,7 +26,7 @@ task:1-workflow:1 -> task:2-workflow:1 -> task:3-workflow:1
 Each workflow is linear since we set `IS_CHAIN=True`; you could change tasks to parallel by setting it to `False`.
 """
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.shell import Shell
 
 NUM_WORKFLOWS = 10
@@ -41,7 +41,7 @@ IS_CHAIN = True
 for wf in range(0, NUM_WORKFLOWS):
     workflow_name = f"workflow:{wf}"
 
-    with ProcessDefinition(name=workflow_name, tenant=TENANT) as pd:
+    with Workflow(name=workflow_name, tenant=TENANT) as pd:
         for t in range(0, NUM_TASKS):
             task_name = f"task:{t}-{workflow_name}"
             command = f"echo This is task {task_name}"
diff --git a/src/pydolphinscheduler/examples/task_condition_example.py b/src/pydolphinscheduler/examples/task_condition_example.py
index 2d73df4..ea8ca3a 100644
--- a/src/pydolphinscheduler/examples/task_condition_example.py
+++ b/src/pydolphinscheduler/examples/task_condition_example.py
@@ -31,11 +31,11 @@ pre_task_3 ->                     -> fail_branch
 .
 """
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
 from pydolphinscheduler.tasks.shell import Shell
 
-with ProcessDefinition(name="task_condition_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_condition_example", tenant="tenant_exists") as pd:
     pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
     pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
     pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
diff --git a/src/pydolphinscheduler/examples/task_datax_example.py b/src/pydolphinscheduler/examples/task_datax_example.py
index 94bd449..aa4d00a 100644
--- a/src/pydolphinscheduler/examples/task_datax_example.py
+++ b/src/pydolphinscheduler/examples/task_datax_example.py
 You can create data sources `first_mysql` and `first_mysql` through the UI.
 It creates a task to synchronize datax from the source database to the target database.
 """
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
 # datax json template
@@ -72,7 +72,7 @@ JSON_TEMPLATE = {
     }
 }
 
-with ProcessDefinition(
+with Workflow(
     name="task_datax_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py
index db53bcc..93c607c 100644
--- a/src/pydolphinscheduler/examples/task_dependent_example.py
+++ b/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -36,11 +36,11 @@ task_dependent:
 task_dependent(this task depends on task_dependent_external.task_1 and task_dependent_external.task_2).
 """
 from pydolphinscheduler import configuration
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
 from pydolphinscheduler.tasks.shell import Shell
 
-with ProcessDefinition(
+with Workflow(
     name="task_dependent_external",
     tenant="tenant_exists",
 ) as pd:
@@ -49,7 +49,7 @@ with ProcessDefinition(
     task_3 = Shell(name="task_3", command="echo task 3")
     pd.submit()
 
-with ProcessDefinition(
+with Workflow(
     name="task_dependent_example",
     tenant="tenant_exists",
 ) as pd:
@@ -59,12 +59,12 @@ with ProcessDefinition(
             Or(
                 DependentItem(
                     project_name=configuration.WORKFLOW_PROJECT,
-                    process_definition_name="task_dependent_external",
+                    workflow_name="task_dependent_external",
                     dependent_task_name="task_1",
                 ),
                 DependentItem(
                     project_name=configuration.WORKFLOW_PROJECT,
-                    process_definition_name="task_dependent_external",
+                    workflow_name="task_dependent_external",
                     dependent_task_name="task_2",
                 ),
             )
diff --git a/src/pydolphinscheduler/examples/task_dvc_example.py b/src/pydolphinscheduler/examples/task_dvc_example.py
index 2b93cd1..8d1976a 100644
--- a/src/pydolphinscheduler/examples/task_dvc_example.py
+++ b/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -18,12 +18,12 @@
 # [start workflow_declare]
 """A example workflow for task dvc."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload
 
 repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
 
-with ProcessDefinition(
+with Workflow(
     name="task_dvc_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_flink_example.py b/src/pydolphinscheduler/examples/task_flink_example.py
index 1e8a040..a00ba7b 100644
--- a/src/pydolphinscheduler/examples/task_flink_example.py
+++ b/src/pydolphinscheduler/examples/task_flink_example.py
@@ -18,10 +18,10 @@
 # [start workflow_declare]
 """A example workflow for task flink."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
 
-with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_flink_example", tenant="tenant_exists") as pd:
     task = Flink(
         name="task_flink",
         main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
diff --git a/src/pydolphinscheduler/examples/task_kubernetes_example.py b/src/pydolphinscheduler/examples/task_kubernetes_example.py
index b7a6a8a..d1c807c 100644
--- a/src/pydolphinscheduler/examples/task_kubernetes_example.py
+++ b/src/pydolphinscheduler/examples/task_kubernetes_example.py
@@ -18,10 +18,10 @@
 # [start workflow_declare]
 """A example workflow for task kubernetes."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.kubernetes import Kubernetes
 
-with ProcessDefinition(
+with Workflow(
     name="task_kubernetes_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_map_reduce_example.py b/src/pydolphinscheduler/examples/task_map_reduce_example.py
index 39b204f..117c503 100644
--- a/src/pydolphinscheduler/examples/task_map_reduce_example.py
+++ b/src/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -19,10 +19,10 @@
 """A example workflow for task mr."""
 
 from pydolphinscheduler.core.engine import ProgramType
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.map_reduce import MR
 
-with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_map_reduce_example", tenant="tenant_exists") as pd:
     task = MR(
         name="task_mr",
         main_class="wordcount",
diff --git a/src/pydolphinscheduler/examples/task_mlflow_example.py b/src/pydolphinscheduler/examples/task_mlflow_example.py
index c2734bc..f0d51a2 100644
--- a/src/pydolphinscheduler/examples/task_mlflow_example.py
+++ b/src/pydolphinscheduler/examples/task_mlflow_example.py
@@ -18,7 +18,7 @@
 # [start workflow_declare]
 """A example workflow for task mlflow."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.mlflow import (
     MLflowDeployType,
     MLflowModels,
@@ -29,7 +29,7 @@ from pydolphinscheduler.tasks.mlflow import (
 
 mlflow_tracking_uri = "http://127.0.0.1:5000"
 
-with ProcessDefinition(
+with Workflow(
     name="task_mlflow_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_openmldb_example.py b/src/pydolphinscheduler/examples/task_openmldb_example.py
index 5b90091..86454b7 100644
--- a/src/pydolphinscheduler/examples/task_openmldb_example.py
+++ b/src/pydolphinscheduler/examples/task_openmldb_example.py
@@ -18,7 +18,7 @@
 # [start workflow_declare]
 """A example workflow for task openmldb."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.openmldb import OpenMLDB
 
 sql = """USE demo_db;
@@ -27,7 +27,7 @@ LOAD DATA INFILE 'file:///tmp/train_sample.csv'
 INTO TABLE talkingdata OPTIONS(mode='overwrite');
 """
 
-with ProcessDefinition(
+with Workflow(
     name="task_openmldb_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
index 6559c9a..8aa7ea9 100644
--- a/src/pydolphinscheduler/examples/task_pytorch_example.py
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -18,10 +18,10 @@
 # [start workflow_declare]
 """A example workflow for task pytorch."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.pytorch import Pytorch
 
-with ProcessDefinition(
+with Workflow(
     name="task_pytorch_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_sagemaker_example.py b/src/pydolphinscheduler/examples/task_sagemaker_example.py
index b056f61..f14ceb5 100644
--- a/src/pydolphinscheduler/examples/task_sagemaker_example.py
+++ b/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -19,7 +19,7 @@
 """A example workflow for task sagemaker."""
 import json
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.sagemaker import SageMaker
 
 sagemaker_request_data = {
@@ -33,7 +33,7 @@ sagemaker_request_data = {
     ],
 }
 
-with ProcessDefinition(
+with Workflow(
     name="task_sagemaker_example",
     tenant="tenant_exists",
 ) as pd:
diff --git a/src/pydolphinscheduler/examples/task_spark_example.py b/src/pydolphinscheduler/examples/task_spark_example.py
index 594d95f..142f7d5 100644
--- a/src/pydolphinscheduler/examples/task_spark_example.py
+++ b/src/pydolphinscheduler/examples/task_spark_example.py
@@ -18,10 +18,10 @@
 # [start workflow_declare]
 """A example workflow for task spark."""
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
 
-with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
+with Workflow(name="task_spark_example", tenant="tenant_exists") as pd:
     task = Spark(
         name="task_spark",
         main_class="org.apache.spark.examples.SparkPi",
diff --git a/src/pydolphinscheduler/examples/task_switch_example.py b/src/pydolphinscheduler/examples/task_switch_example.py
index 7966af3..d573342 100644
--- a/src/pydolphinscheduler/examples/task_switch_example.py
+++ b/src/pydolphinscheduler/examples/task_switch_example.py
@@ -30,11 +30,11 @@ parent -> switch ->
 .
 """
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
-with ProcessDefinition(
+with Workflow(
     name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
diff --git a/src/pydolphinscheduler/examples/tutorial.py b/src/pydolphinscheduler/examples/tutorial.py
index 0478e68..cb6d47f 100644
--- a/src/pydolphinscheduler/examples/tutorial.py
+++ b/src/pydolphinscheduler/examples/tutorial.py
@@ -32,8 +32,8 @@ it will instantiate and run all the task it have.
 
 # [start tutorial]
 # [start package_import]
-# Import ProcessDefinition object to define your workflow attributes
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+# Import Workflow object to define your workflow attributes
+from pydolphinscheduler.core.workflow import Workflow
 
 # Import task Shell object because we would create some shell tasks later
 from pydolphinscheduler.tasks.shell import Shell
@@ -41,7 +41,7 @@ from pydolphinscheduler.tasks.shell import Shell
 # [end package_import]
 
 # [start workflow_declare]
-with ProcessDefinition(
+with Workflow(
     name="tutorial",
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py
index 986c1bb..9740af7 100644
--- a/src/pydolphinscheduler/examples/tutorial_decorator.py
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -32,8 +32,8 @@ it will instantiate and run all the task it have.
 
 # [start tutorial]
 # [start package_import]
-# Import ProcessDefinition object to define your workflow attributes
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+# Import Workflow object to define your workflow attributes
+from pydolphinscheduler.core.workflow import Workflow
 
 # Import task Shell object because we would create some shell tasks later
 from pydolphinscheduler.tasks.func_wrap import task
@@ -70,7 +70,7 @@ def task_union():
 
 
 # [start workflow_declare]
-with ProcessDefinition(
+with Workflow(
     name="tutorial_decorator",
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
diff --git a/src/pydolphinscheduler/examples/tutorial_resource_plugin.py b/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
index 5b02022..f336455 100644
--- a/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
+++ b/src/pydolphinscheduler/examples/tutorial_resource_plugin.py
@@ -27,8 +27,8 @@ from pathlib import Path
 
 # [start tutorial_resource_plugin]
 # [start package_import]
-# Import ProcessDefinition object to define your workflow attributes
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+# Import Workflow object to define your workflow attributes
+from pydolphinscheduler.core.workflow import Workflow
 
 # Import task Shell object because we would create some shell tasks later
 from pydolphinscheduler.resources_plugin.local import Local
@@ -37,13 +37,13 @@ from pydolphinscheduler.tasks.shell import Shell
 # [end package_import]
 
 # [start workflow_declare]
-with ProcessDefinition(
+with Workflow(
     name="tutorial_resource_plugin",
     schedule="0 0 0 * * ? *",
     start_time="2021-01-01",
     tenant="tenant_exists",
     resource_plugin=Local("/tmp"),
-) as process_definition:
+) as workflow:
     # [end workflow_declare]
     # [start task_declare]
     file = "resource.sh"
@@ -59,6 +59,6 @@ with ProcessDefinition(
     # [end task_declare]
 
     # [start submit_or_run]
-    process_definition.run()
+    workflow.run()
     # [end submit_or_run]
 # [end tutorial_resource_plugin]
diff --git a/src/pydolphinscheduler/exceptions.py b/src/pydolphinscheduler/exceptions.py
index 5b0d1bb..5902113 100644
--- a/src/pydolphinscheduler/exceptions.py
+++ b/src/pydolphinscheduler/exceptions.py
@@ -34,8 +34,8 @@ class PyDSJavaGatewayException(PyDSBaseException):
     """Exception for pydolphinscheduler Java gateway error."""
 
 
-class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
-    """Exception for pydolphinscheduler process definition not assign error."""
+class PyDSWorkflowNotAssignException(PyDSBaseException):
+    """Exception for pydolphinscheduler workflow not assign error."""
 
 
 class PyDSConfException(PyDSBaseException):
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 0db7acb..b96ce9c 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -127,11 +127,11 @@ class GatewayEntryPoint:
         return self.gateway.entry_point.getEnvironmentInfo(name)
 
     def get_code_and_version(
-        self, project_name: str, process_definition_name: str, task_name: str
+        self, project_name: str, workflow_name: str, task_name: str
     ):
         """Get code and version through java gateway."""
         return self.gateway.entry_point.getCodeAndVersion(
-            project_name, process_definition_name, task_name
+            project_name, workflow_name, task_name
         )
 
     def create_or_grant_project(
@@ -230,23 +230,21 @@ class GatewayEntryPoint:
     def get_dependent_info(
         self,
         project_name: str,
-        process_definition_name: str,
+        workflow_name: str,
         task_name: Optional[str] = None,
     ):
         """Get dependent info through java gateway."""
         return self.gateway.entry_point.getDependentInfo(
-            project_name, process_definition_name, task_name
+            project_name, workflow_name, task_name
         )
 
-    def get_process_definition_info(
-        self, user_name: str, project_name: str, process_definition_name: str
-    ):
-        """Get process definition info through java gateway."""
-        return self.gateway.entry_point.getProcessDefinitionInfo(
-            user_name, project_name, process_definition_name
+    def get_workflow_info(self, user_name: str, project_name: str, workflow_name: str):
+        """Get workflow info through java gateway."""
+        return self.gateway.entry_point.getWorkflowInfo(
+            user_name, project_name, workflow_name
         )
 
-    def create_or_update_process_definition(
+    def create_or_update_workflow(
         self,
         user_name: str,
         project_name: str,
@@ -265,8 +263,8 @@ class GatewayEntryPoint:
         schedule: Optional[str] = None,
         other_params_json: Optional[str] = None,
     ):
-        """Create or update process definition through java gateway."""
-        return self.gateway.entry_point.createOrUpdateProcessDefinition(
+        """Create or update workflow through java gateway."""
+        return self.gateway.entry_point.createOrUpdateWorkflow(
             user_name,
             project_name,
             name,
@@ -285,22 +283,22 @@ class GatewayEntryPoint:
             execution_type,
         )
 
-    def exec_process_instance(
+    def exec_workflow_instance(
         self,
         user_name: str,
         project_name: str,
-        process_definition_name: str,
+        workflow_name: str,
         cron_time: str,
         worker_group: str,
         warning_type: str,
         warning_group_id: int,
         timeout: int,
     ):
-        """Exec process instance through java gateway."""
-        return self.gateway.entry_point.execProcessInstance(
+        """Exec workflow instance through java gateway."""
+        return self.gateway.entry_point.execWorkflowInstance(
             user_name,
             project_name,
-            process_definition_name,
+            workflow_name,
             cron_time,
             worker_group,
             warning_type,
diff --git a/src/pydolphinscheduler/tasks/__init__.py b/src/pydolphinscheduler/tasks/__init__.py
index 4dc2a90..0c14aae 100644
--- a/src/pydolphinscheduler/tasks/__init__.py
+++ b/src/pydolphinscheduler/tasks/__init__.py
@@ -39,7 +39,7 @@ from pydolphinscheduler.tasks.sagemaker import SageMaker
 from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.spark import Spark
 from pydolphinscheduler.tasks.sql import Sql
-from pydolphinscheduler.tasks.sub_process import SubProcess
+from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
 __all__ = [
@@ -64,7 +64,7 @@ __all__ = [
     "Shell",
     "Spark",
     "Sql",
-    "SubProcess",
+    "SubWorkflow",
     "Switch",
     "SageMaker",
     "Kubernetes",
diff --git a/src/pydolphinscheduler/tasks/dependent.py b/src/pydolphinscheduler/tasks/dependent.py
index 75ef61f..98a76ef 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -17,6 +17,7 @@
 
 """Task dependent."""
 
+import warnings
 from typing import Dict, Optional, Tuple
 
 from pydolphinscheduler.constants import TaskType
@@ -73,7 +74,7 @@ class DependentDate(str):
 class DependentItem(Base):
     """Dependent item object, minimal unit for task dependent.
 
-    It declare which project, process_definition, task are dependent to this task.
+    It declares which project, workflow, and task this task depends on.
     """
 
     _DEFINE_ATTR = {
@@ -89,14 +90,32 @@ class DependentItem(Base):
     def __init__(
         self,
         project_name: str,
-        process_definition_name: str,
+        # TODO zhongjiajie should be also developed in 4.1.0
+        workflow_name: Optional[str] = None,
         dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
         dependent_date: Optional[DependentDate] = DependentDate.TODAY,
+        *args,
+        **kwargs,
     ):
-        obj_name = f"{project_name}.{process_definition_name}.{dependent_task_name}.{dependent_date}"
+        obj_name = (
+            f"{project_name}.{workflow_name}.{dependent_task_name}.{dependent_date}"
+        )
         super().__init__(obj_name)
         self.project_name = project_name
-        self.process_definition_name = process_definition_name
+        if workflow_name is not None:
+            self.workflow_name = workflow_name
+        elif "process_definition_name" in kwargs:
+            warnings.warn(
+                "Parameter name `process_definition_name` is deprecated and will be remove in 4.1.0, "
+                "please use `workflow_name` instead.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+            self.workflow_name = kwargs.pop("process_definition_name")
+        else:
+            raise PyDSParamException(
+                "Parameter `workflow_name` or `process_definition_name` is required, but got None."
+            )
         self.dependent_task_name = dependent_task_name
         if dependent_date is None:
             raise PyDSParamException(
@@ -155,7 +174,7 @@ class DependentItem(Base):
         """Get name info parameter to query code."""
         param = (
             self.project_name,
-            self.process_definition_name,
+            self.workflow_name,
             self.dependent_task_name if not self.is_all_task else None,
         )
         return param
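
A minimal sketch of the compatibility behavior added above, reusing the names from the dependent
example; the legacy keyword keeps working until 4.1.0 but emits a DeprecationWarning::

    import warnings

    from pydolphinscheduler.tasks.dependent import DependentDate, DependentItem

    # preferred spelling going forward
    item = DependentItem(
        project_name="pydolphin",
        workflow_name="task_dependent_external",
        dependent_task_name="task_1",
        dependent_date=DependentDate.LAST_WEDNESDAY,
    )

    # legacy keyword still accepted, but warns
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        DependentItem(
            project_name="pydolphin",
            process_definition_name="task_dependent_external",
        )
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
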
diff --git a/src/pydolphinscheduler/tasks/sub_process.py b/src/pydolphinscheduler/tasks/sub_process.py
index 287077b..5e1ab39 100644
--- a/src/pydolphinscheduler/tasks/sub_process.py
+++ b/src/pydolphinscheduler/tasks/sub_process.py
@@ -15,40 +15,28 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Task sub_process."""
+"""This module is deprecated. Please use `pydolphinscheduler.tasks.sub_workflow.SubWorkflow`."""
 
-from typing import Dict
+import warnings
 
-from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
-from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
-from pydolphinscheduler.java_gateway import gateway
+from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
 
+warnings.warn(
+    "This module is deprecated and will be remove in 4.1.0. "
+    "Please use `pydolphinscheduler.tasks.sub_workflow.SubWorkflow` instead.",
+    DeprecationWarning,
+    stacklevel=2,
+)
 
-class SubProcess(Task):
-    """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler."""
 
-    _task_custom_attr = {"process_definition_code"}
+class SubProcess(SubWorkflow):
+    """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.
 
-    def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
-        super().__init__(name, TaskType.SUB_PROCESS, *args, **kwargs)
-        self.process_definition_name = process_definition_name
-
-    @property
-    def process_definition_code(self) -> str:
-        """Get process definition code, a wrapper for :func:`get_process_definition_info`."""
-        return self.get_process_definition_info(self.process_definition_name).get(
-            "code"
-        )
+    This module is deprecated and will be removed in 4.1.0. Please use
+    `pydolphinscheduler.tasks.sub_workflow.SubWorkflow` instead.
+    """
 
-    def get_process_definition_info(self, process_definition_name: str) -> Dict:
-        """Get process definition info from java gateway, contains process definition id, name, code."""
-        if not self.process_definition:
-            raise PyDSProcessDefinitionNotAssignException(
-                "ProcessDefinition must be provider for task SubProcess."
-            )
-        return gateway.get_process_definition_info(
-            self.process_definition.user.name,
-            self.process_definition.project.name,
-            process_definition_name,
+    def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
+        super().__init__(
+            name, process_definition_name, *args, **kwargs
         )
diff --git a/src/pydolphinscheduler/tasks/sub_workflow.py b/src/pydolphinscheduler/tasks/sub_workflow.py
new file mode 100644
index 0000000..1cb579c
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/sub_workflow.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task sub workflow."""
+
+from typing import Dict
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSWorkflowNotAssignException
+from pydolphinscheduler.java_gateway import gateway
+
+
+class SubWorkflow(Task):
+    """Task SubWorkflow object, declare behavior for SubWorkflow task to dolphinscheduler."""
+
+    _task_custom_attr = {"process_definition_code"}
+
+    def __init__(self, name: str, workflow_name: str, *args, **kwargs):
+        super().__init__(name, TaskType.SUB_WORKFLOW, *args, **kwargs)
+        self.workflow_name = workflow_name
+
+    @property
+    def process_definition_code(self) -> str:
+        """Get workflow code, a wrapper for :func:`get_workflow_info`.
+
+        We can not change this function name to workflow_code, because it is a keyword used in
+        dolphinscheduler itself.
+        """
+        return self.get_workflow_info(self.workflow_name).get("code")
+
+    def get_workflow_info(self, workflow_name: str) -> Dict:
+        """Get workflow info from java gateway, contains workflow id, name, code."""
+        if not self.workflow:
+            raise PyDSWorkflowNotAssignException(
+                "Workflow must be provider for task SubWorkflow."
+            )
+        return gateway.get_workflow_info(
+            self.workflow.user.name,
+            self.workflow.project.name,
+            workflow_name,
+        )
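
A minimal sketch of the new task in use, with placeholder names; the referenced child workflow
must already exist on the server, since `process_definition_code` resolves it through
`gateway.get_workflow_info` at submit time::

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell
    from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

    with Workflow(name="parent_workflow", tenant="tenant_exists") as workflow:
        prepare = Shell(name="prepare", command="echo prepare")
        child = SubWorkflow(name="run_child", workflow_name="child_workflow")
        prepare >> child
        workflow.submit()
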
diff --git a/src/pydolphinscheduler/tasks/switch.py b/src/pydolphinscheduler/tasks/switch.py
index 45edaa9..acc23ba 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -130,7 +130,7 @@ class SwitchCondition(Base):
 class Switch(Task):
     """Task switch object, declare behavior for switch task to dolphinscheduler.
 
-    Param of process definition or at least one local param of task must be set
+    A param of the workflow or at least one local param of the task must be set
     if task `switch` in this workflow.
     """
 
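
The docstring change above reflects a real constraint: _pre_submit_check
(exercised in the test_workflow.py hunks later in this diff) rejects a switch
whose workflow has no param and whose upstream task has no local params. A
sketch of the valid shape, modeled on the project's switch example with
illustrative names; submission assumes a reachable API server:

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.shell import Shell
    from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition

    # the workflow-level param satisfies the pre-submit check for Switch
    with Workflow(name="switch-demo", param={"var": "1"}) as wf:
        parent = Shell(name="parent", command="echo parent")
        child_1 = Shell(name="child_1", command="echo child_1")
        child_2 = Shell(name="child_2", command="echo child_2")
        cond = SwitchCondition(
            Branch(condition="${var} > 1", task=child_1),
            Default(task=child_2),
        )
        switch = Switch(name="switch", condition=cond)
        parent >> switch
        wf.submit()
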
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index c6ef777..892e2f0 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -23,8 +23,8 @@ from unittest.mock import PropertyMock, patch
 
 import pytest
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task, TaskRelation
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyResPluginException
 from pydolphinscheduler.resources_plugin import Local
 from tests.testing.task import Task as TestTask
@@ -306,8 +306,8 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
 
 
 def test_add_duplicate(caplog):
-    """Test add task which code already in process definition."""
-    with ProcessDefinition("test_add_duplicate_workflow") as _:
+    """Test add task which code already in workflow."""
+    with Workflow("test_add_duplicate_workflow") as _:
         TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
         with caplog.at_level(logging.WARNING):
             TaskWithCode(
@@ -316,7 +316,7 @@ def test_add_duplicate(caplog):
         assert all(
             [
                 caplog.text.startswith("WARNING  pydolphinscheduler"),
-                re.findall("already in process definition", caplog.text),
+                re.findall("already in workflow", caplog.text),
             ]
         )
 
@@ -367,8 +367,8 @@ def test_task_ext_attr(
                 "name": "test_task_abtain_res_plugin",
                 "task_type": "TaskType",
                 "resource_plugin": Local("prefix"),
-                "process_definition": ProcessDefinition(
-                    name="process_definition",
+                "workflow": Workflow(
+                    name="workflow",
                     resource_plugin=Local("prefix"),
                 ),
             },
@@ -386,8 +386,8 @@ def test_task_ext_attr(
             {
                 "name": "test_task_abtain_res_plugin",
                 "task_type": "TaskType",
-                "process_definition": ProcessDefinition(
-                    name="process_definition",
+                "workflow": Workflow(
+                    name="workflow",
                     resource_plugin=Local("prefix"),
                 ),
             },
@@ -412,8 +412,8 @@ def test_task_obtain_res_plugin(m_get_content, m_code_version, attr, expected):
         {
             "name": "test_task_abtain_res_plugin",
             "task_type": "TaskType",
-            "process_definition": ProcessDefinition(
-                name="process_definition",
+            "workflow": Workflow(
+                name="workflow",
             ),
         },
     ],
diff --git a/tests/core/test_process_definition.py b/tests/core/test_workflow.py
similarity index 74%
rename from tests/core/test_process_definition.py
rename to tests/core/test_workflow.py
index c8fffc2..e5c8601 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_workflow.py
@@ -15,8 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test process definition."""
-
+"""Test workflow."""
+import warnings
 from datetime import datetime
 from typing import Any, List
 from unittest.mock import patch
@@ -25,25 +25,23 @@ import pytest
 from freezegun import freeze_time
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.models import Project, Tenant, User
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 from pydolphinscheduler.utils.date import conv_to_schedule
 from tests.testing.task import Task
 
-TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_WORKFLOW_NAME = "simple-test-workflow"
 TEST_TASK_TYPE = "test-task-type"
 
 
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
-def test_process_definition_key_attr(func):
-    """Test process definition have specific functions or attributes."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
-        assert hasattr(
-            pd, func
-        ), f"ProcessDefinition instance don't have attribute `{func}`"
+def test_workflow_key_attr(func):
+    """Test workflow have specific functions or attributes."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
+        assert hasattr(pd, func), f"Workflow instance don't have attribute `{func}`"
 
 
 @pytest.mark.parametrize(
@@ -71,11 +69,11 @@ def test_process_definition_key_attr(func):
         ("release_state", 1),
     ],
 )
-def test_process_definition_default_value(name, value):
-    """Test process definition default attributes."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+def test_workflow_default_value(name, value):
+    """Test workflow default attributes."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         assert getattr(pd, name) == value, (
-            f"ProcessDefinition instance attribute `{name}` not with "
+            f"Workflow instance attribute `{name}` not with "
             f"except default value `{getattr(pd, name)}`"
         )
 
@@ -101,12 +99,12 @@ def test_process_definition_default_value(name, value):
     ],
 )
 def test_set_attr(name, cls, expect):
-    """Test process definition set attributes which get with same type."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    """Test workflow set attributes which get with same type."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         setattr(pd, name, expect)
         assert (
             getattr(pd, name) == expect
-        ), f"ProcessDefinition set attribute `{name}` do not work expect"
+        ), f"Workflow set attribute `{name}` do not work expect"
 
 
 @pytest.mark.parametrize(
@@ -117,11 +115,11 @@ def test_set_attr(name, cls, expect):
     ],
 )
 def test_set_release_state(value, expect):
-    """Test process definition set release_state attributes."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
+    """Test workflow set release_state attributes."""
+    with Workflow(TEST_WORKFLOW_NAME, release_state=value) as pd:
         assert (
             getattr(pd, "release_state") == expect
-        ), "ProcessDefinition set attribute release_state do not return expect value."
+        ), "Workflow set attribute release_state do not return expect value."
 
 
 @pytest.mark.parametrize(
@@ -135,8 +133,8 @@ def test_set_release_state(value, expect):
     ],
 )
 def test_set_release_state_error(value):
-    """Test process definition set release_state attributes with error."""
-    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
+    """Test workflow set release_state attributes with error."""
+    pd = Workflow(TEST_WORKFLOW_NAME, release_state=value)
     with pytest.raises(
         PyDSParamException,
         match="Parameter release_state only support `online` or `offline` but get.*",
@@ -154,8 +152,8 @@ def test_set_release_state_error(value):
     ],
 )
 def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
-    """Test process definition set attributes which get with different type."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    """Test workflow set attributes which get with different type."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         setattr(pd, set_attr, set_val)
         assert get_val == getattr(
             pd, get_attr
@@ -172,11 +170,11 @@ def test_set_attr_return_special_object(set_attr, set_val, get_attr, get_val):
     ],
 )
 def test__parse_datetime(val, expect):
-    """Test process definition function _parse_datetime.
+    """Test workflow function _parse_datetime.
 
     Only two datetime test cases here because we have more test cases in tests/utils/test_date.py file.
     """
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         assert expect == pd._parse_datetime(
             val
         ), f"Function _parse_datetime with unexpect value by {val}."
@@ -191,8 +189,8 @@ def test__parse_datetime(val, expect):
     ],
 )
 def test__parse_datetime_not_support_type(val: Any):
-    """Test process definition function _parse_datetime not support type error."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    """Test workflow function _parse_datetime not support type error."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         with pytest.raises(PyDSParamException, match="Do not support value type.*?"):
             pd._parse_datetime(val)
 
@@ -205,11 +203,11 @@ def test__parse_datetime_not_support_type(val: Any):
     ],
 )
 def test_warn_type_not_support_type(val: str):
-    """Test process definition param warning_type not support type error."""
+    """Test workflow param warning_type not support type error."""
     with pytest.raises(
         PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
     ):
-        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+        Workflow(TEST_WORKFLOW_NAME, warning_type=val)
 
 
 @pytest.mark.parametrize(
@@ -221,11 +219,11 @@ def test_warn_type_not_support_type(val: str):
     ],
 )
 def test_execute_type_not_support_type(val: str):
-    """Test process definition param execute_type not support type error."""
+    """Test workflow param execute_type not support type error."""
     with pytest.raises(
         PyDSParamException, match="Parameter `execution_type` with unexpect value.*?"
     ):
-        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, execution_type=val)
+        Workflow(TEST_WORKFLOW_NAME, execution_type=val)
 
 
 @pytest.mark.parametrize(
@@ -273,8 +271,8 @@ def test_execute_type_not_support_type(val: str):
     ],
 )
 def test_property_param_json(param, expect):
-    """Test ProcessDefinition's property param_json."""
-    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+    """Test Workflow's property param_json."""
+    pd = Workflow(TEST_WORKFLOW_NAME, param=param)
     assert pd.param_json == expect
 
 
@@ -283,8 +281,8 @@ def test_property_param_json(param, expect):
     return_value=(123, 1),
 )
 def test__pre_submit_check_switch_without_param(mock_code_version):
-    """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    """Test :func:`_pre_submit_check` if workflow with switch but without attribute param."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         parent = Task(name="parent", task_type=TEST_TASK_TYPE)
         switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
         switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
@@ -298,7 +296,7 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
         with pytest.raises(
             PyDSParamException,
             match="Parameter param or at least one local_param of task must "
-            "be provider if task Switch in process definition.",
+            "be provider if task Switch in workflow.",
         ):
             pd._pre_submit_check()
 
@@ -308,8 +306,8 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
     return_value=(123, 1),
 )
 def test__pre_submit_check_switch_with_local_params(mock_code_version):
-    """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    """Test :func:`_pre_submit_check` if workflow with switch with local params of task."""
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         parent = Task(
             name="parent",
             task_type=TEST_TASK_TYPE,
@@ -329,10 +327,10 @@ def test__pre_submit_check_switch_with_local_params(mock_code_version):
         pd._pre_submit_check()
 
 
-def test_process_definition_get_define_without_task():
-    """Test process definition function get_define without task."""
+def test_workflow_get_define_without_task():
+    """Test workflow function get_define without task."""
     expect = {
-        "name": TEST_PROCESS_DEFINITION_NAME,
+        "name": TEST_WORKFLOW_NAME,
         "description": None,
         "project": configuration.WORKFLOW_PROJECT,
         "tenant": configuration.WORKFLOW_TENANT,
@@ -348,14 +346,14 @@ def test_process_definition_get_define_without_task():
         "taskRelationJson": [{}],
         "resourceList": [],
     }
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         assert pd.get_define() == expect
 
 
-def test_process_definition_simple_context_manager():
-    """Test simple create workflow in process definition context manager mode."""
+def test_workflow_simple_context_manager():
+    """Test simple create workflow in workflow context manager mode."""
     expect_tasks_num = 5
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+    with Workflow(TEST_WORKFLOW_NAME) as pd:
         for i in range(expect_tasks_num):
             curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
             # Set deps task i as i-1 parent
@@ -364,9 +362,9 @@ def test_process_definition_simple_context_manager():
                 curr_task.set_upstream(pre_task)
         assert len(pd.tasks) == expect_tasks_num
 
-        # Test if task process_definition same as origin one
+        # Test if the task's workflow is the same as the origin one
         task: Task = pd.get_one_task_by_name("task-0")
-        assert pd is task.process_definition
+        assert pd is task.workflow
 
         # Test if all tasks with expect deps
         for i in range(expect_tasks_num):
@@ -390,19 +388,65 @@ def test_process_definition_simple_context_manager():
                 }
 
 
-def test_process_definition_simple_separate():
-    """Test process definition simple create workflow in separate mode.
+def test_deprecated_workflow_simple_context_manager():
+    """Test deprecated class ProcessDefinition still work and will raise warning."""
+    expect_tasks_num = 5
+
+    with warnings.catch_warnings(record=True) as w:
+        from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+        assert len(w) == 1
+        assert issubclass(w[-1].category, DeprecationWarning)
+        assert "deprecated" in str(w[-1].message)
+
+        with ProcessDefinition(TEST_WORKFLOW_NAME) as pd:
+            for i in range(expect_tasks_num):
+                curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
+                # Set deps task i as i-1 parent
+                if i > 0:
+                    pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
+                    curr_task.set_upstream(pre_task)
+            assert len(pd.tasks) == expect_tasks_num
+
+            # Test if the task's workflow is the same as the origin one
+            task: Task = pd.get_one_task_by_name("task-0")
+            assert pd is task.workflow
+
+            # Test if all tasks with expect deps
+            for i in range(expect_tasks_num):
+                task: Task = pd.get_one_task_by_name(f"task-{i}")
+                if i == 0:
+                    assert task._upstream_task_codes == set()
+                    assert task._downstream_task_codes == {
+                        pd.get_one_task_by_name("task-1").code
+                    }
+                elif i == expect_tasks_num - 1:
+                    assert task._upstream_task_codes == {
+                        pd.get_one_task_by_name(f"task-{i - 1}").code
+                    }
+                    assert task._downstream_task_codes == set()
+                else:
+                    assert task._upstream_task_codes == {
+                        pd.get_one_task_by_name(f"task-{i - 1}").code
+                    }
+                    assert task._downstream_task_codes == {
+                        pd.get_one_task_by_name(f"task-{i + 1}").code
+                    }
+
+
+def test_workflow_simple_separate():
+    """Test workflow simple create workflow in separate mode.
 
     This test just test basic information, cause most of test case is duplicate to
-    test_process_definition_simple_context_manager.
+    test_workflow_simple_context_manager.
     """
     expect_tasks_num = 5
-    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
+    pd = Workflow(TEST_WORKFLOW_NAME)
     for i in range(expect_tasks_num):
         curr_task = Task(
             name=f"task-{i}",
             task_type=f"type-{i}",
-            process_definition=pd,
+            workflow=pd,
         )
         # Set deps task i as i-1 parent
         if i > 0:
@@ -418,12 +462,12 @@ def test_process_definition_simple_separate():
         {"tenant": "tenant_specific"},
     ],
 )
-def test_set_process_definition_user_attr(user_attrs):
-    """Test user with correct attributes if we specific assigned to process definition object."""
+def test_set_workflow_user_attr(user_attrs):
+    """Test user with correct attributes if we specific assigned to workflow object."""
     default_value = {
         "tenant": configuration.WORKFLOW_TENANT,
     }
-    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
+    with Workflow(TEST_WORKFLOW_NAME, **user_attrs) as pd:
         user = pd.user
         for attr in default_value:
             # Get assigned attribute if we specific, else get default value
@@ -439,8 +483,8 @@ def test_set_process_definition_user_attr(user_attrs):
 
 def test_schedule_json_none_schedule():
     """Test function schedule_json with None as schedule."""
-    with ProcessDefinition(
-        TEST_PROCESS_DEFINITION_NAME,
+    with Workflow(
+        TEST_WORKFLOW_NAME,
         schedule=None,
     ) as pd:
         assert pd.schedule_json is None
@@ -511,8 +555,8 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
         "endTime": expect_date["end_time"],
         "timezoneId": configuration.WORKFLOW_TIME_ZONE,
     }
-    with ProcessDefinition(
-        TEST_PROCESS_DEFINITION_NAME,
+    with Workflow(
+        TEST_WORKFLOW_NAME,
         schedule=schedule,
         start_time=start_time,
         end_time=end_time,
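
Taken together, the tests above pin down the migration story for user code: the
old import path still works but warns at import time, and Workflow is a drop-in
replacement with the same context-manager API. A before/after sketch:

    # before this commit (now emits DeprecationWarning at import time):
    # from pydolphinscheduler.core.process_definition import ProcessDefinition

    # after: same constructor arguments, same context-manager behavior
    from pydolphinscheduler.core.workflow import Workflow

    with Workflow("my-workflow") as wf:
        ...  # declare tasks here exactly as before
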
diff --git a/tests/core/test_yaml_process_define.py b/tests/core/test_yaml_workflow.py
similarity index 91%
rename from tests/core/test_yaml_process_define.py
rename to tests/core/test_yaml_workflow.py
index 99ad179..60cf813 100644
--- a/tests/core/test_yaml_process_define.py
+++ b/tests/core/test_yaml_workflow.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test YAML process."""
+"""Test YAML workflow."""
 
 import os
 from pathlib import Path
@@ -24,10 +24,10 @@ from unittest.mock import patch
 import pytest
 
 from pydolphinscheduler import configuration, tasks
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.yaml_process_define import (
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.core.yaml_workflow import (
     ParseTool,
-    create_process_definition,
+    create_workflow,
     get_task_cls,
 )
 from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
@@ -117,7 +117,7 @@ def test_parse_tool_parse_possible_path_file():
         ("Shell", tasks.Shell),
         ("Spark", tasks.Spark),
         ("Sql", tasks.Sql),
-        ("SubProcess", tasks.SubProcess),
+        ("SubWorkflow", tasks.SubWorkflow),
         ("Switch", tasks.Switch),
         ("SageMaker", tasks.SageMaker),
     ],
@@ -156,7 +156,7 @@ def test_get_error(task_type):
         ("Shell.yaml"),
         ("Spark.yaml"),
         ("Sql.yaml"),
-        ("SubProcess.yaml"),
+        ("SubWorkflow.yaml"),
         # ("Switch.yaml"),
         ("MoreConfiguration.yaml"),
     ],
@@ -177,15 +177,15 @@ def test_get_error(task_type):
         "taskDefinitionCode": 0,
     },
 )
-@patch.object(ProcessDefinition, "run")
-@patch.object(ProcessDefinition, "submit")
-def test_get_create_process_definition(
+@patch.object(Workflow, "run")
+@patch.object(Workflow, "submit")
+def test_get_create_workflow(
     prun, psubmit, dep_item, db_info, resource_info, yaml_file
 ):
-    """Test create_process_definition function to parse example YAML file."""
+    """Test create_workflow function to parse example YAML file."""
     yaml_file_path = Path(path_yaml_example).joinpath(yaml_file)
     with patch(
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
     ):
-        create_process_definition(yaml_file_path)
+        create_workflow(yaml_file_path)
diff --git a/tests/example/test_example.py b/tests/example/test_example.py
index 319ad96..dbe9c5f 100644
--- a/tests/example/test_example.py
+++ b/tests/example/test_example.py
@@ -27,7 +27,7 @@ from tests.testing.constants import task_without_example
 from tests.testing.path import get_all_examples, get_tasks
 from tests.testing.task import Task
 
-process_definition_name = set()
+workflow_name = set()
 
 
 def import_module(script_name, script_path):
@@ -44,7 +44,8 @@ def test_task_without_example():
     Avoiding add new type of tasks but without adding example describe how to use it.
     """
     # We use example/tutorial.py as shell task example
-    ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
+    # Task sub_process is deprecated and will be removed in the future, so we ignore it here
+    ignore_name = {"__init__.py", "shell.py", "func_wrap.py", "sub_process.py"}
     all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
 
     have_example_tasks = set()
@@ -63,23 +64,23 @@ def test_task_without_example():
 def setup_and_teardown_for_stuff():
     """Fixture of py.test handle setup and teardown."""
     yield
-    global process_definition_name
-    process_definition_name = set()
+    global workflow_name
+    workflow_name = set()
 
 
 def submit_check_without_same_name(self):
-    """Side effect for verifying process definition name and adding it to global variable."""
-    if self.name in process_definition_name:
+    """Side effect for verifying workflow name and adding it to global variable."""
+    if self.name in workflow_name:
         raise ValueError(
-            "Example process definition should not have same name, but get duplicate name: %s",
+            "Example workflow should not have same name, but get duplicate name: %s",
             self.name,
         )
-    submit_add_process_definition(self)
+    submit_add_workflow(self)
 
 
-def submit_add_process_definition(self):
-    """Side effect for adding process definition name to global variable."""
-    process_definition_name.add(self.name)
+def submit_add_workflow(self):
+    """Side effect for adding workflow name to global variable."""
+    workflow_name.add(self.name)
 
 
 def test_example_basic():
@@ -114,9 +115,9 @@ def test_example_basic():
         ), f"We expect all examples have __doc__, but {ex.name} do not."
 
 
-@patch("pydolphinscheduler.core.process_definition.ProcessDefinition.start")
+@patch("pydolphinscheduler.core.workflow.Workflow.start")
 @patch(
-    "pydolphinscheduler.core.process_definition.ProcessDefinition.submit",
+    "pydolphinscheduler.core.workflow.Workflow.submit",
     side_effect=submit_check_without_same_name,
     autospec=True,
 )
@@ -127,12 +128,10 @@ def test_example_basic():
     # using :arg:`return_value`
     side_effect=Task("test_example", "test_example").gen_code_and_version,
 )
-def test_example_process_definition_without_same_name(
-    mock_code_version, mock_submit, mock_start
-):
-    """Test all examples file without same process definition's name.
+def test_example_workflow_without_same_name(mock_code_version, mock_submit, mock_start):
+    """Test all examples file without same workflow's name.
 
-    Our process definition would compete with others if we have same process definition name. It will make
+    Our workflow would conflict with others if we used the same workflow name. It would make
     the actual workflow differ from our workflow-as-code file, which would confuse users.
     """
     for ex in get_all_examples():
@@ -142,10 +141,10 @@ def test_example_process_definition_without_same_name(
     assert True
 
 
-@patch("pydolphinscheduler.core.process_definition.ProcessDefinition.start")
+@patch("pydolphinscheduler.core.workflow.Workflow.start")
 @patch(
-    "pydolphinscheduler.core.process_definition.ProcessDefinition.submit",
-    side_effect=submit_add_process_definition,
+    "pydolphinscheduler.core.workflow.Workflow.submit",
+    side_effect=submit_add_workflow,
     autospec=True,
 )
 @patch(
@@ -155,13 +154,13 @@ def test_example_process_definition_without_same_name(
     # using :arg:`return_value`
     side_effect=Task("test_example", "test_example").gen_code_and_version,
 )
-def test_file_name_in_process_definition(mock_code_version, mock_submit, mock_start):
+def test_file_name_in_workflow(mock_code_version, mock_submit, mock_start):
     """Test example file name in example definition name.
 
     We should not directly assert equal, because some of the examples contain
-    more than one process definition.
+    more than one workflow.
     """
-    global process_definition_name
+    global workflow_name
     for ex in get_all_examples():
         # Skip __init__ file
         if ex.stem == "__init__":
@@ -170,7 +169,7 @@ def test_file_name_in_process_definition(mock_code_version, mock_submit, mock_st
         # without one named bulk_create_example
         if ex.stem == "bulk_create_example":
             continue
-        process_definition_name = set()
-        assert ex.stem not in process_definition_name
+        workflow_name = set()
+        assert ex.stem not in workflow_name
         import_module(ex.name, str(ex))
-        assert ex.stem in process_definition_name
+        assert ex.stem in workflow_name
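
Note for downstream test suites: mock targets must follow the module rename, as
the patch decorators above show. A minimal sketch of the pattern with the test
body elided:

    from unittest.mock import patch

    # patch the new dotted path, core.workflow.Workflow, instead of the old
    # core.process_definition.ProcessDefinition
    with patch("pydolphinscheduler.core.workflow.Workflow.submit", autospec=True) as mock_submit:
        ...  # build workflows; submit() is intercepted here
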
diff --git a/tests/integration/test_process_definition.py b/tests/integration/test_process_definition.py
index 1672bde..1ea0051 100644
--- a/tests/integration/test_process_definition.py
+++ b/tests/integration/test_process_definition.py
@@ -15,17 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test process definition in integration."""
+"""Test workflow in integration."""
 
 from typing import Dict
 
 import pytest
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.tasks.shell import Shell
 
-PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
-TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
+WORKFLOW_NAME = "test_change_exists_attr_pd"
+TASK_NAME = f"task_{WORKFLOW_NAME}"
 
 
 @pytest.mark.parametrize(
@@ -41,10 +41,10 @@ TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
         )
     ],
 )
-def test_change_process_definition_attr(pre: Dict, post: Dict):
-    """Test whether process definition success when specific attribute change."""
+def test_change_workflow_attr(pre: Dict, post: Dict):
+    """Test whether workflow success when specific attribute change."""
     assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
     for attrs in [pre, post]:
-        with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
+        with Workflow(name=WORKFLOW_NAME, **attrs) as pd:
             Shell(name=TASK_NAME, command="echo 1")
             pd.submit()
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 72eec28..700f418 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -21,7 +21,7 @@ from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.tasks.condition import (
     FAILURE,
@@ -36,7 +36,7 @@ from tests.testing.task import Task
 
 TEST_NAME = "test-name"
 TEST_PROJECT = "test-project"
-TEST_PROCESS_DEFINITION = "test-process-definition"
+TEST_WORKFLOW = "test-workflow"
 TEST_TYPE = "test-type"
 TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567
 
@@ -401,7 +401,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
 )
 def test_condition_set_dep_workflow(mock_task_code_version):
     """Test task condition set dependence in workflow level."""
-    with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
+    with Workflow(name="test-condition-set-dep-workflow") as pd:
         pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE)
         pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE)
         pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE)
@@ -443,7 +443,7 @@ def test_condition_set_dep_workflow(mock_task_code_version):
             fail_branch.code,
         }
 
-        # Condition task dep after ProcessDefinition function get_define called
+        # Condition task dep is resolved after Workflow function get_define is called
         assert condition._upstream_task_codes == {
             pre_task_1.code,
             pre_task_2.code,
diff --git a/tests/tasks/test_dependent.py b/tests/tasks/test_dependent.py
index f55700e..63dfd54 100644
--- a/tests/tasks/test_dependent.py
+++ b/tests/tasks/test_dependent.py
@@ -17,6 +17,7 @@
 
 """Test Task dependent."""
 import itertools
+import warnings
 from typing import Dict, List, Optional, Tuple, Union
 from unittest.mock import patch
 
@@ -33,7 +34,7 @@ from pydolphinscheduler.tasks.dependent import (
 )
 
 TEST_PROJECT = "test-project"
-TEST_PROCESS_DEFINITION = "test-process-definition"
+TEST_WORKFLOW = "test-workflow"
 TEST_TASK = "test-task"
 TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567
 
@@ -96,7 +97,7 @@ def test_dependent_item_get_define(mock_task_info, dep_date, dep_cycle):
     """
     attr = {
         "project_name": TEST_PROJECT,
-        "process_definition_name": TEST_PROCESS_DEFINITION,
+        "workflow_name": TEST_WORKFLOW,
         "dependent_task_name": TEST_TASK,
         "dependent_date": dep_date,
     }
@@ -111,6 +112,37 @@ def test_dependent_item_get_define(mock_task_info, dep_date, dep_cycle):
     assert expect == task.get_define()
 
 
+@patch(
+    "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+    return_value={
+        "projectCode": TEST_PROJECT_CODE,
+        "processDefinitionCode": TEST_DEFINITION_CODE,
+        "taskDefinitionCode": TEST_TASK_CODE,
+    },
+)
+def test_deprecated_dependent_when_process_definition_name(mock_task_info):
+    """Test deprecated task dependent DependentItem get define still work and raise warning."""
+    attr = {
+        "project_name": TEST_PROJECT,
+        "process_definition_name": TEST_WORKFLOW,
+        "dependent_task_name": TEST_TASK,
+        "dependent_date": DependentDate.THIS_WEEK,
+    }
+    expect = {
+        "projectCode": TEST_PROJECT_CODE,
+        "definitionCode": TEST_DEFINITION_CODE,
+        "depTaskCode": TEST_TASK_CODE,
+        "cycle": "week",
+        "dateValue": DependentDate.THIS_WEEK,
+    }
+    with warnings.catch_warnings(record=True) as w:
+        task = DependentItem(**attr)
+        assert len(w) == 1
+        assert issubclass(w[-1].category, DeprecationWarning)
+        assert "deprecated" in str(w[-1].message)
+        assert expect == task.get_define()
+
+
 def test_dependent_item_date_error():
     """Test error when pass None to dependent_date."""
     with pytest.raises(
@@ -118,7 +150,7 @@ def test_dependent_item_date_error():
     ):
         DependentItem(
             project_name=TEST_PROJECT,
-            process_definition_name=TEST_PROCESS_DEFINITION,
+            workflow_name=TEST_WORKFLOW,
             dependent_date=None,
         )
 
@@ -134,10 +166,10 @@ def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
     """Test dependent item property code_parameter."""
     dependent_item = DependentItem(
         project_name=TEST_PROJECT,
-        process_definition_name=TEST_PROCESS_DEFINITION,
+        workflow_name=TEST_WORKFLOW,
         **task_name,
     )
-    expect = (TEST_PROJECT, TEST_PROCESS_DEFINITION, result)
+    expect = (TEST_PROJECT, TEST_WORKFLOW, result)
     assert dependent_item.code_parameter == expect
 
 
@@ -148,7 +180,7 @@ def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
         [
             DependentItem(
                 project_name=TEST_PROJECT,
-                process_definition_name=TEST_PROCESS_DEFINITION,
+                workflow_name=TEST_WORKFLOW,
             ),
             1,
         ],
@@ -156,7 +188,7 @@ def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
             And(
                 DependentItem(
                     project_name=TEST_PROJECT,
-                    process_definition_name=TEST_PROCESS_DEFINITION,
+                    workflow_name=TEST_WORKFLOW,
                 )
             ),
             1,
@@ -164,12 +196,12 @@ def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
         [
             DependentItem(
                 project_name=TEST_PROJECT,
-                process_definition_name=TEST_PROCESS_DEFINITION,
+                workflow_name=TEST_WORKFLOW,
             ),
             And(
                 DependentItem(
                     project_name=TEST_PROJECT,
-                    process_definition_name=TEST_PROCESS_DEFINITION,
+                    workflow_name=TEST_WORKFLOW,
                 )
             ),
         ],
@@ -200,7 +232,7 @@ def test_dependent_operator_set_define_error(mock_code, arg_list):
             (
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_MONTH_END,
                 },
@@ -227,13 +259,13 @@ def test_dependent_operator_set_define_error(mock_code, arg_list):
             (
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_MONTH_END,
                 },
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_WEEK,
                 },
@@ -267,19 +299,19 @@ def test_dependent_operator_set_define_error(mock_code, arg_list):
             (
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_MONTH_END,
                 },
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_WEEK,
                 },
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_ONE_DAYS,
                 },
@@ -371,7 +403,7 @@ def test_operator_dependent_item(
                 (
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_MONTH_END,
                     },
@@ -408,13 +440,13 @@ def test_operator_dependent_item(
                 (
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_MONTH_END,
                     },
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_WEEK,
                     },
@@ -458,19 +490,19 @@ def test_operator_dependent_item(
                 (
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_MONTH_END,
                     },
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_WEEK,
                     },
                     {
                         "project_name": TEST_PROJECT,
-                        "process_definition_name": TEST_PROCESS_DEFINITION,
+                        "workflow_name": TEST_WORKFLOW,
                         "dependent_task_name": TEST_TASK,
                         "dependent_date": DependentDate.LAST_ONE_DAYS,
                     },
@@ -604,7 +636,7 @@ def get_dep_task_list(*operator):
                 ((And, And), (And, Or), (Or, And), (Or, Or)),
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_MONTH_END,
                 },
@@ -625,7 +657,7 @@ def get_dep_task_list(*operator):
                 ((And, And, And), (And, And, And, And), (And, And, And, And, And)),
                 {
                     "project_name": TEST_PROJECT,
-                    "process_definition_name": TEST_PROCESS_DEFINITION,
+                    "workflow_name": TEST_WORKFLOW,
                     "dependent_task_name": TEST_TASK,
                     "dependent_date": DependentDate.LAST_MONTH_END,
                 },
@@ -716,21 +748,21 @@ def test_operator_dependent_task_list_multi_dependent_list(
 def test_dependent_get_define(mock_code_version, mock_dep_code):
     """Test task dependent function get_define."""
     project_name = "test-dep-project"
-    process_definition_name = "test-dep-definition"
+    workflow_name = "test-dep-definition"
     dependent_task_name = "test-dep-task"
     dep_operator = And(
         Or(
             # test dependence with add tasks
             DependentItem(
                 project_name=project_name,
-                process_definition_name=process_definition_name,
+                workflow_name=workflow_name,
             )
         ),
         And(
             # test dependence with specific task
             DependentItem(
                 project_name=project_name,
-                process_definition_name=process_definition_name,
+                workflow_name=workflow_name,
                 dependent_task_name=dependent_task_name,
             )
         ),
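
For reference, a sketch of the renamed keyword in user code, modeled on the
project's dependent example; project, workflow, and task names are illustrative,
and instantiating tasks assumes a reachable gateway:

    from pydolphinscheduler.tasks.dependent import (
        And,
        Dependent,
        DependentDate,
        DependentItem,
    )

    # waits on a task in another workflow; workflow_name replaces the old
    # process_definition_name keyword (which still works but warns)
    dependent = Dependent(
        name="wait-upstream",
        dependence=And(
            DependentItem(
                project_name="my-project",
                workflow_name="upstream-workflow",
                dependent_task_name="upstream-task",
                dependent_date=DependentDate.LAST_ONE_DAYS,
            )
        ),
    )
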
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
index 628b6e7..8c94c8e 100644
--- a/tests/tasks/test_func_wrap.py
+++ b/tests/tasks/test_func_wrap.py
@@ -21,13 +21,13 @@ from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.tasks.func_wrap import task
 from tests.testing.decorator import foo as foo_decorator
 from tests.testing.task import Task
 
-PD_NAME = "test_process_definition"
+WORKFLOW_NAME = "test_workflow"
 TASK_NAME = "test_task"
 
 
@@ -35,16 +35,16 @@ TASK_NAME = "test_task"
     "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
 )
 def test_single_task_outside(mock_code):
-    """Test single decorator task which outside process definition."""
+    """Test single decorator task which outside workflow."""
 
     @task
     def foo():
         print(TASK_NAME)
 
-    with ProcessDefinition(PD_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as pd:
         foo()
 
-    assert pd is not None and pd.name == PD_NAME
+    assert pd is not None and pd.name == WORKFLOW_NAME
     assert len(pd.tasks) == 1
 
     pd_task = pd.tasks[12345]
@@ -56,8 +56,8 @@ def test_single_task_outside(mock_code):
     "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
 )
 def test_single_task_inside(mock_code):
-    """Test single decorator task which inside process definition."""
-    with ProcessDefinition(PD_NAME) as pd:
+    """Test single decorator task which inside workflow."""
+    with Workflow(WORKFLOW_NAME) as pd:
 
         @task
         def foo():
@@ -65,7 +65,7 @@ def test_single_task_inside(mock_code):
 
         foo()
 
-    assert pd is not None and pd.name == PD_NAME
+    assert pd is not None and pd.name == WORKFLOW_NAME
     assert len(pd.tasks) == 1
 
     pd_task = pd.tasks[12345]
@@ -84,7 +84,7 @@ def test_addition_decorator_error(mock_code):
     def foo():
         print(TASK_NAME)
 
-    with ProcessDefinition(PD_NAME) as pd:  # noqa: F841
+    with Workflow(WORKFLOW_NAME) as pd:  # noqa: F841
         with pytest.raises(
             PyDSParamException, match="Do no support other decorators for.*"
         ):
@@ -96,7 +96,7 @@ def test_addition_decorator_error(mock_code):
     side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
 )
 def test_multiple_tasks_outside(mock_code):
-    """Test multiple decorator tasks which outside process definition."""
+    """Test multiple decorator tasks which outside workflow."""
 
     @task
     def foo():
@@ -106,13 +106,13 @@ def test_multiple_tasks_outside(mock_code):
     def bar():
         print(TASK_NAME)
 
-    with ProcessDefinition(PD_NAME) as pd:
+    with Workflow(WORKFLOW_NAME) as pd:
         foo = foo()
         bar = bar()
 
         foo >> bar
 
-    assert pd is not None and pd.name == PD_NAME
+    assert pd is not None and pd.name == WORKFLOW_NAME
     assert len(pd.tasks) == 2
 
     task_foo = pd.get_one_task_by_name("foo")
@@ -135,8 +135,8 @@ def test_multiple_tasks_outside(mock_code):
     side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
 )
 def test_multiple_tasks_inside(mock_code):
-    """Test multiple decorator tasks which inside process definition."""
-    with ProcessDefinition(PD_NAME) as pd:
+    """Test multiple decorator tasks which inside workflow."""
+    with Workflow(WORKFLOW_NAME) as pd:
 
         @task
         def foo():
@@ -151,7 +151,7 @@ def test_multiple_tasks_inside(mock_code):
 
         foo >> bar
 
-    assert pd is not None and pd.name == PD_NAME
+    assert pd is not None and pd.name == WORKFLOW_NAME
     assert len(pd.tasks) == 2
 
     task_foo = pd.get_one_task_by_name("foo")
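
The tests above double as a usage recipe for the decorator; a condensed sketch
with illustrative names, again assuming a reachable gateway for code generation
and submission:

    from pydolphinscheduler.core.workflow import Workflow
    from pydolphinscheduler.tasks.func_wrap import task


    @task
    def foo():
        print("foo")


    @task
    def bar():
        print("bar")


    with Workflow("func-wrap-demo") as wf:
        # calling a decorated function creates the task inside the workflow
        foo_task, bar_task = foo(), bar()
        foo_task >> bar_task
        wf.submit()
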
diff --git a/tests/tasks/test_sub_process.py b/tests/tasks/test_sub_process.py
deleted file mode 100644
index 126ab10..0000000
--- a/tests/tasks/test_sub_process.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""Test Task sub_process."""
-
-
-from unittest.mock import patch
-
-import pytest
-
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.tasks.sub_process import SubProcess
-
-TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition"
-TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320"
-TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
-
-
-@pytest.mark.parametrize(
-    "attr, expect",
-    [
-        (
-            {"process_definition_name": TEST_SUB_PROCESS_DEFINITION_NAME},
-            {
-                "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
-                "localParams": [],
-                "resourceList": [],
-                "dependence": {},
-                "waitStartTimeout": {},
-                "conditionResult": {"successNode": [""], "failedNode": [""]},
-            },
-        )
-    ],
-)
-@patch(
-    "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
-    return_value=(
-        {
-            "id": 1,
-            "name": TEST_SUB_PROCESS_DEFINITION_NAME,
-            "code": TEST_SUB_PROCESS_DEFINITION_CODE,
-        }
-    ),
-)
-@patch(
-    "pydolphinscheduler.core.task.Task.gen_code_and_version",
-    return_value=(123, 1),
-)
-def test_property_task_params(mock_code_version, mock_pd_info, attr, expect):
-    """Test task sub process property."""
-    task = SubProcess("test-sub-process-task-params", **attr)
-    assert expect == task.task_params
-
-
-@patch(
-    "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
-    return_value=(
-        {
-            "id": 1,
-            "name": TEST_SUB_PROCESS_DEFINITION_NAME,
-            "code": TEST_SUB_PROCESS_DEFINITION_CODE,
-        }
-    ),
-)
-def test_sub_process_get_define(mock_process_definition):
-    """Test task sub_process function get_define."""
-    code = 123
-    version = 1
-    name = "test_sub_process_get_define"
-    expect = {
-        "code": code,
-        "name": name,
-        "version": 1,
-        "description": None,
-        "delayTime": 0,
-        "taskType": "SUB_PROCESS",
-        "taskParams": {
-            "resourceList": [],
-            "localParams": [],
-            "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
-            "dependence": {},
-            "conditionResult": {"successNode": [""], "failedNode": [""]},
-            "waitStartTimeout": {},
-        },
-        "flag": "YES",
-        "taskPriority": "MEDIUM",
-        "workerGroup": "default",
-        "environmentCode": None,
-        "failRetryTimes": 0,
-        "failRetryInterval": 1,
-        "timeoutFlag": "CLOSE",
-        "timeoutNotifyStrategy": None,
-        "timeout": 0,
-    }
-    with patch(
-        "pydolphinscheduler.core.task.Task.gen_code_and_version",
-        return_value=(code, version),
-    ):
-        with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME):
-            sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME)
-            assert sub_process.get_define() == expect
diff --git a/tests/tasks/test_sub_workflow.py b/tests/tasks/test_sub_workflow.py
new file mode 100644
index 0000000..fc18e32
--- /dev/null
+++ b/tests/tasks/test_sub_workflow.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task sub workflow."""
+import warnings
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.workflow import Workflow
+from pydolphinscheduler.tasks.sub_workflow import SubWorkflow
+
+TEST_SUB_WORKFLOW_NAME = "sub-test-workflow"
+TEST_SUB_WORKFLOW_CODE = "3643589832320"
+TEST_WORKFLOW_NAME = "simple-test-workflow"
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {"workflow_name": TEST_SUB_WORKFLOW_NAME},
+            {
+                "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.tasks.sub_workflow.SubWorkflow.get_workflow_info",
+    return_value=(
+        {
+            "id": 1,
+            "name": TEST_SUB_WORKFLOW_NAME,
+            "code": TEST_SUB_WORKFLOW_CODE,
+        }
+    ),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, mock_pd_info, attr, expect):
+    """Test task sub workflow property."""
+    task = SubWorkflow("test-sub-workflow-task-params", **attr)
+    assert expect == task.task_params
+
+
+@patch(
+    "pydolphinscheduler.tasks.sub_workflow.SubWorkflow.get_workflow_info",
+    return_value=(
+        {
+            "id": 1,
+            "name": TEST_SUB_WORKFLOW_NAME,
+            "code": TEST_SUB_WORKFLOW_CODE,
+        }
+    ),
+)
+def test_sub_workflow_get_define(mock_workflow_definition):
+    """Test task sub_workflow function get_define."""
+    code = 123
+    version = 1
+    name = "test_sub_workflow_get_define"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SUB_PROCESS",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        with Workflow(TEST_WORKFLOW_NAME):
+            sub_workflow = SubWorkflow(name, TEST_SUB_WORKFLOW_NAME)
+            assert sub_workflow.get_define() == expect
+
+
+@patch(
+    "pydolphinscheduler.tasks.sub_workflow.SubWorkflow.get_workflow_info",
+    return_value=(
+        {
+            "id": 1,
+            "name": TEST_SUB_WORKFLOW_NAME,
+            "code": TEST_SUB_WORKFLOW_CODE,
+        }
+    ),
+)
+def test_deprecated_sub_workflow_get_define(mock_workflow_definition):
+    """Test deprecated task sub_process still work and raise warning."""
+    code = 123
+    version = 1
+    name = "test_sub_workflow_get_define"
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SUB_PROCESS",
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "processDefinitionCode": TEST_SUB_WORKFLOW_CODE,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        with warnings.catch_warnings(record=True) as w:
+            from pydolphinscheduler.tasks.sub_process import SubProcess
+
+            assert len(w) == 1
+            assert issubclass(w[-1].category, DeprecationWarning)
+            assert "deprecated" in str(w[-1].message)
+
+            with Workflow(TEST_WORKFLOW_NAME):
+                sub_workflow = SubProcess(name, TEST_SUB_WORKFLOW_NAME)
+                assert sub_workflow.get_define() == expect
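
User code that wants to surface or assert the shim's warning can reuse the
stdlib pattern from this test. Note the warning only fires on the first import
of the module in a process, since Python caches imported modules:

    import warnings

    # assumes tasks.sub_process has not been imported yet in this interpreter
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        from pydolphinscheduler.tasks.sub_process import SubProcess  # noqa: F401

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
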
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index 6f9222c..37c3b44 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -22,7 +22,7 @@ from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.workflow import Workflow
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.tasks.switch import (
     Branch,
@@ -266,7 +266,7 @@ def test_switch_get_define(mock_task_code_version):
 )
 def test_switch_set_dep_workflow(mock_task_code_version):
     """Test task switch set dependence in workflow level."""
-    with ProcessDefinition(name="test-switch-set-dep-workflow") as pd:
+    with Workflow(name="test-switch-set-dep-workflow") as pd:
         parent = Task(name="parent", task_type=TEST_TYPE)
         switch_child_1 = Task(name="switch_child_1", task_type=TEST_TYPE)
         switch_child_2 = Task(name="switch_child_2", task_type=TEST_TYPE)
@@ -286,7 +286,7 @@ def test_switch_set_dep_workflow(mock_task_code_version):
         assert parent._downstream_task_codes == {switch.code}
         assert switch._upstream_task_codes == {parent.code}
 
-        # Switch task dep after ProcessDefinition function get_define called
+        # Switch task dep is resolved after Workflow function get_define is called
         assert switch._downstream_task_codes == {
             switch_child_1.code,
             switch_child_2.code,
diff --git a/tests/test_docs.py b/tests/test_docs.py
index 930e4f7..0021524 100644
--- a/tests/test_docs.py
+++ b/tests/test_docs.py
@@ -22,7 +22,8 @@ import re
 from tests.testing.constants import task_without_example
 from tests.testing.path import get_doc_tasks, get_tasks
 
-ignore_code_file = {"__init__.py"}
+# Task sub_process is deprecated and will be removed in the future, so we ignore it here
+ignore_code_file = {"__init__.py", "sub_process.py"}
 ignore_doc_file = {"index.rst"}
 
 
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 6a4b6e4..291a5a1 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -24,7 +24,7 @@ import os
 task_without_example = {
     "sql",
     "http",
-    "sub_process",
+    "sub_workflow",
     "python",
     "procedure",
 }