You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/17 00:56:49 UTC
[dolphinscheduler] branch 3.1.0-prepare updated: [3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch 3.1.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.1.0-prepare by this push:
new 5e42f52bdf [3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)
5e42f52bdf is described below
commit 5e42f52bdfa4219132ae122a84f81990b8a8651c
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sat Sep 17 08:56:37 2022 +0800
[3.1.0-prepare][cherry-pick][DSIP-11][python] create workflows from YAML configuration (#11988)
* [DSIP-11][python] create workflows from YAML configuration (#11611)
(cherry picked from commit 38ee91fb1edf9fc5a86384cbcade4da7d003687a)
* fix pydolphin yaml doc link (#11817)
(cherry picked from commit c41fa5a8b14706702ded7b790a484a457498586e)
---
.../docs/source/tasks/condition.rst | 7 +
.../pydolphinscheduler/docs/source/tasks/datax.rst | 13 +
.../docs/source/tasks/dependent.rst | 14 +
.../pydolphinscheduler/docs/source/tasks/flink.rst | 7 +
.../docs/source/tasks/func_wrap.rst | 2 +-
.../pydolphinscheduler/docs/source/tasks/http.rst | 8 +
.../docs/source/tasks/map_reduce.rst | 8 +
.../docs/source/tasks/procedure.rst | 8 +
.../docs/source/tasks/python.rst | 8 +
.../pydolphinscheduler/docs/source/tasks/shell.rst | 8 +
.../pydolphinscheduler/docs/source/tasks/spark.rst | 8 +
.../pydolphinscheduler/docs/source/tasks/sql.rst | 14 +
.../docs/source/tasks/sub_process.rst | 17 +
.../docs/source/tasks/switch.rst | 9 +
.../pydolphinscheduler/docs/source/tutorial.rst | 96 +++++
.../examples/yaml_define/Condition.yaml | 43 ++
.../examples/yaml_define/DataX.yaml | 33 ++
.../examples/yaml_define/Dependent.yaml | 76 ++++
.../examples/yaml_define/Dependent_External.yaml | 26 ++
.../examples/yaml_define/Flink.yaml | 29 ++
.../examples/yaml_define/Http.yaml | 37 ++
.../examples/yaml_define/MapReduce.yaml | 29 ++
.../examples/yaml_define/MoreConfiguration.yaml | 40 ++
.../examples/yaml_define/Procedure.yaml | 27 ++
.../examples/yaml_define/Python.yaml | 30 ++
.../examples/yaml_define/Shell.yaml | 40 ++
.../examples/yaml_define/Spark.yaml | 30 ++
.../examples/yaml_define/Sql.yaml | 45 ++
.../examples/yaml_define/SubProcess.yaml | 27 ++
.../examples/yaml_define/Switch.yaml | 39 ++
.../examples/yaml_define/example_datax.json | 62 +++
.../examples/yaml_define/example_sql.sql | 22 +
.../examples/yaml_define/example_sub_workflow.yaml | 26 ++
.../examples/yaml_define/tutorial.yaml | 46 ++
.../src/pydolphinscheduler/cli/commands.py | 14 +
.../pydolphinscheduler/core/yaml_process_define.py | 466 +++++++++++++++++++++
.../src/pydolphinscheduler/tasks/__init__.py | 1 +
.../tests/core/test_yaml_process_define.py | 191 +++++++++
.../pydolphinscheduler/tests/testing/path.py | 1 +
39 files changed, 1606 insertions(+), 1 deletion(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
index 20b0350078..f6d7e6ad8f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
@@ -31,3 +31,10 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.condition
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Condition.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
index c07726941e..cb67a2fa9e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
@@ -31,3 +31,16 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.datax
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/DataX.yaml
+ :start-after: # under the License.
+ :language: yaml
+
+
+example_datax.json:
+
+.. literalinclude:: ../../../examples/yaml_define/example_datax.json
+ :language: json
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
index fe26d0f30a..d8e1599b2d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
@@ -31,3 +31,17 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.dependent
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent.yaml
+ :start-after: # under the License.
+ :language: yaml
+
+Dependent_External.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent_External.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
index 8db9ac266d..76eb484718 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
@@ -31,3 +31,10 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.flink
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Flink.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
index 5f41b80cfd..a4a2972933 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
@@ -30,4 +30,4 @@ Example
Dive Into
---------
-.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
+.. automodule:: pydolphinscheduler.tasks.func_wrap
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
index 4c6d8f8e40..4e138c9989 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
@@ -19,3 +19,11 @@ HTTP
====
.. automodule:: pydolphinscheduler.tasks.http
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Http.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
index 068b8d8b41..7356880b26 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
@@ -32,3 +32,11 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.map_reduce
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/MapReduce.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
index cd79eff140..2f28efc526 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
@@ -19,3 +19,11 @@ Procedure
=========
.. automodule:: pydolphinscheduler.tasks.procedure
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Procedure.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
index 660e46a6e5..1bf6210018 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
@@ -19,3 +19,11 @@ Python
======
.. automodule:: pydolphinscheduler.tasks.python
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Python.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
index 5ce16c3c9f..2dd106a3b8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
@@ -31,3 +31,11 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.shell
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Shell.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
index cdb5902c37..d5a51db91a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
@@ -31,3 +31,11 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.spark
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Spark.yaml
+ :start-after: # under the License.
+ :language: yaml
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
index 21eaec7ae9..52df042b74 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
@@ -19,3 +19,17 @@ SQL
===
.. automodule:: pydolphinscheduler.tasks.sql
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Sql.yaml
+ :start-after: # under the License.
+ :language: yaml
+
+example_sql.sql:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sql.sql
+ :start-after: */
+ :language: sql
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
index 8a9f562200..894dd0fbad 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
@@ -19,3 +19,20 @@ Sub Process
===========
.. automodule:: pydolphinscheduler.tasks.sub_process
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml
+ :start-after: # under the License.
+ :language: yaml
+
+
+
+example_sub_workflow.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml
+ :start-after: # under the License.
+ :language: yaml
+
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
index d8b34a4ac9..2fef589efb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
@@ -31,3 +31,12 @@ Dive Into
---------
.. automodule:: pydolphinscheduler.tasks.switch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Switch.yaml
+ :start-after: # under the License.
+ :language: yaml
+
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
index 6366c803bb..57d21b2d29 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
@@ -37,6 +37,8 @@ There are two types of tutorials: traditional and task decorator.
versatility to the traditional way because it only supported Python functions and without build-in tasks
supported. But it is helpful if your workflow is all built with Python or if you already have some Python
workflow code and want to migrate them to pydolphinscheduler.
+- **YAML File**: We can use the pydolphinscheduler CLI to create a process from a YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`.
+ We can find more YAML file examples in `examples/yaml_define <https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define>`_
.. tab:: Tradition
@@ -52,6 +54,12 @@ There are two types of tutorials: traditional and task decorator.
:start-after: [start tutorial]
:end-before: [end tutorial]
+.. tab:: YAML File
+
+ .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+ :start-after: # under the License.
+ :language: yaml
+
Import Necessary Module
-----------------------
@@ -104,6 +112,13 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
:start-after: [start workflow_declare]
:end-before: [end workflow_declare]
+.. tab:: YAML File
+
+ .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+ :start-after: # under the License.
+ :end-before: # Define the tasks under the workflow
+ :language: yaml
+
We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
if you are interested in it. For all arguments of object process definition, you could find in the
:class:`pydolphinscheduler.core.process_definition` API documentation.
@@ -139,6 +154,12 @@ Task Declaration
It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
+.. tab:: YAML File
+
+ .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+ :start-after: # Define the tasks under the workflow
+ :language: yaml
+
Setting Task Dependence
-----------------------
@@ -167,6 +188,14 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
:start-after: [start task_relation_declare]
:end-before: [end task_relation_declare]
+.. tab:: YAML File
+
+ We can use :code:`deps:[]` to set task dependence
+
+ .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+ :start-after: # Define the tasks under the workflow
+ :language: yaml
+
.. note::
We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
@@ -198,6 +227,17 @@ will create workflow definition as well as workflow schedule.
:start-after: [start submit_or_run]
:end-before: [end submit_or_run]
+.. tab:: YAML File
+
+ The pydolphinscheduler YAML CLI always submits the workflow. We can also run the workflow if we set :code:`run: true`
+
+ .. code-block:: yaml
+
+ # Define the workflow
+ workflow:
+ name: "tutorial"
+ run: true
+
At last, we could execute this workflow code in your terminal like other Python scripts, running
:code:`python tutorial.py` to trigger and execute it.
@@ -219,5 +259,61 @@ named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
:language: text
:lines: 24-28
+Create Process Using YAML File
+------------------------------
+
+We can use the pydolphinscheduler CLI to create a process from a YAML file
+
+.. code-block:: bash
+
+ pydolphinscheduler yaml -f Shell.yaml
+
+We can use the following four special grammars to define workflows more flexibly.
+
+- :code:`$FILE{"file_name"}`: Read the file (:code:`file_name`) contents and replace them to that location.
+- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined using YAML file (:code:`other_workflow.yaml`) and replace the process name in this location.
+- :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and replace it to that location.
+- :code:`${CONFIG.key_name}`: Read the configuration value of key (:code:`key_name`) and replace it to that location.
+
+
+In addition, when a file path is loaded with :code:`$FILE{"file_name"}` or :code:`$WORKFLOW{"other_workflow.yaml"}` and the file does not exist at that path, pydolphinscheduler will search for it in the directory of the YAML file.
+
+For example, our file directory structure is as follows:
+
+.. code-block:: bash
+
+ .
+ └── yaml_define
+ ├── Condition.yaml
+ ├── DataX.yaml
+ ├── Dependent_External.yaml
+ ├── Dependent.yaml
+ ├── example_datax.json
+ ├── example_sql.sql
+ ├── example_sub_workflow.yaml
+ ├── Flink.yaml
+ ├── Http.yaml
+ ├── MapReduce.yaml
+ ├── MoreConfiguration.yaml
+ ├── Procedure.yaml
+ ├── Python.yaml
+ ├── Shell.yaml
+ ├── Spark.yaml
+ ├── Sql.yaml
+ ├── SubProcess.yaml
+ └── Switch.yaml
+
+After we run
+
+.. code-block:: bash
+
+ pydolphinscheduler yaml -f yaml_define/SubProcess.yaml
+
+
+the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_sub_workflow.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
+
+Furthermore, this feature supports recursion all the way down.
+
+
.. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
.. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
new file mode 100644
index 0000000000..c65b8c7aeb
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Condition.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Condition"
+
+# Define the tasks under the workflow
+tasks:
+ - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
+ - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
+ - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
+ - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
+ - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }
+
+ - name: condition
+ task_type: Condition
+ success_task: success_branch
+ failed_task: fail_branch
+ op: AND
+ groups:
+ - op: AND
+ groups:
+ - task: pre_task_1
+ flag: true
+ - task: pre_task_2
+ flag: true
+ - task: pre_task_3
+ flag: false
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
new file mode 100644
index 0000000000..00ecd54685
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/DataX.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "DataX"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: DataX
+ datasource_name: db
+ datatarget_name: db
+ sql: show tables;
+ target_table: table_test
+
+ - name: task_custon_config
+ task_type: CustomDataX
+ json: $FILE{"example_datax.json"}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml
new file mode 100644
index 0000000000..d69fac05da
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+workflow:
+ name: "Dependent"
+
+# Define the tasks under the workflow
+tasks:
+ - name: dependent
+ task_type: Dependent
+ denpendence:
+ op: and
+ groups:
+ - op: or
+ groups:
+ - project_name: pydolphin
+ process_definition_name: task_dependent_external
+ dependent_task_name: task_1
+
+ - project_name: pydolphin
+ process_definition_name: task_dependent_external
+ dependent_task_name: task_2
+
+ - op: and
+ groups:
+ - project_name: pydolphin
+ process_definition_name: task_dependent_external
+ dependent_task_name: task_1
+ dependent_date: LAST_WEDNESDAY
+
+ - project_name: pydolphin
+ process_definition_name: task_dependent_external
+ dependent_task_name: task_2
+ dependent_date: last24Hours
+
+ - name: dependent_var
+ task_type: Dependent
+ denpendence:
+ op: and
+ groups:
+ - op: or
+ # we can use ${CONFIG.WORKFLOW_PROJECT} to set the value to configuration.WORKFLOW_PROJECT
+ # we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from Dependent_External.yaml and set the value to that workflow name
+ groups:
+ - project_name: ${CONFIG.WORKFLOW_PROJECT}
+ process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
+ dependent_task_name: task_1
+
+ - project_name: ${CONFIG.WORKFLOW_PROJECT}
+ process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
+ dependent_task_name: task_2
+ - op: and
+ groups:
+ - project_name: ${CONFIG.WORKFLOW_PROJECT}
+ process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
+ dependent_task_name: task_1
+ dependent_date: LAST_WEDNESDAY
+
+ - project_name: ${CONFIG.WORKFLOW_PROJECT}
+ process_definition_name: $WORKFLOW{"Dependent_External.yaml"}
+ dependent_task_name: task_2
+ dependent_date: last24Hours
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml
new file mode 100644
index 0000000000..577ff6a807
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Dependent_External.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "task_dependent_external"
+
+# Define the tasks under the workflow
+tasks:
+ - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+ - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
+ - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml
new file mode 100644
index 0000000000..2449d435a3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Flink.yaml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Flink"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: Flink
+ main_class: org.apache.flink.streaming.examples.wordcount.WordCount
+ main_package: test_java.jar
+ program_type: JAVA
+ deploy_mode: local
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml
new file mode 100644
index 0000000000..1483aeb3d8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Http.yaml
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Http"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: Http
+ url: "https://httpbin.org/get"
+ http_method: "GET"
+ http_params:
+ - { "prop": "a", "httpParametersType": "PARAMETER", "value": "1" }
+ - { "prop": "b", "httpParametersType": "PARAMETER", "value": "2" }
+ - {
+ "prop": "Content-Type",
+ "httpParametersType": "header",
+ "value": "test",
+ }
+ http_check_condition: "STATUS_CODE_CUSTOM"
+ condition: "404"
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml
new file mode 100644
index 0000000000..e1a2b5709c
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MapReduce.yaml
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "MapReduce"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: MR
+ main_class: wordcount
+ main_package: test_java.jar
+ program_type: SCALA
+ main_args: /dolphinscheduler/tenant_exists/resources/file.txt /output/ds
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml
new file mode 100644
index 0000000000..258aa33433
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/MoreConfiguration.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "MoreConfiguration"
+ param:
+ n: 1
+
+# Define the tasks under the workflow
+tasks:
+ - name: shell_0
+ task_type: Shell
+ description: "yaml define task"
+ flag: "YES"
+ command: |
+ echo "$ENV{HOME}"
+ echo "${n}"
+ task_priority: "HIGH"
+ delay_time: 20
+ fail_retry_times: 30
+ fail_retry_interval: 5
+ timeout_flag: "CLOSE"
+ timeout: 60
+ local_params:
+ - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml
new file mode 100644
index 0000000000..829a961c1a
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Procedure.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Procedure"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: Procedure
+ datasource_name: db
+ method: show tables;
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml
new file mode 100644
index 0000000000..728b5c928e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Python.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Python"
+
+# Define the tasks under the workflow
+tasks:
+ - name: python
+ task_type: Python
+ definition: |
+ import os
+ print(os)
+ print("1")
+ print("2")
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml
new file mode 100644
index 0000000000..fdbe126327
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Shell.yaml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Shell"
+ release_state: "offline"
+ run: true
+
+# Define the tasks under the workflow
+tasks:
+ - name: task_parent
+ task_type: Shell
+ command: |
+ echo hello pydolphinscheduler
+ echo run task parent
+
+ - name: task_child_one
+ task_type: Shell
+ deps: [task_parent]
+ command: echo "child one"
+
+ - name: task_child_two
+ task_type: Shell
+ deps: [task_parent]
+ command: echo "child two"
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml
new file mode 100644
index 0000000000..6132b8d749
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Spark.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Spark"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task
+ task_type: Spark
+ main_class: org.apache.spark.examples.SparkPi
+ main_package: test_java.jar
+ program_type: SCALA
+ deploy_mode: local
+ spark_version: SPARK1
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml
new file mode 100644
index 0000000000..c3c7e88ee1
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Sql.yaml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Sql"
+
+# Define the tasks under the workflow
+tasks:
+ - name: task_base
+ task_type: Sql
+ datasource_name: "db"
+ sql: show tables;
+
+ - name: task_multi_line
+ task_type: Sql
+ datasource_name: "db"
+ sql: |
+ show tables;
+ select id from version where id=1;
+
+ - name: task_file
+ task_type: Sql
+ datasource_name: "db"
+ sql: $FILE{"example_sql.sql"}
+
+ # Or you can define the task "task_base_one_line" in one line
+ - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}
+
+ # Or you can define the task "task_file_one_line" in one line
+ - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml
new file mode 100644
index 0000000000..0ea7549db4
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/SubProcess.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "SubWorkflow"
+
+tasks:
+ - name: example_workflow
+ task_type: SubProcess
+ process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}
+
+ - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml
new file mode 100644
index 0000000000..33ed68813e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/Switch.yaml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "Switch"
+ param:
+ var: 1
+
+# Define the tasks under the workflow
+tasks:
+ - name: switch_child_1
+ task_type: Shell
+ command: echo switch_child_1
+
+ - name: switch_child_2
+ task_type: Shell
+ command: echo switch_child_2
+
+ - name: switch
+ task_type: Switch
+ condition:
+ - task: switch_child_1
+ condition: "${var} > 1"
+ - task: switch_child_2
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json
new file mode 100644
index 0000000000..3db8092cb6
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_datax.json
@@ -0,0 +1,62 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mysqlreader",
+ "parameter": {
+ "username": "usr",
+ "password": "pwd",
+ "column": [
+ "id",
+ "name",
+ "code",
+ "description"
+ ],
+ "splitPk": "id",
+ "connection": [
+ {
+ "table": [
+ "source_table"
+ ],
+ "jdbcUrl": [
+ "jdbc:mysql://127.0.0.1:3306/source_db"
+ ]
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "mysqlwriter",
+ "parameter": {
+ "writeMode": "insert",
+ "username": "usr",
+ "password": "pwd",
+ "column": [
+ "id",
+ "name"
+ ],
+ "connection": [
+ {
+ "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+ "table": [
+ "target_table"
+ ]
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "errorLimit": {
+ "percentage": 0,
+ "record": 0
+ },
+ "speed": {
+ "channel": 1,
+ "record": 1000
+ }
+ }
+ }
+}
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql
new file mode 100644
index 0000000000..06b5c4c16c
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sql.sql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+select id from version where id=1;
+select id from version where id=2;
+select id from version where id=3;
+select id from version where id=4;
+select id from version where id=5;
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml
new file mode 100644
index 0000000000..af3a863da9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/example_sub_workflow.yaml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "example_workflow_for_sub_workflow"
+
+# Define the tasks under the workflow
+tasks:
+ - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+ - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
+ - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml
new file mode 100644
index 0000000000..104a8c367b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define/tutorial.yaml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+ name: "tutorial"
+ schedule: "0 0 0 * * ? *"
+ start_time: "2021-01-01"
+ tenant: "tenant_exists"
+ release_state: "offline"
+ run: true
+
+# Define the tasks under the workflow
+tasks:
+ - name: task_parent
+ task_type: Shell
+ command: echo hello pydolphinscheduler
+
+ - name: task_child_one
+ task_type: Shell
+ deps: [task_parent]
+ command: echo "child one"
+
+ - name: task_child_two
+ task_type: Shell
+ deps: [task_parent]
+ command: echo "child two"
+
+ - name: task_union
+ task_type: Shell
+ deps: [task_child_one, task_child_two]
+ command: echo "union"
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
index d78e503dbb..8d923f1406 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/cli/commands.py
@@ -26,6 +26,7 @@ from pydolphinscheduler.configuration import (
init_config_file,
set_single_config,
)
+from pydolphinscheduler.core.yaml_process_define import create_process_definition
version_option_val = ["major", "minor", "micro"]
@@ -90,3 +91,16 @@ def config(getter, setter, init) -> None:
for key, val in setter:
set_single_config(key, val)
click.echo("Set configuration done.")
+
+
+@cli.command()
+@click.option(
+ "--yaml_file",
+ "-f",
+ required=True,
+ help="YAML file path",
+ type=click.Path(exists=True),
+)
+def yaml(yaml_file) -> None:
+ """Create process definition using YAML file."""
+ create_process_definition(yaml_file)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py
new file mode 100644
index 0000000000..0944925a48
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/yaml_process_define.py
@@ -0,0 +1,466 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+logger = logging.getLogger(__file__)
+
+KEY_PROCESS = "workflow"
+KEY_TASK = "tasks"
+KEY_TASK_TYPE = "task_type"
+KEY_DEPS = "deps"
+KEY_OP = "op"
+
+TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS]
+
+
+class ParseTool:
+ """Enhanced parsing tools."""
+
+ @staticmethod
+ def parse_string_param_if_file(string_param: str, **kwargs):
+ """Use $FILE{"data_path"} to load file from "data_path"."""
+ if string_param.startswith("$FILE"):
+ path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+ base_folder = kwargs.get("base_folder", ".")
+ path = ParseTool.get_possible_path(path, base_folder)
+ with open(path, "r") as read_file:
+ string_param = "".join(read_file)
+ return string_param
+
+ @staticmethod
+ def parse_string_param_if_env(string_param: str, **kwargs):
+ """Use $ENV{env_name} to load environment variable "env_name"."""
+ if "$ENV" in string_param:
+ key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+ env_value = os.environ.get(key, "$%s" % key)
+ string_param = string_param.replace("$ENV{%s}" % key, env_value)
+ return string_param
+
+ @staticmethod
+ def parse_string_param_if_config(string_param: str, **kwargs):
+ """Use ${CONFIG.var_name} to load variable "var_name" from configuration."""
+ if "${CONFIG" in string_param:
+ key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+ if hasattr(configuration, key):
+ string_param = getattr(configuration, key)
+ else:
+ string_param = configuration.get_single_config(key)
+
+ return string_param
+
+ @staticmethod
+ def get_possible_path(file_path, base_folder):
+ """Get the possible path of a file.
+
+ Return the new path if file_path does not exist but base_folder + file_path does.
+ """
+ possible_path = file_path
+ if not Path(file_path).exists():
+ new_path = Path(base_folder).joinpath(file_path)
+ if new_path.exists():
+ possible_path = new_path
+ logger.info(f"{file_path} not exists, convert to {possible_path}")
+
+ return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+ """Get the task class object by task_type (case compatible)."""
+ # only get task class from tasks.__all__
+ all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+ task_type_cap = task_type.capitalize()
+ if task_type_cap not in all_task_types:
+ raise PyDSTaskNoFoundException("cant not find task %s" % task_type)
+
+ standard_name = all_task_types[task_type_cap]
+ return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+ """YAML parser for creating a process.
+
+ :param yaml_file: YAML file path.
+
+ example 1 ::
+
+ parser = YamlProcess(yaml_file=...)
+ parser.create_process_definition()
+
+ example 2 ::
+
+ YamlProcess(yaml_file=...).create_process_definition()
+
+ """
+
+ _parse_rules = [
+ ParseTool.parse_string_param_if_file,
+ ParseTool.parse_string_param_if_env,
+ ParseTool.parse_string_param_if_config,
+ ]
+
+ def __init__(self, yaml_file: str):
+ with open(yaml_file, "r") as f:
+ content = f.read()
+
+ self._base_folder = Path(yaml_file).parent
+ content = self.prepare_refer_process(content)
+ super().__init__(content)
+
+ def create_process_definition(self):
+ """Create process main function."""
+ # get process parameters with key "workflow"
+ process_params = self[KEY_PROCESS]
+
+ # pop "run" parameter, used at the end
+ is_run = process_params.pop("run", False)
+
+ # use YamlProcess._parse_rules to parse special value of yaml file
+ process_params = self.parse_params(process_params)
+
+ process_name = process_params["name"]
+ logger.info(f"Create Process: {process_name}")
+ with ProcessDefinition(**process_params) as pd:
+
+ # save dependencies between tasks
+ dependencies = {}
+
+ # save name and task mapping
+ name2task = {}
+
+ # get task data with key "tasks"
+ for task_data in self[KEY_TASK]:
+ task = self.parse_task(task_data, name2task)
+
+ deps = task_data.get(KEY_DEPS, [])
+ if deps:
+ dependencies[task.name] = deps
+ name2task[task.name] = task
+
+ # build dependencies between task
+ for downstream_task_name, deps in dependencies.items():
+ downstream_task = name2task[downstream_task_name]
+ for upstream_task_name in deps:
+ upstream_task = name2task[upstream_task_name]
+ upstream_task >> downstream_task
+
+ pd.submit()
+ # if set is_run, run the process after submit
+ if is_run:
+ logger.info(f"run workflow: {pd}")
+ pd.run()
+
+ return process_name
+
+ def parse_params(self, params: Any):
+ """Recursively resolves the parameter values.
+
+ The function operates on params only when it encounters a string; lists and dicts are traversed recursively.
+ """
+ if isinstance(params, str):
+ for parse_rule in self._parse_rules:
+ params_ = params
+ params = parse_rule(params, base_folder=self._base_folder)
+ if params_ != params:
+ logger.info(f"parse {params_} -> {params}")
+
+ elif isinstance(params, list):
+ for index in range(len(params)):
+ params[index] = self.parse_params(params[index])
+
+ elif isinstance(params, dict):
+ for key, value in params.items():
+ params[key] = self.parse_params(value)
+
+ return params
+
+ @classmethod
+ def parse(cls, yaml_file: str):
+ """Parse the YAML file and create its process definition.
+
+ Instantiates the class with yaml_file, calls create_process_definition and returns the process name.
+ """
+ process_name = cls(yaml_file).create_process_definition()
+ return process_name
+
+ def prepare_refer_process(self, content):
+ """Allow YAML files to reference process derived from other YAML files."""
+ process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
+ for process_path in process_paths:
+ logger.info(
+ f"find special token {process_path}, load process form {process_path}"
+ )
+ possible_path = ParseTool.get_possible_path(process_path, self._base_folder)
+ process_name = YamlProcess.parse(possible_path)
+ content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name)
+
+ return content
+
+ def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+ """Parse various types of tasks.
+
+ :param task_data: dict.
+ {
+ "task_type": "Shell",
+ "name": "shell_task", "command": "echo hello"
+ }
+
+ :param name2task: Dict[str, Task], mapping from task name to task
+
+
+ Some task types have a special parse function:
+ if task type is Switch, use parse_switch;
+ if task type is Condition, use parse_condition;
+ if task type is Dependent, use parse_dependent;
+ otherwise, we pass all task_params to the task class, like "task_cls(**task_params)".
+ """
+ task_type = task_data["task_type"]
+ # get params without special key
+ task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}
+
+ task_cls = get_task_cls(task_type)
+
+ # use YamlProcess._parse_rules to parse special value of yaml file
+ task_params = self.parse_params(task_params)
+
+ if task_cls == tasks.Switch:
+ task = self.parse_switch(task_params, name2task)
+
+ elif task_cls == tasks.Condition:
+ task = self.parse_condition(task_params, name2task)
+
+ elif task_cls == tasks.Dependent:
+ task = self.parse_dependent(task_params, name2task)
+
+ else:
+ task = task_cls(**task_params)
+ logger.info(task_type, task)
+ return task
+
+ def parse_switch(self, task_params, name2task):
+ """Parse Switch Task.
+
+ This is an example Yaml fragment of task_params
+
+ name: switch
+ condition:
+ - { task: switch_child_1, condition: "${var} > 1" }
+ - { task: switch_child_2 }
+ """
+ from pydolphinscheduler.tasks.switch import (
+ Branch,
+ Default,
+ Switch,
+ SwitchCondition,
+ )
+
+ condition_datas = task_params["condition"]
+ conditions = []
+ for condition_data in condition_datas:
+ assert "task" in condition_data, "task must be in %s" % condition_data
+ task_name = condition_data["task"]
+ condition_string = condition_data.get("condition", None)
+
+ # if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch
+ if condition_string is None:
+ conditions.append(Default(task=name2task.get(task_name)))
+
+ # if condition_string is not None, for example:
+ # {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch
+ else:
+ conditions.append(
+ Branch(condition_string, task=name2task.get(task_name))
+ )
+
+ switch = Switch(
+ name=task_params["name"], condition=SwitchCondition(*conditions)
+ )
+ return switch
+
+ def parse_condition(self, task_params, name2task):
+ """Parse Condition Task.
+
+ This is an example Yaml fragment of task_params
+
+ name: condition
+ success_task: success_branch
+ failed_task: fail_branch
+ op: AND
+ groups:
+ -
+ op: AND
+ groups:
+ - { task: pre_task_1, flag: true }
+ - { task: pre_task_2, flag: true }
+ - { task: pre_task_3, flag: false }
+ -
+ op: AND
+ groups:
+ - { task: pre_task_1, flag: false }
+ - { task: pre_task_2, flag: true }
+ - { task: pre_task_3, flag: true }
+
+ """
+ from pydolphinscheduler.tasks.condition import (
+ FAILURE,
+ SUCCESS,
+ And,
+ Condition,
+ Or,
+ )
+
+ def get_op_cls(op):
+ cls = None
+ if op.lower() == "and":
+ cls = And
+ elif op.lower() == "or":
+ cls = Or
+ else:
+ raise Exception("OP must be in And or Or, but get: %s" % op)
+ return cls
+
+ second_cond_ops = []
+ for first_group in task_params["groups"]:
+ second_op = first_group["op"]
+ task_ops = []
+ for condition_data in first_group["groups"]:
+ assert "task" in condition_data, "task must be in %s" % condition_data
+ assert "flag" in condition_data, "flag must be in %s" % condition_data
+ task_name = condition_data["task"]
+ flag = condition_data["flag"]
+ task = name2task[task_name]
+
+ # for example: task = pre_task_1, flag = true
+ if flag:
+ task_ops.append(SUCCESS(task))
+ else:
+ task_ops.append(FAILURE(task))
+
+ second_cond_ops.append(get_op_cls(second_op)(*task_ops))
+
+ first_op = task_params["op"]
+ cond_operator = get_op_cls(first_op)(*second_cond_ops)
+
+ condition = Condition(
+ name=task_params["name"],
+ condition=cond_operator,
+ success_task=name2task[task_params["success_task"]],
+ failed_task=name2task[task_params["failed_task"]],
+ )
+ return condition
+
+ def parse_dependent(self, task_params, name2task):
+ """Parse Dependent Task.
+
+ This is an example Yaml fragment of task_params
+
+ name: dependent
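+ dependence:
+ op: AND
+ groups:
+ -
+ op: Or
+ groups:
+ - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_1 }
+ - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_2 }
+ -
+ op: And
+ groups:
+ - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_1, dependent_date: LAST_WEDNESDAY }
+ - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_2, dependent_date: last24Hours }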
+
+ """
+ from pydolphinscheduler.tasks.dependent import (
+ And,
+ Dependent,
+ DependentDate,
+ DependentItem,
+ Or,
+ )
+
+ def process_dependent_date(dependent_date):
+ """Parse dependent date (Compatible with key and value of DependentDate)."""
+ dependent_date_upper = dependent_date.upper()
+ if hasattr(DependentDate, dependent_date_upper):
+ dependent_date = getattr(DependentDate, dependent_date_upper)
+ return dependent_date
+
+ def get_op_cls(op):
+ cls = None
+ if op.lower() == "and":
+ cls = And
+ elif op.lower() == "or":
+ cls = Or
+ else:
+ raise Exception("OP must be in And or Or, but get: %s" % op)
+ return cls
+
+ def create_dependent_item(source_items):
+ """Parse dependent item.
+
+ project_name: pydolphin
+ process_definition_name: task_dependent_external
+ dependent_task_name: task_1
+ dependent_date: LAST_WEDNESDAY
+ """
+ project_name = source_items["project_name"]
+ process_definition_name = source_items["process_definition_name"]
+ dependent_task_name = source_items["dependent_task_name"]
+ dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
+ dependent_item = DependentItem(
+ project_name=project_name,
+ process_definition_name=process_definition_name,
+ dependent_task_name=dependent_task_name,
+ dependent_date=process_dependent_date(dependent_date),
+ )
+
+ return dependent_item
+
+ second_dependences = []
+ for first_group in task_params["groups"]:
+ second_op = first_group[KEY_OP]
+ dependence_items = []
+ for source_items in first_group["groups"]:
+ dependence_items.append(create_dependent_item(source_items))
+
+ second_dependences.append(get_op_cls(second_op)(*dependence_items))
+
+ first_op = task_params[KEY_OP]
+ dependence = get_op_cls(first_op)(*second_dependences)
+
+ task = Dependent(
+ name=task_params["name"],
+ dependence=dependence,
+ )
+ return task
+
+
+def create_process_definition(yaml_file):
+ """CLI."""
+ YamlProcess.parse(yaml_file)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index d49a1d394c..53b462ca90 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -35,6 +35,7 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
__all__ = [
"Condition",
"DataX",
+ "CustomDataX",
"Dependent",
"Flink",
"Http",
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
new file mode 100644
index 0000000000..99ad179a5f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_yaml_process_define.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+ ParseTool,
+ create_process_definition,
+ get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.path import path_yaml_example
+from tests.testing.task import Task
+
+
+@pytest.mark.parametrize(
+    "string_param, expect",
+    [
+        ("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
+    ],
+)
+def test_parse_tool_env_exist(string_param, expect):
+    """Test parsing an environment variable that is set.
+
+    ``PROJECT_NAME`` is set to ``expect`` only while the assertion runs, and
+    ``ParseTool.parse_string_param_if_env`` must return that value for the
+    ``$ENV{PROJECT_NAME}`` placeholder.
+    """
+    # patch.dict restores os.environ on exit, so the variable does not leak
+    # into tests that run afterwards.
+    with patch.dict(os.environ, {"PROJECT_NAME": expect}):
+        assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
+def test_parse_tool_env_not_exist():
+    """Test parsing a placeholder whose environment variable is not set.
+
+    The parser is expected to return ``$`` followed by the variable name.
+    """
+    missing_name = "THIS_ENV_NOT_EXIST_0000000"
+    parsed = ParseTool.parse_string_param_if_env("$ENV{%s}" % missing_name)
+    assert "$" + missing_name == parsed
+
+
+@pytest.mark.parametrize(
+    "string_param, expect_key",
+    [
+        ("${CONFIG.java_gateway.address}", "java_gateway.address"),
+        ("${CONFIG.WORKFLOW_PROJECT}", "default.workflow.project"),
+    ],
+)
+def test_parse_tool_config(string_param, expect_key):
+    """Test that a ``${CONFIG.*}`` placeholder resolves to the configured value.
+
+    The parsed result must equal what ``configuration.get_single_config``
+    returns for ``expect_key``.
+    """
+    config_value = configuration.get_single_config(expect_key)
+    parsed = ParseTool.parse_string_param_if_config(string_param)
+    assert config_value == parsed
+
+
+def test_parse_possible_yaml_file():
+    """Test that a ``$FILE{"..."}`` placeholder is replaced by the file content.
+
+    ``Shell.yaml`` from the YAML example folder is read directly and compared
+    with what ``ParseTool.parse_string_param_if_file`` returns for it.
+    """
+    yaml_dir = Path(path_yaml_example)
+    yaml_name = "Shell.yaml"
+
+    with open(yaml_dir.joinpath(yaml_name), "r") as handle:
+        raw_content = handle.read()
+
+    parsed_content = ParseTool.parse_string_param_if_file(
+        '$FILE{"%s"}' % yaml_name, base_folder=yaml_dir
+    )
+
+    assert raw_content == parsed_content
+
+
+def test_parse_tool_parse_possible_path_file():
+    """Test ``ParseTool.get_possible_path`` against the example ``Shell.yaml``."""
+    yaml_dir = Path(path_yaml_example)
+    yaml_name = "Shell.yaml"
+    full_path = yaml_dir.joinpath(yaml_name)
+
+    # The full path is expected back unchanged.
+    assert full_path == ParseTool.get_possible_path(full_path, base_folder=yaml_dir)
+
+    # A bare file name is expected to resolve inside the given base folder.
+    assert full_path == ParseTool.get_possible_path(yaml_name, base_folder=yaml_dir)
+
+    # With "." as base folder the result must differ from the example path.
+    assert full_path != ParseTool.get_possible_path(yaml_name, base_folder=".")
+
+
+@pytest.mark.parametrize(
+    "task_type, expect",
+    [
+        # The same task type in different letter cases must map to one class.
+        ("shell", tasks.Shell),
+        ("Shell", tasks.Shell),
+        ("ShEll", tasks.Shell),
+        ("Condition", tasks.Condition),
+        ("DataX", tasks.DataX),
+        ("CustomDataX", tasks.CustomDataX),
+        ("Dependent", tasks.Dependent),
+        ("Flink", tasks.Flink),
+        ("Http", tasks.Http),
+        ("MR", tasks.MR),
+        ("Procedure", tasks.Procedure),
+        ("Python", tasks.Python),
+        ("Spark", tasks.Spark),
+        ("Sql", tasks.Sql),
+        ("SubProcess", tasks.SubProcess),
+        ("Switch", tasks.Switch),
+        ("SageMaker", tasks.SageMaker),
+    ],
+)
+def test_get_task(task_type, expect):
+    """Test that ``get_task_cls`` returns the task class for a task type name.
+
+    Each ``task_type`` string must resolve to the ``expect`` class.
+    """
+    assert expect == get_task_cls(task_type)
+
+
+@pytest.mark.parametrize("task_type", ["MYSQL"])
+def test_get_error(task_type):
+    """Test that ``get_task_cls`` raises for a task type it cannot find.
+
+    The raised ``PyDSTaskNoFoundException`` must mention the requested type.
+    """
+    expected_message = f"not find task {task_type}"
+    with pytest.raises(PyDSTaskNoFoundException, match=expected_message):
+        get_task_cls(task_type)
+
+
+@pytest.mark.parametrize(
+    "yaml_file",
+    [
+        ("Condition.yaml"),
+        ("DataX.yaml"),
+        ("Dependent.yaml"),
+        ("Flink.yaml"),
+        ("Procedure.yaml"),
+        ("Http.yaml"),
+        ("MapReduce.yaml"),
+        ("Python.yaml"),
+        ("Shell.yaml"),
+        ("Spark.yaml"),
+        ("Sql.yaml"),
+        ("SubProcess.yaml"),
+        # ("Switch.yaml"),
+        ("MoreConfiguration.yaml"),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+@patch(
+    "pydolphinscheduler.core.database.Database.get_database_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+@patch(
+    "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+    return_value={
+        "projectCode": 0,
+        "processDefinitionCode": 0,
+        "taskDefinitionCode": 0,
+    },
+)
+@patch.object(ProcessDefinition, "run")
+@patch.object(ProcessDefinition, "submit")
+def test_get_create_process_definition(
+    psubmit, prun, dep_item, db_info, resource_info, yaml_file
+):
+    """Test create_process_definition function to parse example YAML file.
+
+    This is a smoke test: it makes no assertion and passes when
+    ``create_process_definition`` handles the example file without raising.
+
+    Mock arguments are injected bottom-up, so the decorator closest to the
+    function (``submit``) supplies the first argument, then ``run``, then the
+    three ``@patch`` targets in reverse order of appearance. The mocks
+    themselves are not inspected.
+    """
+    yaml_file_path = Path(path_yaml_example).joinpath(yaml_file)
+    # Delegate Task.gen_code_and_version to the testing Task helper while the
+    # example file is parsed.
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+    ):
+        create_process_definition(yaml_file_path)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
index 68d93c404b..974ab3d47c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
@@ -24,6 +24,7 @@ project_root = Path(__file__).parent.parent.parent
path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
+path_yaml_example = project_root.joinpath("examples", "yaml_define")
path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
path_default_config_yaml = project_root.joinpath(
"src", "pydolphinscheduler", "default_config.yaml"