You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 00:56:49 UTC

[dolphinscheduler] branch 3.1.0-prepare updated: [3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
     new 5e42f52bdf [3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)
5e42f52bdf is described below

commit 5e42f52bdfa4219132ae122a84f81990b8a8651c
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sat Sep 17 08:56:37 2022 +0800

    [3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)
    
    * [DSIP-11][python] create workflows from YAML configuration (#11611)
    
    (cherry picked from commit 38ee91fb1edf9fc5a86384cbcade4da7d003687a)
    
    * fix pydolphin yaml doc link (#11817)
    
    (cherry picked from commit c41fa5a8b14706702ded7b790a484a457498586e)
---
 .../docs/source/tasks/condition.rst                |   7 +
 .../pydolphinscheduler/docs/source/tasks/datax.rst |  13 +
 .../docs/source/tasks/dependent.rst                |  14 +
 .../pydolphinscheduler/docs/source/tasks/flink.rst |   7 +
 .../docs/source/tasks/func_wrap.rst                |   2 +-
 .../pydolphinscheduler/docs/source/tasks/http.rst  |   8 +
 .../docs/source/tasks/map_reduce.rst               |   8 +
 .../docs/source/tasks/procedure.rst                |   8 +
 .../docs/source/tasks/python.rst                   |   8 +
 .../pydolphinscheduler/docs/source/tasks/shell.rst |   8 +
 .../pydolphinscheduler/docs/source/tasks/spark.rst |   8 +
 .../pydolphinscheduler/docs/source/tasks/sql.rst   |  14 +
 .../docs/source/tasks/sub_process.rst              |  17 +
 .../docs/source/tasks/switch.rst                   |   9 +
 .../pydolphinscheduler/docs/source/tutorial.rst    |  96 +++++
 .../examples/yaml_define/Condition.yaml            |  43 ++
 .../examples/yaml_define/DataX.yaml                |  33 ++
 .../examples/yaml_define/Dependent.yaml            |  76 ++++
 .../examples/yaml_define/Dependent_External.yaml   |  26 ++
 .../examples/yaml_define/Flink.yaml                |  29 ++
 .../examples/yaml_define/Http.yaml                 |  37 ++
 .../examples/yaml_define/MapReduce.yaml            |  29 ++
 .../examples/yaml_define/MoreConfiguration.yaml    |  40 ++
 .../examples/yaml_define/Procedure.yaml            |  27 ++
 .../examples/yaml_define/Python.yaml               |  30 ++
 .../examples/yaml_define/Shell.yaml                |  40 ++
 .../examples/yaml_define/Spark.yaml                |  30 ++
 .../examples/yaml_define/Sql.yaml                  |  45 ++
 .../examples/yaml_define/SubProcess.yaml           |  27 ++
 .../examples/yaml_define/Switch.yaml               |  39 ++
 .../examples/yaml_define/example_datax.json        |  62 +++
 .../examples/yaml_define/example_sql.sql           |  22 +
 .../examples/yaml_define/example_sub_workflow.yaml |  26 ++
 .../examples/yaml_define/tutorial.yaml             |  46 ++
 .../src/pydolphinscheduler/cli/commands.py         |  14 +
 .../pydolphinscheduler/core/yaml_process_define.py | 466 +++++++++++++++++++++
 .../src/pydolphinscheduler/tasks/__init__.py       |   1 +
 .../tests/core/test_yaml_process_define.py         | 191 +++++++++
 .../pydolphinscheduler/tests/testing/path.py       |   1 +
 39 files changed, 1606 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
index 20b0350078..f6d7e6ad8f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
@@ -31,3 +31,10 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.condition
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Condition.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
index c07726941e..cb67a2fa9e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
@@ -31,3 +31,16 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.datax
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/DataX.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+
+example_datax.json:
+
+.. literalinclude:: ../../../examples/yaml_define/example_datax.json
+   :language: json
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
index fe26d0f30a..d8e1599b2d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
@@ -31,3 +31,17 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.dependent
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+Dependent_External.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent_External.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
index 8db9ac266d..76eb484718 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
@@ -31,3 +31,10 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.flink
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Flink.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
index 5f41b80cfd..a4a2972933 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
@@ -30,4 +30,4 @@ Example
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
+.. automodule:: pydolphinscheduler.tasks.func_wrap
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
index 4c6d8f8e40..4e138c9989 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
@@ -19,3 +19,11 @@ HTTP
 ====
 
 .. automodule:: pydolphinscheduler.tasks.http
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Http.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
index 068b8d8b41..7356880b26 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
@@ -32,3 +32,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.map_reduce
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/MapReduce.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
index cd79eff140..2f28efc526 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
@@ -19,3 +19,11 @@ Procedure
 =========
 
 .. automodule:: pydolphinscheduler.tasks.procedure
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Procedure.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
index 660e46a6e5..1bf6210018 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
@@ -19,3 +19,11 @@ Python
 ======
 
 .. automodule:: pydolphinscheduler.tasks.python
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Python.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
index 5ce16c3c9f..2dd106a3b8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
@@ -31,3 +31,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.shell
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Shell.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
index cdb5902c37..d5a51db91a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
@@ -31,3 +31,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.spark
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Spark.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
index 21eaec7ae9..52df042b74 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
@@ -19,3 +19,17 @@ SQL
 ===
 
 .. automodule:: pydolphinscheduler.tasks.sql
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Sql.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+example_sql.sql:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sql.sql
+   :start-after: */
+   :language: sql
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
index 8a9f562200..894dd0fbad 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
@@ -19,3 +19,20 @@ Sub Process
 ===========
 
 .. automodule:: pydolphinscheduler.tasks.sub_process
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+
+
+example_sub_workflow.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml
+   :start-after: # under the License.
+   :language: yaml
+
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
index d8b34a4ac9..2fef589efb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
@@ -31,3 +31,12 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.switch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Switch.yaml
+   :start-after: # under the License.
+   :language: yaml
+
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
index 6366c803bb..57d21b2d29 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
@@ -37,6 +37,8 @@ There are two types of tutorials: traditional and task decorator.
   versatility to the traditional way because it only supported Python functions and without build-in tasks
   supported. But it is helpful if your workflow is all built with Python or if you already have some Python
   workflow code and want to migrate them to pydolphinscheduler.
+- **YAML File**: We can use pydolphinscheduler CLI to create process using YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`. 
+  We can find more YAML file examples in `examples/yaml_define <https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define>`_
 
 .. tab:: Tradition
 
@@ -52,6 +54,12 @@ There are two types of tutorials: traditional and task decorator.
       :start-after: [start tutorial]
       :end-before: [end tutorial]
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # under the License.
+      :language: yaml
+
 Import Necessary Module
 -----------------------
 
@@ -104,6 +112,13 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
       :start-after: [start workflow_declare]
       :end-before: [end workflow_declare]
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # under the License.
+      :end-before: # Define the tasks under the workflow
+      :language: yaml
+
 We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
 if you are interested in it. For all arguments of object process definition, you could find in the
 :class:`pydolphinscheduler.core.process_definition` API documentation.
@@ -139,6 +154,12 @@ Task Declaration
    It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
    Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow 
+      :language: yaml
+
 Setting Task Dependence
 -----------------------
 
@@ -167,6 +188,14 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
       :start-after: [start task_relation_declare]
       :end-before: [end task_relation_declare]
 
+.. tab:: YAML File
+
+   We can use :code:`deps:[]` to set task dependence
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow 
+      :language: yaml
+
 .. note::
 
    We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
@@ -198,6 +227,17 @@ will create workflow definition as well as workflow schedule.
       :start-after: [start submit_or_run]
       :end-before: [end submit_or_run]
 
+.. tab:: YAML File
+
+   pydolphinscheduler YAML CLI always submit workflow. We can run the workflow if we set :code:`run: true`
+
+   .. code-block:: yaml
+
+     # Define the workflow
+     workflow:
+       name: "tutorial"
+       run: true
+
 At last, we could execute this workflow code in your terminal like other Python scripts, running
 :code:`python tutorial.py` to trigger and execute it.
 
@@ -219,5 +259,61 @@ named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
    :language: text
    :lines: 24-28
 
+Create Process Using YAML File
+------------------------------
+
+We can use pydolphinscheduler CLI to create process using YAML file
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f Shell.yaml
+
+We can use the following four special grammars to define workflows more flexibly.
+
+- :code:`$FILE{"file_name"}`: Read the file (:code:`file_name`) contents and replace them to that location.
+- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined using YAML file (:code:`other_workflow.yaml`) and replace the process name in this location.
+- :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and replace it to that location.
+- :code:`${CONFIG.key_name}`: Read the configuration value of key (:code:`key_name`) and replace it to that location.
+
+
+In addition, when loading a file path using :code:`$FILE{"file_name"}` or :code:`$WORKFLOW{"other_workflow.yaml"}`, pydolphinscheduler will search in the path of the YAML file if the file does not exist.
+
+For example, our file directory structure is as follows:
+
+.. code-block:: bash
+
+   .
+   └── yaml_define
+       ├── Condition.yaml
+       ├── DataX.yaml
+       ├── Dependent_External.yaml
+       ├── Dependent.yaml
+       ├── example_datax.json
+       ├── example_sql.sql
+       ├── example_sub_workflow.yaml
+       ├── Flink.yaml
+       ├── Http.yaml
+       ├── MapReduce.yaml
+       ├── MoreConfiguration.yaml
+       ├── Procedure.yaml
+       ├── Python.yaml
+       ├── Shell.yaml
+       ├── Spark.yaml
+       ├── Sql.yaml
+       ├── SubProcess.yaml
+       └── Switch.yaml
+
+After we run
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f yaml_define/SubProcess.yaml
+
+
+the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_sub_workflow.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
+
+Furthermore, this feature supports recursion all the way down.
+
+
 .. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
 .. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
new file mode 100644
index 0000000000..c65b8c7aeb
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Condition"
+
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
+  - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
+  - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
+  - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
+  - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }
+
+  - name: condition
+    task_type: Condition
+    success_task: success_branch
+    failed_task: fail_branch
+    op: AND
+    groups:
+      - op: AND
+        groups:
+          - task: pre_task_1
+            flag: true
+          - task: pre_task_2
+            flag: true
+          - task: pre_task_3
+            flag: false
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
new file mode 100644
index 0000000000..00ecd54685
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "DataX"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: DataX
+    datasource_name: db
+    datatarget_name: db
+    sql: show tables;
+    target_table: table_test
+
+  - name: task_custon_config
+    task_type: CustomDataX
+    json: $FILE{"example_datax.json"}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml
new file mode 100644
index 0000000000..d69fac05da
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+workflow:
+  name: "Dependent"
+
+# Define the tasks under the workflow
+tasks:
+  - name: dependent
+    task_type: Dependent
+    denpendence:
+    op: and
+    groups:
+      - op: or
+        groups:
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_2
+
+      - op: and
+        groups:
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY 
+
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_2
+            dependent_date: last24Hours 
+
+  - name: dependent_var
+    task_type: Dependent
+    denpendence:
+    op: and
+    groups:
+      - op: or
+        # we can use ${CONFIG.WORKFLOW_PROJECT} to set the value to configuration.WORKFLOW_PROJECT
+        # we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from dependent_external.yaml and set the value to that workflow name
+        groups:
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_1
+
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_2
+      - op: and
+        groups:
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY 
+
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_2
+            dependent_date: last24Hours 
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml
new file mode 100644
index 0000000000..577ff6a807
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "task_dependent_external"
+
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+  - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
+  - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml
new file mode 100644
index 0000000000..2449d435a3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Flink"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Flink
+    main_class: org.apache.flink.streaming.examples.wordcount.WordCount
+    main_package: test_java.jar
+    program_type: JAVA
+    deploy_mode: local
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml
new file mode 100644
index 0000000000..1483aeb3d8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Http"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Http
+    url: "https://httpbin.org/get"
+    http_method: "GET"
+    http_params:
+      - { "prop": "a", "httpParametersType": "PARAMETER", "value": "1" }
+      - { "prop": "b", "httpParametersType": "PARAMETER", "value": "2" }
+      - {
+          "prop": "Content-Type",
+          "httpParametersType": "header",
+          "value": "test",
+        }
+    http_check_condition: "STATUS_CODE_CUSTOM"
+    condition: "404"
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml
new file mode 100644
index 0000000000..e1a2b5709c
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "MapReduce"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: MR
+    main_class: wordcount
+    main_package: test_java.jar
+    program_type: SCALA
+    main_args: /dolphinscheduler/tenant_exists/resources/file.txt /output/ds
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml
new file mode 100644
index 0000000000..258aa33433
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "MoreConfiguration"
+  param:
+    n: 1
+
+# Define the tasks under the workflow
+tasks:
+  - name: shell_0
+    task_type: Shell
+    description: "yaml define task"
+    flag: "YES"
+    command: |
+      echo "$ENV{HOME}"
+      echo "${n}"
+    task_priority: "HIGH"
+    delay_time: 20
+    fail_retry_times: 30
+    fail_retry_interval: 5
+    timeout_flag: "CLOSE"
+    timeout: 60
+    local_params:
+      - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml
new file mode 100644
index 0000000000..829a961c1a
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Procedure"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Procedure
+    datasource_name: db
+    method: show tables;
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml
new file mode 100644
index 0000000000..728b5c928e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Python"
+
+# Define the tasks under the workflow
+tasks:
+  - name: python
+    task_type: Python
+    definition: |
+      import os
+      print(os)
+      print("1")
+      print("2")
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml
new file mode 100644
index 0000000000..fdbe126327
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Shell"
+  release_state: "offline"
+  run: true
+
+# Define the tasks under the workflow
+tasks:
+  - name: task_parent
+    task_type: Shell
+    command: |
+      echo hello pydolphinscheduler
+      echo run task parent
+
+  - name: task_child_one
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child one"
+
+  - name: task_child_two
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child two"
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml
new file mode 100644
index 0000000000..6132b8d749
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Spark"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Spark
+    main_class: org.apache.spark.examples.SparkPi
+    main_package: test_java.jar
+    program_type: SCALA
+    deploy_mode: local
+    spark_version: SPARK1
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml
new file mode 100644
index 0000000000..c3c7e88ee1
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Sql"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task_base
+    task_type: Sql
+    datasource_name: "db"
+    sql: show tables;
+
+  - name: task_multi_line
+    task_type: Sql
+    datasource_name: "db"
+    sql: |
+      show tables;
+      select id from version where id=1;
+
+  - name: task_file
+    task_type: Sql
+    datasource_name: "db"
+    sql: $FILE{"example_sql.sql"}
+
+  # Or you can define task "task_union" it with one line
+  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}
+
+  # Or you can define task "task_union" it with one line
+  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml
new file mode 100644
index 0000000000..0ea7549db4
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "SubWorkflow"
+
+tasks:
+  - name: example_workflow
+    task_type: SubProcess
+    process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}
+
+  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml
new file mode 100644
index 0000000000..33ed68813e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Switch"
+  param:
+    var: 1
+
+# Define the tasks under the workflow
+tasks:
+  - name: switch_child_1
+    task_type: Shell
+    command: echo switch_child_1
+
+  - name: switch_child_2
+    task_type: Shell
+    command: echo switch_child_2
+
+  - name: switch
+    task_type: Switch
+    condition:
+      - task: switch_child_1
+        condition: "${var} > 1"
+      - task: switch_child_2
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json
new file mode 100644
index 0000000000..3db8092cb6
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json
@@ -0,0 +1,62 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "usr",
+            "password": "pwd",
+            "column": [
+              "id",
+              "name",
+              "code",
+              "description"
+            ],
+            "splitPk": "id",
+            "connection": [
+              {
+                "table": [
+                  "source_table"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://127.0.0.1:3306/source_db"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "mysqlwriter",
+          "parameter": {
+            "writeMode": "insert",
+            "username": "usr",
+            "password": "pwd",
+            "column": [
+              "id",
+              "name"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+                "table": [
+                  "target_table"
+                ]
+              }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "errorLimit": {
+        "percentage": 0,
+        "record": 0
+      },
+      "speed": {
+        "channel": 1,
+        "record": 1000
+      }
+    }
+  }
+}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql
new file mode 100644
index 0000000000..06b5c4c16c
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+select id from version where id=1;
+select id from version where id=2;
+select id from version where id=3;
+select id from version where id=4;
+select id from version where id=5;
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml
new file mode 100644
index 0000000000..af3a863da9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "example_workflow_for_sub_workflow"
+
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
+  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml
new file mode 100644
index 0000000000..104a8c367b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "tutorial"
+  schedule: "0 0 0 * * ? *"
+  start_time: "2021-01-01"
+  tenant: "tenant_exists"
+  release_state: "offline"
+  run: true
+
+# Define the tasks under the workflow
+tasks:
+  - name: task_parent
+    task_type: Shell
+    command: echo hello pydolphinscheduler
+
+  - name: task_child_one
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child one"
+
+  - name: task_child_two
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child two"
+
+  - name: task_union
+    task_type: Shell
+    deps: [task_child_one, task_child_two]
+    command: echo "union"
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
index d78e503dbb..8d923f1406 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
@@ -26,6 +26,7 @@ from pydolphinscheduler.configuration import (
     init_config_file,
     set_single_config,
 )
+from pydolphinscheduler.core.yaml_process_define import create_process_definition
 
 version_option_val = ["major", "minor", "micro"]
 
@@ -90,3 +91,16 @@ def config(getter, setter, init) -> None:
         for key, val in setter:
             set_single_config(key, val)
         click.echo("Set configuration done.")
+
+
+@cli.command()
+@click.option(
+    "--yaml_file",
+    "-f",
+    required=True,
+    help="YAML file path",
+    type=click.Path(exists=True),
+)
+def yaml(yaml_file) -> None:
+    """Create process definition using YAML file."""
+    create_process_definition(yaml_file)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py
new file mode 100644
index 0000000000..0944925a48
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py
@@ -0,0 +1,466 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+logger = logging.getLogger(__file__)
+
+KEY_PROCESS = "workflow"
+KEY_TASK = "tasks"
+KEY_TASK_TYPE = "task_type"
+KEY_DEPS = "deps"
+KEY_OP = "op"
+
+TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS]
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key, "$%s" % key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + file_path exists
+        """
+        possible_path = file_path
+        if not Path(file_path).exists():
+            new_path = Path(base_folder).joinpath(file_path)
+            if new_path.exists():
+                possible_path = new_path
+                logger.info(f"{file_path} not exists, convert to {possible_path}")
+
+        return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+    """Get the task class object by task_type (case compatible)."""
+    # only get task class from tasks.__all__
+    all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+    task_type_cap = task_type.capitalize()
+    if task_type_cap not in all_task_types:
+        raise PyDSTaskNoFoundException("cant not find task %s" % task_type)
+
+    standard_name = all_task_types[task_type_cap]
+    return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+    """Yaml parser for create process.
+
+    :param yaml_file: yaml file path.
+
+        examples1 ::
+
+            parser = YamlProcess(yaml_file=...)
+            parser.create_process_definition()
+
+        examples2 ::
+
+            YamlProcess(yaml_file=...).create_process_definition()
+
+    """
+
+    _parse_rules = [
+        ParseTool.parse_string_param_if_file,
+        ParseTool.parse_string_param_if_env,
+        ParseTool.parse_string_param_if_config,
+    ]
+
+    def __init__(self, yaml_file: str):
+        with open(yaml_file, "r") as f:
+            content = f.read()
+
+        self._base_folder = Path(yaml_file).parent
+        content = self.prepare_refer_process(content)
+        super().__init__(content)
+
+    def create_process_definition(self):
+        """Create process main function."""
+        # get process parameters with key "workflow"
+        process_params = self[KEY_PROCESS]
+
+        # pop "run" parameter, used at the end
+        is_run = process_params.pop("run", False)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        process_params = self.parse_params(process_params)
+
+        process_name = process_params["name"]
+        logger.info(f"Create Process: {process_name}")
+        with ProcessDefinition(**process_params) as pd:
+
+            # save dependencies between tasks
+            dependencies = {}
+
+            # save name and task mapping
+            name2task = {}
+
+            # get task datas with key "tasks"
+            for task_data in self[KEY_TASK]:
+                task = self.parse_task(task_data, name2task)
+
+                deps = task_data.get(KEY_DEPS, [])
+                if deps:
+                    dependencies[task.name] = deps
+                name2task[task.name] = task
+
+            # build dependencies between task
+            for downstream_task_name, deps in dependencies.items():
+                downstream_task = name2task[downstream_task_name]
+                for upstream_task_name in deps:
+                    upstream_task = name2task[upstream_task_name]
+                    upstream_task >> downstream_task
+
+            pd.submit()
+            # if set is_run, run the process after submit
+            if is_run:
+                logger.info(f"run workflow: {pd}")
+                pd.run()
+
+        return process_name
+
+    def parse_params(self, params: Any):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other types continue recursively.
+        """
+        if isinstance(params, str):
+            for parse_rule in self._parse_rules:
+                params_ = params
+                params = parse_rule(params, base_folder=self._base_folder)
+                if params_ != params:
+                    logger.info(f"parse {params_} -> {params}")
+
+        elif isinstance(params, list):
+            for index in range(len(params)):
+                params[index] = self.parse_params(params[index])
+
+        elif isinstance(params, dict):
+            for key, value in params.items():
+                params[key] = self.parse_params(value)
+
+        return params
+
+    @classmethod
+    def parse(cls, yaml_file: str):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other types continue recursively.
+        """
+        process_name = cls(yaml_file).create_process_definition()
+        return process_name
+
+    def prepare_refer_process(self, content):
+        """Allow YAML files to reference process derived from other YAML files."""
+        process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
+        for process_path in process_paths:
+            logger.info(
+                f"find special token {process_path}, load process form {process_path}"
+            )
+            possible_path = ParseTool.get_possible_path(process_path, self._base_folder)
+            process_name = YamlProcess.parse(possible_path)
+            content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name)
+
+        return content
+
+    def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+        """Parse various types of tasks.
+
+        :param task_data: dict.
+                {
+                    "task_type": "Shell",
+                    "params": {"name": "shell_task", "command":"ehco hellp"}
+                }
+
+        :param name2task: Dict[str, Task]), mapping of task_name and task
+
+
+        Some task type have special parse func:
+            if task type is Switch, use parse_switch;
+            if task type is Condition, use parse_condition;
+            if task type is Dependent, use parse_dependent;
+            other, we pass all task_params as input to task class, like "task_cls(**task_params)".
+        """
+        task_type = task_data["task_type"]
+        # get params without special key
+        task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}
+
+        task_cls = get_task_cls(task_type)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        task_params = self.parse_params(task_params)
+
+        if task_cls == tasks.Switch:
+            task = self.parse_switch(task_params, name2task)
+
+        elif task_cls == tasks.Condition:
+            task = self.parse_condition(task_params, name2task)
+
+        elif task_cls == tasks.Dependent:
+            task = self.parse_dependent(task_params, name2task)
+
+        else:
+            task = task_cls(**task_params)
+        logger.info("parsed task of type %s: %s", task_type, task)
+        return task
+
+    def parse_switch(self, task_params, name2task):
+        """Parse Switch Task.
+
+        This is an example Yaml fragment of task_params
+
+        name: switch
+        condition:
+          - ["${var} > 1", switch_child_1]
+          - switch_child_2
+        """
+        from pydolphinscheduler.tasks.switch import (
+            Branch,
+            Default,
+            Switch,
+            SwitchCondition,
+        )
+
+        condition_datas = task_params["condition"]
+        conditions = []
+        for condition_data in condition_datas:
+            assert "task" in condition_data, "task must be in %s" % condition_data
+            task_name = condition_data["task"]
+            condition_string = condition_data.get("condition", None)
+
+            # if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch
+            if condition_string is None:
+                conditions.append(Default(task=name2task.get(task_name)))
+
+            # if condition_string is not None, for example:
+            # {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch
+            else:
+                conditions.append(
+                    Branch(condition_string, task=name2task.get(task_name))
+                )
+
+        switch = Switch(
+            name=task_params["name"], condition=SwitchCondition(*conditions)
+        )
+        return switch
+
+    def parse_condition(self, task_params, name2task):
+        """Parse Condition Task.
+
+        This is an example Yaml fragment of task_params
+
+        name: condition
+        success_task: success_branch
+        failed_task: fail_branch
+        OP: AND
+        groups:
+          -
+            OP: AND
+            groups:
+              - [pre_task_1, true]
+              - [pre_task_2, true]
+              - [pre_task_3, false]
+          -
+            OP: AND
+            groups:
+              - [pre_task_1, false]
+              - [pre_task_2, true]
+              - [pre_task_3, true]
+
+        """
+        from pydolphinscheduler.tasks.condition import (
+            FAILURE,
+            SUCCESS,
+            And,
+            Condition,
+            Or,
+        )
+
+        def get_op_cls(op):
+            cls = None
+            if op.lower() == "and":
+                cls = And
+            elif op.lower() == "or":
+                cls = Or
+            else:
+                raise Exception("OP must be in And or Or, but get: %s" % op)
+            return cls
+
+        second_cond_ops = []
+        for first_group in task_params["groups"]:
+            second_op = first_group["op"]
+            task_ops = []
+            for condition_data in first_group["groups"]:
+                assert "task" in condition_data, "task must be in %s" % condition_data
+                assert "flag" in condition_data, "flag must be in %s" % condition_data
+                task_name = condition_data["task"]
+                flag = condition_data["flag"]
+                task = name2task[task_name]
+
+                # for example: task = pre_task_1, flag = true
+                if flag:
+                    task_ops.append(SUCCESS(task))
+                else:
+                    task_ops.append(FAILURE(task))
+
+            second_cond_ops.append(get_op_cls(second_op)(*task_ops))
+
+        first_op = task_params["op"]
+        cond_operator = get_op_cls(first_op)(*second_cond_ops)
+
+        condition = Condition(
+            name=task_params["name"],
+            condition=cond_operator,
+            success_task=name2task[task_params["success_task"]],
+            failed_task=name2task[task_params["failed_task"]],
+        )
+        return condition
+
+    def parse_dependent(self, task_params, name2task):
+        """Parse Dependent Task.
+
+        This is an example Yaml fragment of task_params
+
+        name: dependent
+        denpendence:
+        OP: AND
+        groups:
+          -
+            OP: Or
+            groups:
+              - [pydolphin, task_dependent_external, task_1]
+              - [pydolphin, task_dependent_external, task_2]
+          -
+            OP: And
+            groups:
+              - [pydolphin, task_dependent_external, task_1, LAST_WEDNESDAY]
+              - [pydolphin, task_dependent_external, task_2, last24Hours]
+
+        """
+        from pydolphinscheduler.tasks.dependent import (
+            And,
+            Dependent,
+            DependentDate,
+            DependentItem,
+            Or,
+        )
+
+        def process_dependent_date(dependent_date):
+            """Parse dependent date (Compatible with key and value of DependentDate)."""
+            dependent_date_upper = dependent_date.upper()
+            if hasattr(DependentDate, dependent_date_upper):
+                dependent_date = getattr(DependentDate, dependent_date_upper)
+            return dependent_date
+
+        def get_op_cls(op):
+            cls = None
+            if op.lower() == "and":
+                cls = And
+            elif op.lower() == "or":
+                cls = Or
+            else:
+                raise Exception("OP must be in And or Or, but get: %s" % op)
+            return cls
+
+        def create_dependent_item(source_items):
+            """Parse dependent item.
+
+            project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY
+            """
+            project_name = source_items["project_name"]
+            process_definition_name = source_items["process_definition_name"]
+            dependent_task_name = source_items["dependent_task_name"]
+            dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
+            dependent_item = DependentItem(
+                project_name=project_name,
+                process_definition_name=process_definition_name,
+                dependent_task_name=dependent_task_name,
+                dependent_date=process_dependent_date(dependent_date),
+            )
+
+            return dependent_item
+
+        second_dependences = []
+        for first_group in task_params["groups"]:
+            second_op = first_group[KEY_OP]
+            dependence_items = []
+            for source_items in first_group["groups"]:
+                dependence_items.append(create_dependent_item(source_items))
+
+            second_dependences.append(get_op_cls(second_op)(*dependence_items))
+
+        first_op = task_params[KEY_OP]
+        dependence = get_op_cls(first_op)(*second_dependences)
+
+        task = Dependent(
+            name=task_params["name"],
+            dependence=dependence,
+        )
+        return task
+
+
def create_process_definition(yaml_file):
    """Create and submit a process definition (workflow) from a YAML file.

    Entry point used by the command line interface: delegates all parsing
    and submission to ``YamlProcess.parse``.
    """
    YamlProcess.parse(yaml_file)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index d49a1d394c..53b462ca90 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -35,6 +35,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
 __all__ = [
     "Condition",
     "DataX",
+    "CustomDataX",
     "Dependent",
     "Flink",
     "Http",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
new file mode 100644
index 0000000000..99ad179a5f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.path import path_yaml_example
+from tests.testing.task import Task
+
+
@pytest.mark.parametrize(
    "string_param, expect",
    [
        ("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
    ],
)
def test_parse_tool_env_exist(string_param, expect):
    """Test parsing an environment variable that exists."""
    # patch.dict restores os.environ after the block, so PROJECT_NAME does
    # not leak into other tests (the original assignment was never undone).
    with patch.dict(os.environ, {"PROJECT_NAME": expect}):
        assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
def test_parse_tool_env_not_exist():
    """Test that a missing environment variable falls back to ``$<name>``."""
    missing_key = "THIS_ENV_NOT_EXIST_0000000"
    parsed = ParseTool.parse_string_param_if_env("$ENV{%s}" % missing_key)
    assert parsed == "$" + missing_key
+
+
@pytest.mark.parametrize(
    "string_param, expect_key",
    [
        ("${CONFIG.java_gateway.address}", "java_gateway.address"),
        ("${CONFIG.WORKFLOW_PROJECT}", "default.workflow.project"),
    ],
)
def test_parse_tool_config(string_param, expect_key):
    """Test that ``${CONFIG.<key>}`` resolves to the configuration value."""
    resolved = ParseTool.parse_string_param_if_config(string_param)
    assert resolved == configuration.get_single_config(expect_key)
+
+
def test_parse_possible_yaml_file():
    """Test that ``$FILE{"<name>"}`` is replaced by the file's content."""
    base = Path(path_yaml_example)
    file_name = "Shell.yaml"

    # Read the example file directly to obtain the expected substitution.
    with open(base.joinpath(file_name), "r") as reader:
        expected_content = reader.read()

    parsed = ParseTool.parse_string_param_if_file(
        '$FILE{"%s"}' % file_name, base_folder=base
    )
    assert parsed == expected_content
+
+
def test_parse_tool_parse_possible_path_file():
    """Test resolving a possible path against a base folder."""
    base = Path(path_yaml_example)
    file_name = "Shell.yaml"
    full_path = base.joinpath(file_name)

    # A full path resolves to itself.
    assert ParseTool.get_possible_path(full_path, base_folder=base) == full_path
    # A bare file name resolves relative to the base folder.
    assert ParseTool.get_possible_path(file_name, base_folder=base) == full_path
    # A different base folder yields a different resolved path.
    assert ParseTool.get_possible_path(file_name, base_folder=".") != full_path
+
+
@pytest.mark.parametrize(
    "task_type, expect",
    [
        # shell/Shell/ShEll variants cover case-insensitive lookup; the
        # original list repeated ("Shell", tasks.Shell) a second time below,
        # which ran the same case twice and is removed here.
        ("shell", tasks.Shell),
        ("Shell", tasks.Shell),
        ("ShEll", tasks.Shell),
        ("Condition", tasks.Condition),
        ("DataX", tasks.DataX),
        ("CustomDataX", tasks.CustomDataX),
        ("Dependent", tasks.Dependent),
        ("Flink", tasks.Flink),
        ("Http", tasks.Http),
        ("MR", tasks.MR),
        ("Procedure", tasks.Procedure),
        ("Python", tasks.Python),
        ("Spark", tasks.Spark),
        ("Sql", tasks.Sql),
        ("SubProcess", tasks.SubProcess),
        ("Switch", tasks.Switch),
        ("SageMaker", tasks.SageMaker),
    ],
)
def test_get_task(task_type, expect):
    """Test that get_task_cls maps a type name to the matching task class."""
    assert expect == get_task_cls(task_type)
+
+
@pytest.mark.parametrize(
    "task_type",
    [
        "MYSQL",
    ],
)
def test_get_error(task_type):
    """Test that an unknown task type raises PyDSTaskNoFoundException."""
    expected_message = f"not find task {task_type}"
    with pytest.raises(PyDSTaskNoFoundException, match=expected_message):
        get_task_cls(task_type)
+
+
@pytest.mark.parametrize(
    "yaml_file",
    [
        ("Condition.yaml"),
        ("DataX.yaml"),
        ("Dependent.yaml"),
        ("Flink.yaml"),
        ("Procedure.yaml"),
        ("Http.yaml"),
        ("MapReduce.yaml"),
        ("Python.yaml"),
        ("Shell.yaml"),
        ("Spark.yaml"),
        ("Sql.yaml"),
        ("SubProcess.yaml"),
        # ("Switch.yaml"),  # NOTE(review): Switch example is skipped — confirm why
        ("MoreConfiguration.yaml"),
    ],
)
# Stub out every call that would otherwise reach a running DolphinScheduler
# backend: resource lookup, database lookup, and dependent-task code lookup.
@patch(
    "pydolphinscheduler.core.engine.Engine.get_resource_info",
    return_value=({"id": 1, "name": "test"}),
)
@patch(
    "pydolphinscheduler.core.database.Database.get_database_info",
    return_value=({"id": 1, "type": "mock_type"}),
)
@patch(
    "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
    return_value={
        "projectCode": 0,
        "processDefinitionCode": 0,
        "taskDefinitionCode": 0,
    },
)
# Prevent the parsed workflow from actually being submitted or run.
@patch.object(ProcessDefinition, "run")
@patch.object(ProcessDefinition, "submit")
def test_get_create_process_definition(
    prun, psubmit, dep_item, db_info, resource_info, yaml_file
):
    """Test create_process_definition function to parse example YAML file.

    Each example file must parse without raising; all gateway interaction
    is mocked so no server is required.
    """
    yaml_file_path = Path(path_yaml_example).joinpath(yaml_file)
    # gen_code_and_version normally asks the Java gateway for a task code;
    # reuse the testing Task helper's generator instead.
    with patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
    ):
        create_process_definition(yaml_file_path)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
index 68d93c404b..974ab3d47c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
@@ -24,6 +24,7 @@ project_root = Path(__file__).parent.parent.parent
 
 path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
 path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
+path_yaml_example = project_root.joinpath("examples", "yaml_define")
 path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
 path_default_config_yaml = project_root.joinpath(
     "src", "pydolphinscheduler", "default_config.yaml"