Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/09 03:51:57 UTC

[dolphinscheduler-sdk-python] branch main updated: [migrate] Add tag 3.1.0 source code (#6)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cd9784  [migrate] Add tag 3.1.0 source code (#6)
6cd9784 is described below

commit 6cd9784534eb3c78dbbf0daf53bf457a597fe3be
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 9 11:51:52 2022 +0800

    [migrate] Add tag 3.1.0 source code (#6)
    
    Add version 3.1.0 code from https://github.com/apache/dolphinscheduler/tree/3.1.0/dolphinscheduler-python/pydolphinscheduler
---
 DEVELOP.md                                         |  57 ++-
 README.md                                          |  36 +-
 RELEASE.md                                         |  32 +-
 UPDATING.md                                        |  40 +-
 docs/source/tasks/condition.rst                    |   7 +
 docs/source/tasks/datax.rst                        |  13 +
 docs/source/tasks/dependent.rst                    |  14 +
 docs/source/tasks/{map_reduce.rst => dvc.rst}      |  19 +-
 docs/source/tasks/flink.rst                        |   7 +
 docs/source/tasks/func_wrap.rst                    |   2 +-
 docs/source/tasks/http.rst                         |   8 +
 docs/source/tasks/index.rst                        |   6 +
 docs/source/tasks/map_reduce.rst                   |   8 +
 docs/source/tasks/{switch.rst => mlflow.rst}       |  19 +-
 docs/source/tasks/{condition.rst => openmldb.rst}  |  17 +-
 docs/source/tasks/procedure.rst                    |   8 +
 docs/source/tasks/python.rst                       |   8 +
 docs/source/tasks/{map_reduce.rst => pytorch.rst}  |  18 +-
 docs/source/tasks/{condition.rst => sagemaker.rst} |  21 +-
 docs/source/tasks/shell.rst                        |   8 +
 docs/source/tasks/spark.rst                        |   8 +
 docs/source/tasks/sql.rst                          |  14 +
 docs/source/tasks/sub_process.rst                  |  17 +
 docs/source/tasks/switch.rst                       |   9 +
 docs/source/tutorial.rst                           |  96 +++++
 examples/yaml_define/Condition.yaml                |  43 ++
 .../yaml_define/DataX.yaml                         |  33 +-
 examples/yaml_define/Dependent.yaml                |  76 ++++
 .../yaml_define/Dependent_External.yaml            |  28 +-
 .../yaml_define/Dvc.yaml                           |  44 +-
 .../yaml_define/Flink.yaml                         |  31 +-
 .../yaml_define/Http.yaml                          |  39 +-
 .../yaml_define/MapReduce.yaml                     |  31 +-
 .../yaml_define/MoreConfiguration.yaml             |  42 +-
 .../yaml_define/OpenMLDB.yaml                      |  35 +-
 .../yaml_define/Procedure.yaml                     |  29 +-
 .../yaml_define/Python.yaml                        |  32 +-
 examples/yaml_define/Pytorch.yaml                  |  53 +++
 .../yaml_define/Sagemaker.yaml                     |  28 +-
 .../yaml_define/Shell.yaml                         |  38 +-
 .../yaml_define/Spark.yaml                         |  32 +-
 .../resource.py => examples/yaml_define/Sql.yaml   |  54 +--
 .../yaml_define/SubProcess.yaml                    |  27 +-
 .../yaml_define/Switch.yaml                        |  37 +-
 examples/yaml_define/example_datax.json            |  62 +++
 examples/yaml_define/example_sagemaker_params.json |  18 +
 examples/yaml_define/example_sql.sql               |  22 +
 .../yaml_define/example_sub_workflow.yaml          |  28 +-
 examples/yaml_define/mlflow.yaml                   |  69 +++
 .../yaml_define/tutorial.yaml                      |  44 +-
 setup.py                                           |   2 +-
 src/pydolphinscheduler/cli/commands.py             |  14 +
 src/pydolphinscheduler/constants.py                |   5 +
 src/pydolphinscheduler/core/database.py            |   5 +-
 src/pydolphinscheduler/core/engine.py              |   5 +-
 src/pydolphinscheduler/core/process_definition.py  |  36 +-
 src/pydolphinscheduler/core/resource.py            |  34 +-
 src/pydolphinscheduler/core/task.py                |  76 +++-
 src/pydolphinscheduler/core/yaml_process_define.py | 466 +++++++++++++++++++++
 .../examples/task_dvc_example.py                   |  52 +++
 .../examples/task_mlflow_example.py                |  93 ++++
 .../examples/task_openmldb_example.py              |  39 +-
 .../examples/task_pytorch_example.py               |  62 +++
 .../examples/task_sagemaker_example.py             |  46 ++
 src/pydolphinscheduler/java_gateway.py             | 158 +++++++
 src/pydolphinscheduler/models/project.py           |   5 +-
 src/pydolphinscheduler/models/queue.py             |   8 -
 src/pydolphinscheduler/models/tenant.py            |   5 +-
 src/pydolphinscheduler/models/user.py              |   5 +-
 src/pydolphinscheduler/tasks/__init__.py           |  21 +
 src/pydolphinscheduler/tasks/dependent.py          |   5 +-
 src/pydolphinscheduler/tasks/dvc.py                | 124 ++++++
 src/pydolphinscheduler/tasks/mlflow.py             | 256 +++++++++++
 src/pydolphinscheduler/tasks/openmldb.py           |  48 +++
 src/pydolphinscheduler/tasks/pytorch.py            |  95 +++++
 .../{core/resource.py => tasks/sagemaker.py}       |  35 +-
 src/pydolphinscheduler/tasks/sub_process.py        |   5 +-
 src/pydolphinscheduler/tasks/switch.py             |   5 +
 tests/core/test_engine.py                          |   1 +
 tests/core/test_resource_definition.py             |  36 +-
 tests/core/test_task.py                            | 105 ++++-
 tests/core/test_yaml_process_define.py             | 191 +++++++++
 tests/integration/conftest.py                      |  27 +-
 tests/tasks/test_condition.py                      |   1 +
 tests/tasks/test_datax.py                          |   2 +
 tests/tasks/test_dependent.py                      |   1 +
 tests/tasks/test_dvc.py                            | 173 ++++++++
 tests/tasks/test_flink.py                          |   1 +
 tests/tasks/test_http.py                           |   1 +
 tests/tasks/test_map_reduce.py                     |   1 +
 tests/tasks/test_mlflow.py                         | 205 +++++++++
 .../tasks/{test_map_reduce.py => test_openmldb.py} |  52 ++-
 tests/tasks/test_procedure.py                      |   1 +
 tests/tasks/test_python.py                         |   1 +
 tests/tasks/test_pytorch.py                        | 124 ++++++
 tests/tasks/{test_shell.py => test_sagemaker.py}   |  45 +-
 tests/tasks/test_shell.py                          |   3 +
 tests/tasks/test_spark.py                          |   1 +
 tests/tasks/test_sql.py                            |   1 +
 tests/tasks/test_sub_process.py                    |   1 +
 tests/tasks/test_switch.py                         |   3 +-
 tests/testing/path.py                              |   1 +
 tox.ini                                            |   8 +
 103 files changed, 3517 insertions(+), 646 deletions(-)

diff --git a/DEVELOP.md b/DEVELOP.md
index 9972094..efa25a2 100644
--- a/DEVELOP.md
+++ b/DEVELOP.md
@@ -1,20 +1,20 @@
 <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 -->
 
 # Develop
@@ -41,13 +41,12 @@ Next, we have to open pydolphinscheduler project in you editor. We recommend you
 instead of [IntelliJ IDEA][idea] to open it. And you could just open directory
 `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
 
-
 ## Brief Concept
 
 Apache DolphinScheduler is design to define workflow by UI, and pydolphinscheduler try to define it by code. When
 define by code, user usually do not care user, tenant, or queue exists or not. All user care about is created
 a new workflow by the code his/her definition. So we have some **side object** in `pydolphinscheduler/side`
-directory, their only check object exists or not, and create them if not exists. 
+directory; they only check whether the object exists, and create it if it does not.
 
 ### Process Definition
 
@@ -106,7 +105,7 @@ and the next time you run it will be faster.
 
 If you failed section `lint` when you run command `tox -e local-ci`, you could try to run command `tox -e auto-lint`
 which we provider fix as many lints as possible. When I finish, you could run command `tox -e local-ci` to see
-whether the linter pass or not, you have to fix it by yourself if linter still fail. 
+whether the linter passes or not; you have to fix it yourself if the linter still fails.
 
 ### Manually
 
@@ -119,7 +118,7 @@ maybe you could follow [Black-integration][black-editor] to configure them in yo
 Our Python API CI would automatically run code style checker and unittest when you submit pull request in
 GitHub, you could also run static check locally.
 
-We recommend [pre-commit](https://pre-commit.com/) to do the checker mentioned above before you develop locally. 
+We recommend [pre-commit](https://pre-commit.com/) to run the checks mentioned above before you develop locally.
 You should install `pre-commit` by running
 
 ```shell
@@ -148,7 +147,7 @@ python -m flake8
 
 ## Build Document
 
-We use [sphinx][sphinx] to build docs. Dolphinscheduler Python API CI would automatically build docs when you submit pull request in 
+We use [sphinx][sphinx] to build docs. DolphinScheduler Python API CI would automatically build docs when you submit a pull request in
 GitHub. You may locally ensure docs could be built successfully in case the failure blocks CI, you can build by tox or manual.
 
 ### Build Document Automatically with tox
@@ -169,7 +168,7 @@ To build docs locally, install sphinx and related python modules first via:
 
 ```shell
 python -m pip install '.[doc]'
-``` 
+```
 
 Then go to document directory and execute the build command
 
@@ -205,11 +204,14 @@ It would not only run unit test but also show each file coverage which cover rat
 line show you total coverage of you code. If your CI failed with coverage you could go and find some reason by
 this command output.
 
-#### Integrate Test
+### Integrate Test
 
 Integrate Test can not run when you execute command `tox -e local-ci` because it needs external environment
 including [Docker](https://docs.docker.com/get-docker/) and specific image build by [maven](https://maven.apache.org/install.html).
 Here we would show you the step to run integrate test in directory `dolphinscheduler-python/pydolphinscheduler/tests/integration`.
+There are two ways to run integrate tests.
+
+#### Method 1: Launch Docker Container Locally
 
 ```shell
 # Go to project root directory and build Docker image
@@ -227,6 +229,15 @@ cd ../../
 tox -e integrate-test
 ```
 
+#### Method 2: Start Standalone Server in IntelliJ IDEA
+
+```shell
+# Start the standalone server in IDEA
+
+# Go to pydolphinscheduler root directory and run integrate tests
+tox -e local-integrate-test
+```
+
 ## Add LICENSE When New Dependencies Adding
 
 When you add a new package in pydolphinscheduler, you should also add the package's LICENSE to directory
diff --git a/README.md b/README.md
index 71119bd..7fc73d6 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,20 @@
 <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 -->
 
 # pydolphinscheduler
@@ -71,10 +71,10 @@ python ./tutorial.py
 
 > **_NOTICE:_** Since Apache DolphinScheduler's tenant is requests while running command, you might need to change
 > tenant value in `example/tutorial.py`. For now the value is `tenant_exists`, please change it to username exists
-> in you environment. 
+> in your environment.
 
 After command execute, you could see a new project with single process definition named *tutorial* in the
-[UI-project list](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html). 
+[UI-project list](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html).
 
 ## Develop
 
diff --git a/RELEASE.md b/RELEASE.md
index c90107a..e00ef05 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,20 +1,20 @@
 <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 -->
 
 # Release
diff --git a/UPDATING.md b/UPDATING.md
index d772c6f..e918b1e 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -1,20 +1,20 @@
 <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
 -->
 
 # UPDATING
@@ -24,10 +24,9 @@ It started after version 2.0.5 released
 
 ## dev
 
-* Change variable about where to keep pydolphinscheduler configuration from ``PYDOLPHINSCHEDULER_HOME`` to
-  ``PYDS_HOME`` which is same as other environment variable name.
+* Remove parameter ``task_location`` in process definition and Java Gateway service ([#11681](https://github.com/apache/dolphinscheduler/pull/11681))
 
-## 3.0.0a0
+## 3.0.0
 
 * Integrate Python gateway server into Dolphinscheduler API server, and you could start Python gateway service by command
   `./bin/dolphinscheduler-daemon.sh start api-server` instead of independent command
@@ -35,3 +34,6 @@ It started after version 2.0.5 released
 * Remove parameter `queue` from class `ProcessDefinition` to avoid confuse user when it change but not work
 * Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic.
 * Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.
+* Change variable about where to keep pydolphinscheduler configuration from ``PYDOLPHINSCHEDULER_HOME`` to
+  ``PYDS_HOME`` which is same as other environment variable name.
+
diff --git a/docs/source/tasks/condition.rst b/docs/source/tasks/condition.rst
index 20b0350..f6d7e6a 100644
--- a/docs/source/tasks/condition.rst
+++ b/docs/source/tasks/condition.rst
@@ -31,3 +31,10 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.condition
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Condition.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/datax.rst b/docs/source/tasks/datax.rst
index c077269..cb67a2f 100644
--- a/docs/source/tasks/datax.rst
+++ b/docs/source/tasks/datax.rst
@@ -31,3 +31,16 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.datax
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/DataX.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+
+example_datax.json:
+
+.. literalinclude:: ../../../examples/yaml_define/example_datax.json
+   :language: json
diff --git a/docs/source/tasks/dependent.rst b/docs/source/tasks/dependent.rst
index fe26d0f..d8e1599 100644
--- a/docs/source/tasks/dependent.rst
+++ b/docs/source/tasks/dependent.rst
@@ -31,3 +31,17 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.dependent
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+Dependent_External.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/Dependent_External.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/map_reduce.rst b/docs/source/tasks/dvc.rst
similarity index 76%
copy from docs/source/tasks/map_reduce.rst
copy to docs/source/tasks/dvc.rst
index 068b8d8..0127a98 100644
--- a/docs/source/tasks/map_reduce.rst
+++ b/docs/source/tasks/dvc.rst
@@ -15,20 +15,27 @@
    specific language governing permissions and limitations
    under the License.
 
-Map Reduce
-==========
+DVC
+===
 
-
-A Map Reduce task type's example and dive into information of **PyDolphinScheduler**.
+A DVC task type's example and dive into information of **PyDolphinScheduler**.
 
 Example
 -------
 
-.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_map_reduce_example.py
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_dvc_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.map_reduce
+.. automodule:: pydolphinscheduler.tasks.dvc
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Dvc.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/flink.rst b/docs/source/tasks/flink.rst
index 8db9ac2..76eb484 100644
--- a/docs/source/tasks/flink.rst
+++ b/docs/source/tasks/flink.rst
@@ -31,3 +31,10 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.flink
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Flink.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/func_wrap.rst b/docs/source/tasks/func_wrap.rst
index 5f41b80..a4a2972 100644
--- a/docs/source/tasks/func_wrap.rst
+++ b/docs/source/tasks/func_wrap.rst
@@ -30,4 +30,4 @@ Example
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
+.. automodule:: pydolphinscheduler.tasks.func_wrap
diff --git a/docs/source/tasks/http.rst b/docs/source/tasks/http.rst
index 4c6d8f8..4e138c9 100644
--- a/docs/source/tasks/http.rst
+++ b/docs/source/tasks/http.rst
@@ -19,3 +19,11 @@ HTTP
 ====
 
 .. automodule:: pydolphinscheduler.tasks.http
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Http.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index d6bbb96..3f83f92 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -40,3 +40,9 @@ In this section
 
    datax
    sub_process
+
+   sagemaker
+   mlflow
+   openmldb
+   pytorch
+   dvc
diff --git a/docs/source/tasks/map_reduce.rst b/docs/source/tasks/map_reduce.rst
index 068b8d8..7356880 100644
--- a/docs/source/tasks/map_reduce.rst
+++ b/docs/source/tasks/map_reduce.rst
@@ -32,3 +32,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.map_reduce
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/MapReduce.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/switch.rst b/docs/source/tasks/mlflow.rst
similarity index 76%
copy from docs/source/tasks/switch.rst
copy to docs/source/tasks/mlflow.rst
index d8b34a4..b83903c 100644
--- a/docs/source/tasks/switch.rst
+++ b/docs/source/tasks/mlflow.rst
@@ -15,19 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Switch
-======
+MLflow
+=========
 
-A switch task type's example and dive into information of **PyDolphinScheduler**.
+
+An MLflow task type's example and dive into information of **PyDolphinScheduler**.
 
 Example
 -------
 
-.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_switch_example.py
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_mlflow_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.switch
+.. automodule:: pydolphinscheduler.tasks.mlflow
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/mlflow.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/condition.rst b/docs/source/tasks/openmldb.rst
similarity index 75%
copy from docs/source/tasks/condition.rst
copy to docs/source/tasks/openmldb.rst
index 20b0350..125313d 100644
--- a/docs/source/tasks/condition.rst
+++ b/docs/source/tasks/openmldb.rst
@@ -15,19 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Condition
+OpenMLDB
 =========
 
-A condition task type's example and dive into information of **PyDolphinScheduler**.
+
+An OpenMLDB task type's example and dive into information of **PyDolphinScheduler**.
 
 Example
 -------
 
-.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_condition_example.py
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_openmldb_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.condition
+.. automodule:: pydolphinscheduler.tasks.openmldb
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/OpenMLDB.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/procedure.rst b/docs/source/tasks/procedure.rst
index cd79eff..2f28efc 100644
--- a/docs/source/tasks/procedure.rst
+++ b/docs/source/tasks/procedure.rst
@@ -19,3 +19,11 @@ Procedure
 =========
 
 .. automodule:: pydolphinscheduler.tasks.procedure
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Procedure.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/python.rst b/docs/source/tasks/python.rst
index 660e46a..1bf6210 100644
--- a/docs/source/tasks/python.rst
+++ b/docs/source/tasks/python.rst
@@ -19,3 +19,11 @@ Python
 ======
 
 .. automodule:: pydolphinscheduler.tasks.python
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Python.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/map_reduce.rst b/docs/source/tasks/pytorch.rst
similarity index 75%
copy from docs/source/tasks/map_reduce.rst
copy to docs/source/tasks/pytorch.rst
index 068b8d8..4c7a552 100644
--- a/docs/source/tasks/map_reduce.rst
+++ b/docs/source/tasks/pytorch.rst
@@ -15,20 +15,28 @@
    specific language governing permissions and limitations
    under the License.
 
-Map Reduce
-==========
+Pytorch
+=======
 
 
-A Map Reduce task type's example and dive into information of **PyDolphinScheduler**.
+A Pytorch task type's example and dive into information of **PyDolphinScheduler**.
 
 Example
 -------
 
-.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_map_reduce_example.py
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_pytorch_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.map_reduce
+.. automodule:: pydolphinscheduler.tasks.pytorch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Pytorch.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/condition.rst b/docs/source/tasks/sagemaker.rst
similarity index 69%
copy from docs/source/tasks/condition.rst
copy to docs/source/tasks/sagemaker.rst
index 20b0350..36880d9 100644
--- a/docs/source/tasks/condition.rst
+++ b/docs/source/tasks/sagemaker.rst
@@ -15,19 +15,32 @@
    specific language governing permissions and limitations
    under the License.
 
-Condition
+SageMaker
 =========
 
-A condition task type's example and dive into information of **PyDolphinScheduler**.
+
+A SageMaker task type's example and dive into information of **PyDolphinScheduler**.
 
 Example
 -------
 
-.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_condition_example.py
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_sagemaker_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
 
 Dive Into
 ---------
 
-.. automodule:: pydolphinscheduler.tasks.condition
+.. automodule:: pydolphinscheduler.tasks.sagemaker
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Sagemaker.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+example_sagemaker_params.json:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sagemaker_params.json
+   :language: json
diff --git a/docs/source/tasks/shell.rst b/docs/source/tasks/shell.rst
index 5ce16c3..2dd106a 100644
--- a/docs/source/tasks/shell.rst
+++ b/docs/source/tasks/shell.rst
@@ -31,3 +31,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.shell
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Shell.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/spark.rst b/docs/source/tasks/spark.rst
index cdb5902..d5a51db 100644
--- a/docs/source/tasks/spark.rst
+++ b/docs/source/tasks/spark.rst
@@ -31,3 +31,11 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.spark
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Spark.yaml
+   :start-after: # under the License.
+   :language: yaml
diff --git a/docs/source/tasks/sql.rst b/docs/source/tasks/sql.rst
index 21eaec7..52df042 100644
--- a/docs/source/tasks/sql.rst
+++ b/docs/source/tasks/sql.rst
@@ -19,3 +19,17 @@ SQL
 ===
 
 .. automodule:: pydolphinscheduler.tasks.sql
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Sql.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+example_sql.sql:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sql.sql
+   :start-after: */
+   :language: sql
diff --git a/docs/source/tasks/sub_process.rst b/docs/source/tasks/sub_process.rst
index 8a9f562..894dd0f 100644
--- a/docs/source/tasks/sub_process.rst
+++ b/docs/source/tasks/sub_process.rst
@@ -19,3 +19,20 @@ Sub Process
 ===========
 
 .. automodule:: pydolphinscheduler.tasks.sub_process
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/SubProcess.yaml
+   :start-after: # under the License.
+   :language: yaml
+
+
+
+example_sub_workflow.yaml:
+
+.. literalinclude:: ../../../examples/yaml_define/example_sub_workflow.yaml
+   :start-after: # under the License.
+   :language: yaml
+
diff --git a/docs/source/tasks/switch.rst b/docs/source/tasks/switch.rst
index d8b34a4..2fef589 100644
--- a/docs/source/tasks/switch.rst
+++ b/docs/source/tasks/switch.rst
@@ -31,3 +31,12 @@ Dive Into
 ---------
 
 .. automodule:: pydolphinscheduler.tasks.switch
+
+
+YAML file example
+-----------------
+
+.. literalinclude:: ../../../examples/yaml_define/Switch.yaml
+   :start-after: # under the License.
+   :language: yaml
+
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 6366c80..57d21b2 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -37,6 +37,8 @@ There are two types of tutorials: traditional and task decorator.
   versatility to the traditional way because it only supported Python functions and without build-in tasks
   supported. But it is helpful if your workflow is all built with Python or if you already have some Python
   workflow code and want to migrate them to pydolphinscheduler.
+- **YAML File**: We can use the pydolphinscheduler CLI to create a process using a YAML file: :code:`pydolphinscheduler yaml -f tutorial.yaml`.
+  We can find more YAML file examples in `examples/yaml_define <https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-python/pydolphinscheduler/examples/yaml_define>`_.
 
 .. tab:: Tradition
 
@@ -52,6 +54,12 @@ There are two types of tutorials: traditional and task decorator.
       :start-after: [start tutorial]
       :end-before: [end tutorial]
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # under the License.
+      :language: yaml
+
 Import Necessary Module
 -----------------------
 
@@ -104,6 +112,13 @@ will be running this task in the DolphinScheduler worker. See :ref:`section tena
       :start-after: [start workflow_declare]
       :end-before: [end workflow_declare]
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # under the License.
+      :end-before: # Define the tasks under the workflow
+      :language: yaml
+
 We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
 if you are interested in it. For all arguments of object process definition, you could find in the
 :class:`pydolphinscheduler.core.process_definition` API documentation.
@@ -139,6 +154,12 @@ Task Declaration
    It makes our workflow more Pythonic, but be careful that when we use task decorator mode mean we only use
    Python function as a task and could not use the :doc:`built-in tasks <tasks/index>` most of the cases.
 
+.. tab:: YAML File
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow 
+      :language: yaml
+
 Setting Task Dependence
 -----------------------
 
@@ -167,6 +188,14 @@ and task `task_child_two` was done, because both two task is `task_union`'s upst
       :start-after: [start task_relation_declare]
       :end-before: [end task_relation_declare]
 
+.. tab:: YAML File
+
+   We can use :code:`deps:[]` to set task dependence
+
+   .. literalinclude:: ../../examples/yaml_define/tutorial.yaml
+      :start-after: # Define the tasks under the workflow 
+      :language: yaml
+
 .. note::
 
    We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
@@ -198,6 +227,17 @@ will create workflow definition as well as workflow schedule.
       :start-after: [start submit_or_run]
       :end-before: [end submit_or_run]
 
+.. tab:: YAML File
+
+   The pydolphinscheduler YAML CLI always submits the workflow. We can also run the workflow if we set :code:`run: true`.
+
+   .. code-block:: yaml
+
+     # Define the workflow
+     workflow:
+       name: "tutorial"
+       run: true
+
 At last, we could execute this workflow code in your terminal like other Python scripts, running
 :code:`python tutorial.py` to trigger and execute it.
 
@@ -219,5 +259,61 @@ named "tutorial" or "tutorial_decorator". The task graph of workflow like below:
    :language: text
    :lines: 24-28
 
+Create Process Using YAML File
+------------------------------
+
+We can use the pydolphinscheduler CLI to create a process using a YAML file:
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f Shell.yaml
+
+We can use the following four special grammars to define workflows more flexibly.
+
+- :code:`$FILE{"file_name"}`: Read the contents of the file (:code:`file_name`) and insert them at that location.
+- :code:`$WORKFLOW{"other_workflow.yaml"}`: Refer to another process defined in a YAML file (:code:`other_workflow.yaml`), creating or updating it, and insert its process name at this location.
+- :code:`$ENV{env_name}`: Read the environment variable (:code:`env_name`) and insert its value at that location.
+- :code:`${CONFIG.key_name}`: Read the configuration value of the key (:code:`key_name`) and insert it at that location.
+
+
+In addition, when loading a file path with :code:`$FILE{"file_name"}` or :code:`$WORKFLOW{"other_workflow.yaml"}`, pydolphinscheduler will search in the directory of the YAML file if the file does not exist.
+
+For example, our file directory structure is as follows:
+
+.. code-block:: bash
+
+   .
+   └── yaml_define
+       ├── Condition.yaml
+       ├── DataX.yaml
+       ├── Dependent_External.yaml
+       ├── Dependent.yaml
+       ├── example_datax.json
+       ├── example_sql.sql
+       ├── example_sub_workflow.yaml
+       ├── Flink.yaml
+       ├── Http.yaml
+       ├── MapReduce.yaml
+       ├── MoreConfiguration.yaml
+       ├── Procedure.yaml
+       ├── Python.yaml
+       ├── Shell.yaml
+       ├── Spark.yaml
+       ├── Sql.yaml
+       ├── SubProcess.yaml
+       └── Switch.yaml
+
+After we run
+
+.. code-block:: bash
+
+   pydolphinscheduler yaml -f yaml_define/SubProcess.yaml
+
+
+the :code:`$WORKFLOW{"example_sub_workflow.yaml"}` will be set to :code:`$WORKFLOW{"yaml_define/example_sub_workflow.yaml"}`, because :code:`./example_sub_workflow.yaml` does not exist and :code:`yaml_define/example_sub_workflow.yaml` does.
+
+Furthermore, this feature supports recursion all the way down.
+
+
 .. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
 .. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types
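
[The YAML tabs above mirror the traditional Python API that this commit migrates. A minimal sketch of the same tutorial workflow written against that API follows — a sketch under the 3.1.0 API, not part of the commit itself; the tenant `tenant_exists` is a placeholder you must replace with a user that exists in your environment:

    # Sketch of the tutorial workflow via the traditional Python API (3.1.0).
    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.shell import Shell

    with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
        task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
        task_child_one = Shell(name="task_child_one", command="echo 'child one'")
        task_child_two = Shell(name="task_child_two", command="echo 'child two'")
        task_union = Shell(name="task_union", command="echo union")

        # Equivalent of `deps` in the YAML file: both children run after the
        # parent, and task_union waits for both children to finish.
        task_parent >> [task_child_one, task_child_two]
        task_union << [task_child_one, task_child_two]

        # submit() only creates the workflow definition; run() also triggers
        # an execution, which is what `run: true` means in the YAML file.
        pd.run()
]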
diff --git a/examples/yaml_define/Condition.yaml b/examples/yaml_define/Condition.yaml
new file mode 100644
index 0000000..c65b8c7
--- /dev/null
+++ b/examples/yaml_define/Condition.yaml
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Condition"
+
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "pre_task_1", "command": "echo pre_task_1" }
+  - { "task_type": "Shell", "name": "pre_task_2", "command": "echo pre_task_2" }
+  - { "task_type": "Shell", "name": "pre_task_3", "command": "echo pre_task_3" }
+  - { "task_type": "Shell", "name": "success_branch", "command": "echo success_branch" }
+  - { "task_type": "Shell", "name": "fail_branch", "command": "echo fail_branch" }
+
+  - name: condition
+    task_type: Condition
+    success_task: success_branch
+    failed_task: fail_branch
+    op: AND
+    groups:
+      - op: AND
+        groups:
+          - task: pre_task_1
+            flag: true
+          - task: pre_task_2
+            flag: true
+          - task: pre_task_3
+            flag: false
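
[The `op`/`groups`/`flag` structure above maps onto the operators in `pydolphinscheduler.tasks.condition`. A hedged sketch of the same workflow through the Python API — names mirror the YAML, and `tenant_exists` is a placeholder:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
    from pydolphinscheduler.tasks.shell import Shell

    with ProcessDefinition(name="Condition", tenant="tenant_exists") as pd:
        pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
        pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
        pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
        success_branch = Shell(name="success_branch", command="echo success_branch")
        fail_branch = Shell(name="fail_branch", command="echo fail_branch")

        # `flag: true` corresponds to SUCCESS, `flag: false` to FAILURE, and
        # `op: AND` to the And operator wrapping the group.
        condition = Condition(
            name="condition",
            condition=And(SUCCESS(pre_task_1, pre_task_2), FAILURE(pre_task_3)),
            success_task=success_branch,
            failed_task=fail_branch,
        )
        pd.submit()
]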
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/DataX.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/DataX.yaml
index ebfb893..00ecd54 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/DataX.yaml
@@ -15,24 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "DataX"
 
-from pydolphinscheduler.core.resource import Resource
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: DataX
+    datasource_name: db
+    datatarget_name: db
+    sql: show tables;
+    target_table: table_test
 
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - name: task_custom_config
+    task_type: CustomDataX
+    json: $FILE{"example_datax.json"}
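
[For comparison, a rough Python-API counterpart of the DataX definition above, assuming the `DataX`/`CustomDataX` classes from `pydolphinscheduler.tasks.datax` documented earlier in this commit. The `$FILE{...}` grammar is YAML-only, so the JSON definition is read by hand here:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.datax import CustomDataX, DataX

    with ProcessDefinition(name="DataX", tenant="tenant_exists") as pd:
        task = DataX(
            name="task",
            datasource_name="db",
            datatarget_name="db",
            sql="show tables;",
            target_table="table_test",
        )

        # No Python equivalent of $FILE{"example_datax.json"}: read the file.
        with open("example_datax.json") as f:
            task_custom_config = CustomDataX(name="task_custom_config", json=f.read())

        pd.submit()
]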
diff --git a/examples/yaml_define/Dependent.yaml b/examples/yaml_define/Dependent.yaml
new file mode 100644
index 0000000..d69fac0
--- /dev/null
+++ b/examples/yaml_define/Dependent.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+workflow:
+  name: "Dependent"
+
+# Define the tasks under the workflow
+tasks:
+  - name: dependent
+    task_type: Dependent
+    denpendence:
+    op: and
+    groups:
+      - op: or
+        groups:
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_2
+
+      - op: and
+        groups:
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY 
+
+          - project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_2
+            dependent_date: last24Hours 
+
+  - name: dependent_var
+    task_type: Dependent
+    denpendence:
+    op: and
+    groups:
+      - op: or
+        # we can use ${CONFIG.WORKFLOW_PROJECT} to set the value to configuration.WORKFLOW_PROJECT
+        # we can use $WORKFLOW{"Dependent_External.yaml"} to create or update a workflow from dependent_external.yaml and set the value to that workflow name
+        groups:
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_1
+
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_2
+      - op: and
+        groups:
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY 
+
+          - project_name: ${CONFIG.WORKFLOW_PROJECT} 
+            process_definition_name: $WORKFLOW{"Dependent_External.yaml"} 
+            dependent_task_name: task_2
+            dependent_date: last24Hours 
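
[The nested `op`/`groups` blocks above correspond to `And`/`Or` operators over `DependentItem` objects in `pydolphinscheduler.tasks.dependent`. A minimal sketch of the first group, assuming the 3.1.0 API (the optional `dependent_date` keyword mirrors the YAML key of the same name):

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or

    with ProcessDefinition(name="Dependent", tenant="tenant_exists") as pd:
        dependent = Dependent(
            name="dependent",
            dependence=And(
                Or(
                    # one DependentItem per entry in a YAML group
                    DependentItem(
                        project_name="pydolphin",
                        process_definition_name="task_dependent_external",
                        dependent_task_name="task_1",
                    ),
                    DependentItem(
                        project_name="pydolphin",
                        process_definition_name="task_dependent_external",
                        dependent_task_name="task_2",
                    ),
                ),
            ),
        )
        pd.submit()
]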
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Dependent_External.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Dependent_External.yaml
index ebfb893..577ff6a 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Dependent_External.yaml
@@ -15,24 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "task_dependent_external"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+  - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
+  - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Dvc.yaml
similarity index 52%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Dvc.yaml
index ebfb893..a6ec18c 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Dvc.yaml
@@ -15,24 +15,32 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define variable `repository`
+repository: &repository "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git" 
 
-from pydolphinscheduler.core.resource import Resource
+# Define the workflow
+workflow:
+  name: "DVC"
+  release_state: "offline"
 
+# Define the tasks under the process
+tasks:
+  - name: init_dvc 
+    task_type: DVCInit
+    repository: *repository
+    store_url: ~/dvc_data
 
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - name: upload_data
+    task_type: DVCUpload
+    repository: *repository
+    data_path_in_dvc_repository: "iris"
+    data_path_in_worker: ~/source/iris
+    version: v1
+    message: upload iris data v1
+
+  - name: download_data
+    task_type: DVCDownload
+    repository: *repository
+    data_path_in_dvc_repository: "iris"
+    data_path_in_worker: ~/target/iris
+    version: v1
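
[A hedged Python-API sketch of the DVC pipeline above, assuming the `DVCInit`/`DVCUpload`/`DVCDownload` classes from `pydolphinscheduler.tasks.dvc` added by this commit; the YAML anchor `&repository`/`*repository` becomes ordinary variable reuse:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload

    repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

    with ProcessDefinition(name="DVC", tenant="tenant_exists") as pd:
        init_dvc = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
        upload_data = DVCUpload(
            name="upload_data",
            repository=repository,
            data_path_in_dvc_repository="iris",
            data_path_in_worker="~/source/iris",
            version="v1",
            message="upload iris data v1",
        )
        download_data = DVCDownload(
            name="download_data",
            repository=repository,
            data_path_in_dvc_repository="iris",
            data_path_in_worker="~/target/iris",
            version="v1",
        )
        # init, then upload v1, then download it back
        init_dvc >> upload_data >> download_data
        pd.submit()
]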
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Flink.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Flink.yaml
index ebfb893..2449d43 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Flink.yaml
@@ -15,24 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Flink"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Flink
+    main_class: org.apache.flink.streaming.examples.wordcount.WordCount
+    main_package: test_java.jar
+    program_type: JAVA
+    deploy_mode: local
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Http.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Http.yaml
index ebfb893..1483aeb 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Http.yaml
@@ -15,24 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Http"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Http
+    url: "https://httpbin.org/get"
+    http_method: "GET"
+    http_params:
+      - { "prop": "a", "httpParametersType": "PARAMETER", "value": "1" }
+      - { "prop": "b", "httpParametersType": "PARAMETER", "value": "2" }
+      - {
+          "prop": "Content-Type",
+          "httpParametersType": "header",
+          "value": "test",
+        }
+    http_check_condition: "STATUS_CODE_CUSTOM"
+    condition: "404"
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/MapReduce.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/MapReduce.yaml
index ebfb893..e1a2b57 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/MapReduce.yaml
@@ -15,24 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "MapReduce"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: MR
+    main_class: wordcount
+    main_package: test_java.jar
+    program_type: SCALA
+    main_args: /dolphinscheduler/tenant_exists/resources/file.txt /output/ds
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/MoreConfiguration.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/MoreConfiguration.yaml
index ebfb893..258aa33 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -15,24 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "MoreConfiguration"
+  param:
+    n: 1
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: shell_0
+    task_type: Shell
+    description: "yaml define task"
+    flag: "YES"
+    command: |
+      echo "$ENV{HOME}"
+      echo "${n}"
+    task_priority: "HIGH"
+    delay_time: 20
+    fail_retry_times: 30
+    fail_retry_interval: 5
+    timeout_flag: "CLOSE"
+    timeout: 60
+    local_params:
+      - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/OpenMLDB.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/OpenMLDB.yaml
index ebfb893..b455cb0 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/OpenMLDB.yaml
@@ -15,24 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "OpenMLDB"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: OpenMLDB
+    task_type: OpenMLDB
+    zookeeper: "127.0.0.1:2181"
+    zookeeper_path: "/openmldb"
+    execute_mode: "online"
+    sql: |
+      USE demo_db;
+      set @@job_timeout=200000;
+      LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+      INTO TABLE talkingdata OPTIONS(mode='overwrite');
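
[A short sketch of the same task through the Python API, assuming the `OpenMLDB` class from `pydolphinscheduler.tasks.openmldb` added by this commit; parameters mirror the YAML keys one-to-one:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.openmldb import OpenMLDB

    sql = (
        "USE demo_db;\n"
        "set @@job_timeout=200000;\n"
        "LOAD DATA INFILE 'file:///tmp/train_sample.csv'\n"
        "INTO TABLE talkingdata OPTIONS(mode='overwrite');"
    )

    with ProcessDefinition(name="OpenMLDB", tenant="tenant_exists") as pd:
        task = OpenMLDB(
            name="OpenMLDB",
            zookeeper="127.0.0.1:2181",
            zookeeper_path="/openmldb",
            execute_mode="online",
            sql=sql,
        )
        pd.submit()
]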
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Procedure.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Procedure.yaml
index ebfb893..829a961 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Procedure.yaml
@@ -15,24 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Procedure"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Procedure
+    datasource_name: db
+    method: show tables;
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Python.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Python.yaml
index ebfb893..728b5c9 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Python.yaml
@@ -15,24 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Python"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: python
+    task_type: Python
+    definition: |
+      import os
+      print(os)
+      print("1")
+      print("2")
diff --git a/examples/yaml_define/Pytorch.yaml b/examples/yaml_define/Pytorch.yaml
new file mode 100644
index 0000000..8706824
--- /dev/null
+++ b/examples/yaml_define/Pytorch.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Define the workflow
+workflow:
+  name: "Pytorch"
+
+# Define the tasks under the workflow
+tasks:
+
+  # run project with existing environment
+  - name: task_existing_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    python_command: /home/anaconda3/envs/pytorch/bin/python3
+
+
+  # run project with creating conda environment
+  - name: task_conda_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    is_create_environment: True
+    python_env_tool: conda
+    requirements: requirements.txt
+    conda_python_version: 3.7
+
+  # run project with creating virtualenv environment
+  - name: task_virtualenv_env
+    task_type: pytorch
+    script: main.py
+    script_params: --dry-run --no-cuda
+    project_path: https://github.com/pytorch/examples#mnist
+    is_create_environment: True
+    python_env_tool: virtualenv
+    requirements: requirements.txt
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Sagemaker.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Sagemaker.yaml
index ebfb893..9f77a3c 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Sagemaker.yaml
@@ -15,24 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Sagemaker"
+  release_state: "offline"
 
-from pydolphinscheduler.core.resource import Resource
+# Define the tasks under the process
+tasks:
+  - name: sagemaker
+    task_type: Sagemaker
+    sagemaker_request_json: $FILE{"example_sagemaker_params.json"}
 
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Shell.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Shell.yaml
index ebfb893..fdbe126 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Shell.yaml
@@ -15,24 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Shell"
+  release_state: "offline"
+  run: true
 
-from pydolphinscheduler.core.resource import Resource
+# Define the tasks under the process
+tasks:
+  - name: task_parent
+    task_type: Shell
+    command: |
+      echo hello pydolphinscheduler 
+      echo run task parent
 
+  - name: task_child_one
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child one"
 
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - name: task_child_two
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child two"
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Spark.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Spark.yaml
index ebfb893..6132b8d 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Spark.yaml
@@ -15,24 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Spark"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - name: task
+    task_type: Spark
+    main_class: org.apache.spark.examples.SparkPi
+    main_package: test_java.jar
+    program_type: SCALA
+    deploy_mode: local
+    spark_version: SPARK1
diff --git a/src/pydolphinscheduler/core/resource.py b/examples/yaml_define/Sql.yaml
similarity index 50%
copy from src/pydolphinscheduler/core/resource.py
copy to examples/yaml_define/Sql.yaml
index a3aab81..c3c7e88 100644
--- a/src/pydolphinscheduler/core/resource.py
+++ b/examples/yaml_define/Sql.yaml
@@ -15,29 +15,31 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Module resource."""
-
-from typing import Optional
-
-from pydolphinscheduler.models import Base
-
-
-class Resource(Base):
-    """resource object, will define the resources that you want to create or update.
-
-    :param name: The fullname of resource.Includes path and suffix.
-    :param content: The description of resource.
-    :param description: The description of resource.
-    """
-
-    _DEFINE_ATTR = {"name", "content", "description"}
-
-    def __init__(
-        self,
-        name: str,
-        content: str,
-        description: Optional[str] = None,
-    ):
-        super().__init__(name, description)
-        self.content = content
-        self._resource_code = None
+# Define the workflow
+workflow:
+  name: "Sql"
+
+# Define the tasks under the workflow
+tasks:
+  - name: task_base
+    task_type: Sql
+    datasource_name: "db"
+    sql: show tables;
+
+  - name: task_multi_line
+    task_type: Sql
+    datasource_name: "db"
+    sql: |
+      show tables;
+      select id from version where id=1;
+
+  - name: task_file
+    task_type: Sql
+    datasource_name: "db"
+    sql: $FILE{"example_sql.sql"}
+
+  # Or you can define the task "task_base_one_line" in a single line
+  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}
+
+  # Or you can define the task "task_file_one_line" in a single line
+  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/SubProcess.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/SubProcess.yaml
index ebfb893..0ea7549 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/SubProcess.yaml
@@ -15,24 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "SubWorkflow"
 
-from pydolphinscheduler.core.resource import Resource
+tasks:
+  - name: example_workflow
+    task_type: SubProcess
+    process_definition_name: $WORKFLOW{"example_sub_workflow.yaml"}
 
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/Switch.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/Switch.yaml
index ebfb893..33ed688 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/Switch.yaml
@@ -15,24 +15,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "Switch"
+  param:
+    var: 1
 
-from pydolphinscheduler.core.resource import Resource
+# Define the tasks under the workflow
+tasks:
+  - name: switch_child_1
+    task_type: Shell
+    command: echo switch_child_1
 
+  - name: switch_child_2
+    task_type: Shell
+    command: echo switch_child_2
 
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - name: switch
+    task_type: Switch
+    condition:
+      - task: switch_child_1
+        condition: "${var} > 1"
+      - task: switch_child_2
diff --git a/examples/yaml_define/example_datax.json b/examples/yaml_define/example_datax.json
new file mode 100644
index 0000000..3db8092
--- /dev/null
+++ b/examples/yaml_define/example_datax.json
@@ -0,0 +1,62 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "mysqlreader",
+          "parameter": {
+            "username": "usr",
+            "password": "pwd",
+            "column": [
+              "id",
+              "name",
+              "code",
+              "description"
+            ],
+            "splitPk": "id",
+            "connection": [
+              {
+                "table": [
+                  "source_table"
+                ],
+                "jdbcUrl": [
+                  "jdbc:mysql://127.0.0.1:3306/source_db"
+                ]
+              }
+            ]
+          }
+        },
+        "writer": {
+          "name": "mysqlwriter",
+          "parameter": {
+            "writeMode": "insert",
+            "username": "usr",
+            "password": "pwd",
+            "column": [
+              "id",
+              "name"
+            ],
+            "connection": [
+              {
+                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+                "table": [
+                  "target_table"
+                ]
+              }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "errorLimit": {
+        "percentage": 0,
+        "record": 0
+      },
+      "speed": {
+        "channel": 1,
+        "record": 1000
+      }
+    }
+  }
+}
diff --git a/examples/yaml_define/example_sagemaker_params.json b/examples/yaml_define/example_sagemaker_params.json
new file mode 100644
index 0000000..9403320
--- /dev/null
+++ b/examples/yaml_define/example_sagemaker_params.json
@@ -0,0 +1,18 @@
+{
+    "ParallelismConfiguration":{
+        "MaxParallelExecutionSteps":1
+    },
+    "PipelineExecutionDescription":"run pipeline using ds",
+    "PipelineExecutionDisplayName":"ds-sagemaker-pipeline",
+    "PipelineName":"DsSagemakerPipeline",
+    "PipelineParameters":[
+        {
+            "Name":"InputData",
+            "Value": "s3://sagemaker/dataset/dataset.csv"
+        },
+        {
+            "Name":"InferenceData",
+            "Value": "s3://sagemaker/dataset/inference.csv"
+        }
+    ]
+}
diff --git a/examples/yaml_define/example_sql.sql b/examples/yaml_define/example_sql.sql
new file mode 100644
index 0000000..06b5c4c
--- /dev/null
+++ b/examples/yaml_define/example_sql.sql
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+select id from version where id=1;
+select id from version where id=2;
+select id from version where id=3;
+select id from version where id=4;
+select id from version where id=5;
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/example_sub_workflow.yaml
similarity index 58%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/example_sub_workflow.yaml
index ebfb893..af3a863 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/example_sub_workflow.yaml
@@ -15,24 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "example_workflow_for_sub_workflow"
 
-from pydolphinscheduler.core.resource import Resource
-
-
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+# Define the tasks under the workflow
+tasks:
+  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
+  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
+  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
diff --git a/examples/yaml_define/mlflow.yaml b/examples/yaml_define/mlflow.yaml
new file mode 100644
index 0000000..45e5672
--- /dev/null
+++ b/examples/yaml_define/mlflow.yaml
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Define variable `mlflow_tracking_uri`
+mlflow_tracking_uri: &mlflow_tracking_uri "http://127.0.0.1:5000" 
+
+# Define the workflow
+workflow:
+  name: "MLflow"
+
+# Define the tasks under the workflow
+tasks:
+  - name: train_xgboost_native
+    task_type: MLFlowProjectsCustom 
+    repository: https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native
+    mlflow_tracking_uri: *mlflow_tracking_uri
+    parameters: -P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9
+    experiment_name: xgboost
+
+  - name: train_automl
+    task_type: MLFlowProjectsAutoML 
+    mlflow_tracking_uri: *mlflow_tracking_uri
+    parameters: time_budget=30;estimator_list=['lgbm']
+    experiment_name: automl_iris
+    model_name: iris_A
+    automl_tool: flaml
+    data_path: /data/examples/iris
+
+  - name: deploy_docker
+    task_type: MLflowModels 
+    deps: [train_automl]
+    model_uri: models:/iris_A/Production
+    mlflow_tracking_uri: *mlflow_tracking_uri
+    deploy_mode: DOCKER
+    port: 7002
+
+  - name: train_basic_algorithm
+    task_type: MLFlowProjectsBasicAlgorithm 
+    mlflow_tracking_uri: *mlflow_tracking_uri
+    parameters: n_estimators=200;learning_rate=0.2
+    experiment_name: basic_algorithm_iris
+    model_name: iris_B
+    algorithm: lightgbm
+    data_path: /data/examples/iris
+    search_params: max_depth=[5, 10];n_estimators=[100, 200]
+
+  - name: deploy_mlflow
+    deps: [train_basic_algorithm]
+    task_type: MLflowModels
+    model_uri: models:/iris_B/Production
+    mlflow_tracking_uri: *mlflow_tracking_uri
+    deploy_mode: MLFLOW
+    port: 7001
+
diff --git a/tests/core/test_resource_definition.py b/examples/yaml_define/tutorial.yaml
similarity index 56%
copy from tests/core/test_resource_definition.py
copy to examples/yaml_define/tutorial.yaml
index ebfb893..104a8c3 100644
--- a/tests/core/test_resource_definition.py
+++ b/examples/yaml_define/tutorial.yaml
@@ -15,24 +15,32 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# Define the workflow
+workflow:
+  name: "tutorial"
+  schedule: "0 0 0 * * ? *"
+  start_time: "2021-01-01"
+  tenant: "tenant_exists"
+  release_state: "offline"
+  run: true
 
-from pydolphinscheduler.core.resource import Resource
+# Define the tasks under the workflow
+tasks:
+  - name: task_parent
+    task_type: Shell
+    command: echo hello pydolphinscheduler
 
+  - name: task_child_one
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child one"
 
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
-    )
-    assert resourceDefinition.get_define() == expect
+  - name: task_child_two
+    task_type: Shell
+    deps: [task_parent]
+    command: echo "child two"
+
+  - name: task_union
+    task_type: Shell
+    deps: [task_child_one, task_child_two]
+    command: echo "union"
diff --git a/setup.py b/setup.py
index 5d1903c..355b3a5 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ if sys.version_info[0] < 3:
 
 logger = logging.getLogger(__name__)
 
-version = "3.0.1"
+version = "3.1.0"
 
 # Start package required
 prod = [
diff --git a/src/pydolphinscheduler/cli/commands.py b/src/pydolphinscheduler/cli/commands.py
index d78e503..8d923f1 100644
--- a/src/pydolphinscheduler/cli/commands.py
+++ b/src/pydolphinscheduler/cli/commands.py
@@ -26,6 +26,7 @@ from pydolphinscheduler.configuration import (
     init_config_file,
     set_single_config,
 )
+from pydolphinscheduler.core.yaml_process_define import create_process_definition
 
 version_option_val = ["major", "minor", "micro"]
 
@@ -90,3 +91,16 @@ def config(getter, setter, init) -> None:
         for key, val in setter:
             set_single_config(key, val)
         click.echo("Set configuration done.")
+
+
+@cli.command()
+@click.option(
+    "--yaml_file",
+    "-f",
+    required=True,
+    help="YAML file path",
+    type=click.Path(exists=True),
+)
+def yaml(yaml_file) -> None:
+    """Create process definition using YAML file."""
+    create_process_definition(yaml_file)
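
The `yaml` subcommand above is the CLI entry point for the YAML workflow files added in this commit. Assuming the package keeps its usual `pydolphinscheduler` console script, the tutorial workflow shown earlier could be submitted with:

    pydolphinscheduler yaml -f examples/yaml_define/tutorial.yaml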
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index a5089ac..fd640c5 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -57,6 +57,11 @@ class TaskType(str):
     FLINK = "FLINK"
     SPARK = "SPARK"
     MR = "MR"
+    SAGEMAKER = "SAGEMAKER"
+    MLFLOW = "MLFLOW"
+    OPENMLDB = "OPENMLDB"
+    PYTORCH = "PYTORCH"
+    DVC = "DVC"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/src/pydolphinscheduler/core/database.py b/src/pydolphinscheduler/core/database.py
index b6602a6..4a93f22 100644
--- a/src/pydolphinscheduler/core/database.py
+++ b/src/pydolphinscheduler/core/database.py
@@ -22,7 +22,7 @@ from typing import Dict
 from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class Database(dict):
@@ -54,9 +54,8 @@ class Database(dict):
         if self._database:
             return self._database
         else:
-            gateway = launch_gateway()
             try:
-                self._database = gateway.entry_point.getDatasourceInfo(name)
+                self._database = JavaGate().get_datasource_info(name)
             # Handle the error when the database source does not exist; for now we just terminate the process.
             except Py4JJavaError as ex:
                 raise PyDSParamException(str(ex.java_exception))
diff --git a/src/pydolphinscheduler/core/engine.py b/src/pydolphinscheduler/core/engine.py
index df84b5b..41021ed 100644
--- a/src/pydolphinscheduler/core/engine.py
+++ b/src/pydolphinscheduler/core/engine.py
@@ -23,7 +23,7 @@ from py4j.protocol import Py4JJavaError
 
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class ProgramType(str):
@@ -62,9 +62,8 @@ class Engine(Task):
         if self._resource:
             return self._resource
         else:
-            gateway = launch_gateway()
             try:
-                self._resource = gateway.entry_point.getResourcesFileInfo(
+                self._resource = JavaGate().get_resources_file_info(
                     program_type, main_package
                 )
             # Handle the error when the source does not exist; for now we just terminate the process.
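
Both hunks above apply the same migration: raw `launch_gateway().entry_point` calls are replaced by a `JavaGate` wrapper exposing snake_case methods. A minimal before/after sketch, using only calls visible in this diff (the datasource name "db" is illustrative and assumes a running gateway):

    from pydolphinscheduler.java_gateway import JavaGate

    # 3.0.x style, now removed:
    #   gateway = launch_gateway()
    #   info = gateway.entry_point.getDatasourceInfo("db")

    # 3.1.0 style, one wrapper per call site:
    info = JavaGate().get_datasource_info("db")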
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 69dcbc1..df05b01 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -23,8 +23,9 @@ from typing import Any, Dict, List, Optional, Set
 
 from pydolphinscheduler import configuration
 from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base, Project, Tenant, User
 from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule
 
@@ -110,7 +111,7 @@ class ProcessDefinition(Base):
         timeout: Optional[int] = 0,
         release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
         param: Optional[Dict] = None,
-        resource_list: Optional[List] = None,
+        resource_list: Optional[List[Resource]] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -273,19 +274,6 @@ class ProcessDefinition(Base):
                 "timezoneId": self.timezone,
             }
 
-    # TODO inti DAG's tasks are in the same location with default {x: 0, y: 0}
-    @property
-    def task_location(self) -> List[Dict]:
-        """Return all tasks location for all process definition.
-
-        For now, we only set all location with same x and y valued equal to 0. Because we do not
-        find a good way to set task locations. This is requests from java gateway interface.
-        """
-        if not self.tasks:
-            return [self.tasks]
-        else:
-            return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in self.tasks]
-
     @property
     def task_list(self) -> List["Task"]:  # noqa: F821
         """Return list of tasks objects."""
@@ -391,17 +379,14 @@ class ProcessDefinition(Base):
         self._ensure_side_model_exists()
         self._pre_submit_check()
 
-        gateway = launch_gateway()
-        self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
+        self._process_definition_code = JavaGate().create_or_update_process_definition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
             json.dumps(self.param_json),
-            json.dumps(self.schedule_json) if self.schedule_json else None,
             self.warning_type,
             self.warning_group_id,
-            json.dumps(self.task_location),
             self.timeout,
             self.worker_group,
             self._tenant,
@@ -409,16 +394,14 @@ class ProcessDefinition(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
+            json.dumps(self.schedule_json) if self.schedule_json else None,
+            None,
             None,
         )
         if len(self.resource_list) > 0:
             for res in self.resource_list:
-                gateway.entry_point.createOrUpdateResource(
-                    self._user,
-                    res.name,
-                    res.description,
-                    res.content,
-                )
+                res.user_name = self._user
+                res.create_or_update_resource()
         return self._process_definition_code
 
     def start(self) -> None:
@@ -426,8 +409,7 @@ class ProcessDefinition(Base):
 
         which post to `start-process-instance` to java gateway
         """
-        gateway = launch_gateway()
-        gateway.entry_point.execProcessInstance(
+        JavaGate().exec_process_instance(
             self._user,
             self._project,
             self.name,
diff --git a/src/pydolphinscheduler/core/resource.py b/src/pydolphinscheduler/core/resource.py
index a3aab81..ea81191 100644
--- a/src/pydolphinscheduler/core/resource.py
+++ b/src/pydolphinscheduler/core/resource.py
@@ -19,6 +19,8 @@
 
 from typing import Optional
 
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 
@@ -28,16 +30,44 @@ class Resource(Base):
     :param name: The full name of the resource, including path and suffix.
     :param content: The content of the resource.
     :param description: The description of resource.
+    :param user_name: The user name of resource.
     """
 
-    _DEFINE_ATTR = {"name", "content", "description"}
+    _DEFINE_ATTR = {"name", "content", "description", "user_name"}
 
     def __init__(
         self,
         name: str,
-        content: str,
+        content: Optional[str] = None,
         description: Optional[str] = None,
+        user_name: Optional[str] = None,
     ):
         super().__init__(name, description)
         self.content = content
+        self.user_name = user_name
         self._resource_code = None
+
+    def get_info_from_database(self):
+        """Get resource info from java gateway, contains resource id, name."""
+        if not self.user_name:
+            raise PyDSParamException(
+                "`user_name` is required when querying resources from python gate."
+            )
+        return JavaGate().query_resources_file_info(self.user_name, self.name)
+
+    def get_id_from_database(self):
+        """Get resource id from java gateway."""
+        return self.get_info_from_database().getId()
+
+    def create_or_update_resource(self):
+        """Create or update resource via java gateway."""
+        if not self.content or not self.user_name:
+            raise PyDSParamException(
+                "`user_name` and `content` are required when create or update resource from python gate."
+            )
+        JavaGate().create_or_update_resource(
+            self.user_name,
+            self.name,
+            self.content,
+            self.description,
+        )
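
With the new `user_name` field and the two helper methods, `Resource` becomes a self-contained client for the resource center. A minimal sketch, assuming a reachable Java gateway and that "admin" names an existing DolphinScheduler user:

    from pydolphinscheduler.core.resource import Resource

    res = Resource(
        name="/dev/test.py",
        content='print("hello world")',
        description="hello world",
        user_name="admin",  # required for any gateway-backed call
    )
    # raises PyDSParamException when content or user_name is missing
    res.create_or_update_resource()
    # later look-ups only need name and user_name
    resource_id = res.get_id_from_database()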
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 4d4e67e..5cbd21d 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -16,7 +16,7 @@
 # under the License.
 
 """DolphinScheduler Task and TaskRelation object."""
-
+import copy
 from logging import getLogger
 from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
@@ -32,7 +32,9 @@ from pydolphinscheduler.core.process_definition import (
     ProcessDefinition,
     ProcessDefinitionContext,
 )
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import Base
 
 logger = getLogger(__name__)
@@ -88,6 +90,7 @@ class Task(Base):
         "flag",
         "task_priority",
         "worker_group",
+        "environment_code",
         "delay_time",
         "fail_retry_times",
         "fail_retry_interval",
@@ -96,6 +99,17 @@ class Task(Base):
         "timeout",
     }
 
+    # task default attributes that will go into the `task_params` property
+    _task_default_attr = {
+        "local_params",
+        "resource_list",
+        "dependence",
+        "wait_start_timeout",
+        "condition_result",
+    }
+    # task attributes removed from _task_default_attr that will not go into the `task_params` property
+    _task_ignore_attr: set = set()
+    # task custom attributes defined in subclasses and appended to the `task_params` property
     _task_custom_attr: set = set()
 
     DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
@@ -108,6 +122,7 @@ class Task(Base):
         flag: Optional[str] = TaskFlag.YES,
         task_priority: Optional[str] = TaskPriority.MEDIUM,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+        environment_name: Optional[str] = None,
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
@@ -127,6 +142,7 @@ class Task(Base):
         self.flag = flag
         self.task_priority = task_priority
         self.worker_group = worker_group
+        self._environment_name = environment_name
         self.fail_retry_times = fail_retry_times
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
@@ -143,6 +159,7 @@ class Task(Base):
         # move attribute code and version after _process_definition and process_definition declare
         self.code, self.version = self.gen_code_and_version()
         # Add task to process definition, maybe we could put into property process_definition later
+
         if (
             self.process_definition is not None
             and self.code not in self.process_definition.tasks
@@ -175,18 +192,28 @@ class Task(Base):
     def resource_list(self) -> List:
         """Get task define attribute `resource_list`."""
         resources = set()
-        for resource in self._resource_list:
-            if type(resource) == str:
-                resources.add(self.query_resource(resource).get(ResourceKey.ID))
-            elif type(resource) == dict and resource.get(ResourceKey.ID) is not None:
+        for res in self._resource_list:
+            if type(res) == str:
+                resources.add(
+                    Resource(name=res, user_name=self.user_name).get_id_from_database()
+                )
+            elif type(res) == dict and res.get(ResourceKey.ID) is not None:
                 logger.warning(
                     """`resource_list` should be defined using List[str] with resource paths,
                       the use of ids to define resources will be removed in version 3.2.0.
                     """
                 )
-                resources.add(resource.get(ResourceKey.ID))
+                resources.add(res.get(ResourceKey.ID))
         return [{ResourceKey.ID: r} for r in resources]
 
+    @property
+    def user_name(self) -> Optional[str]:
+        """Return user name of process definition."""
+        if self.process_definition:
+            return self.process_definition.user.name
+        else:
+            raise PyDSParamException("`user_name` cannot be empty.")
+
     @property
     def condition_result(self) -> Dict:
         """Get attribute condition_result."""
@@ -197,20 +224,24 @@ class Task(Base):
         """Set attribute condition_result."""
         self._condition_result = condition_result
 
+    def _get_attr(self) -> Set[str]:
+        """Get final task task_params attribute.
+
+        Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from
+        `_task_ignore_attr`.
+        """
+        attr = copy.deepcopy(self._task_default_attr)
+        attr -= self._task_ignore_attr
+        attr |= self._task_custom_attr
+        return attr
+
     @property
     def task_params(self) -> Optional[Dict]:
         """Get task parameter object.
 
         Will get the result by combining the attribute sets from _get_attr.
         """
-        custom_attr = {
-            "local_params",
-            "resource_list",
-            "dependence",
-            "wait_start_timeout",
-            "condition_result",
-        }
-        custom_attr |= self._task_custom_attr
+        custom_attr = self._get_attr()
         return self.get_define_custom(custom_attr=custom_attr)
 
     def __hash__(self):
@@ -288,17 +319,16 @@ class Task(Base):
         equal to 0 by java gateway, otherwise if will return the exists code and version.
         """
         # TODO get code from specific project process definition and task name
-        gateway = launch_gateway()
-        result = gateway.entry_point.getCodeAndVersion(
+        result = JavaGate().get_code_and_version(
             self.process_definition._project, self.process_definition.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
 
-    def query_resource(self, full_name):
-        """Get resource info from java gateway, contains resource id, name."""
-        gateway = launch_gateway()
-        return gateway.entry_point.queryResourcesFileInfo(
-            self.process_definition.user.name, full_name
-        )
+    @property
+    def environment_code(self) -> Optional[str]:
+        """Convert environment name to code."""
+        if self._environment_name is None:
+            return None
+        return JavaGate().query_environment_info(self._environment_name)
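
The three class-level sets give task subclasses a declarative way to shape `task_params`: start from `_task_default_attr`, drop `_task_ignore_attr`, add `_task_custom_attr`. A hypothetical subclass (not part of this commit) illustrating `_get_attr`:

    from pydolphinscheduler.core.task import Task

    class Demo(Task):
        # extra attribute serialized into task_params
        _task_custom_attr = {"command"}
        # default attribute excluded from task_params
        _task_ignore_attr = {"condition_result"}

    # For a Demo instance (instantiation needs a running gateway), _get_attr() returns
    # {"local_params", "resource_list", "dependence", "wait_start_timeout", "command"}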
diff --git a/src/pydolphinscheduler/core/yaml_process_define.py b/src/pydolphinscheduler/core/yaml_process_define.py
new file mode 100644
index 0000000..0944925
--- /dev/null
+++ b/src/pydolphinscheduler/core/yaml_process_define.py
@@ -0,0 +1,466 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Parse YAML file to create process."""
+
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any, Dict
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+logger = logging.getLogger(__file__)
+
+KEY_PROCESS = "workflow"
+KEY_TASK = "tasks"
+KEY_TASK_TYPE = "task_type"
+KEY_DEPS = "deps"
+KEY_OP = "op"
+
+TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS]
+
+
+class ParseTool:
+    """Enhanced parsing tools."""
+
+    @staticmethod
+    def parse_string_param_if_file(string_param: str, **kwargs):
+        """Use $FILE{"data_path"} to load file from "data_path"."""
+        if string_param.startswith("$FILE"):
+            path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
+            base_folder = kwargs.get("base_folder", ".")
+            path = ParseTool.get_possible_path(path, base_folder)
+            with open(path, "r") as read_file:
+                string_param = "".join(read_file)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_env(string_param: str, **kwargs):
+        """Use $ENV{env_name} to load environment variable "env_name"."""
+        if "$ENV" in string_param:
+            key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
+            env_value = os.environ.get(key, "$%s" % key)
+            string_param = string_param.replace("$ENV{%s}" % key, env_value)
+        return string_param
+
+    @staticmethod
+    def parse_string_param_if_config(string_param: str, **kwargs):
+        """Use ${CONFIG.var_name} to load variable "var_name" from configuration."""
+        if "${CONFIG" in string_param:
+            key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
+            if hasattr(configuration, key):
+                string_param = getattr(configuration, key)
+            else:
+                string_param = configuration.get_single_config(key)
+
+        return string_param
+
+    @staticmethod
+    def get_possible_path(file_path, base_folder):
+        """Get file possible path.
+
+        Return new path if file_path is not exists, but base_folder + file_path exists
+        """
+        possible_path = file_path
+        if not Path(file_path).exists():
+            new_path = Path(base_folder).joinpath(file_path)
+            if new_path.exists():
+                possible_path = new_path
+                logger.info(f"{file_path} not exists, convert to {possible_path}")
+
+        return possible_path
+
+
+def get_task_cls(task_type) -> Task:
+    """Get the task class object by task_type (case compatible)."""
+    # only get task class from tasks.__all__
+    all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
+    task_type_cap = task_type.capitalize()
+    if task_type_cap not in all_task_types:
+        raise PyDSTaskNoFoundException("cannot find task %s" % task_type)
+
+    standard_name = all_task_types[task_type_cap]
+    return getattr(tasks, standard_name)
+
+
+class YamlProcess(YamlParser):
+    """Yaml parser for create process.
+
+    :param yaml_file: yaml file path.
+
+        examples1 ::
+
+            parser = YamlParser(yaml_file=...)
+            parser.create_process_definition()
+
+        examples2 ::
+
+            YamlParser(yaml_file=...).create_process_definition()
+
+    """
+
+    _parse_rules = [
+        ParseTool.parse_string_param_if_file,
+        ParseTool.parse_string_param_if_env,
+        ParseTool.parse_string_param_if_config,
+    ]
+
+    def __init__(self, yaml_file: str):
+        with open(yaml_file, "r") as f:
+            content = f.read()
+
+        self._base_folder = Path(yaml_file).parent
+        content = self.prepare_refer_process(content)
+        super().__init__(content)
+
+    def create_process_definition(self):
+        """Create process main function."""
+        # get process parameters with key "workflow"
+        process_params = self[KEY_PROCESS]
+
+        # pop "run" parameter, used at the end
+        is_run = process_params.pop("run", False)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        process_params = self.parse_params(process_params)
+
+        process_name = process_params["name"]
+        logger.info(f"Create Process: {process_name}")
+        with ProcessDefinition(**process_params) as pd:
+
+            # save dependencies between tasks
+            dependencies = {}
+
+            # save name and task mapping
+            name2task = {}
+
+            # get task data with key "tasks"
+            for task_data in self[KEY_TASK]:
+                task = self.parse_task(task_data, name2task)
+
+                deps = task_data.get(KEY_DEPS, [])
+                if deps:
+                    dependencies[task.name] = deps
+                name2task[task.name] = task
+
+            # build dependencies between tasks
+            for downstream_task_name, deps in dependencies.items():
+                downstream_task = name2task[downstream_task_name]
+                for upstream_task_name in deps:
+                    upstream_task = name2task[upstream_task_name]
+                    upstream_task >> downstream_task
+
+            pd.submit()
+            # if set is_run, run the process after submit
+            if is_run:
+                logger.info(f"run workflow: {pd}")
+                pd.run()
+
+        return process_name
+
+    def parse_params(self, params: Any):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other types continue recursively.
+        """
+        if isinstance(params, str):
+            for parse_rule in self._parse_rules:
+                params_ = params
+                params = parse_rule(params, base_folder=self._base_folder)
+                if params_ != params:
+                    logger.info(f"parse {params_} -> {params}")
+
+        elif isinstance(params, list):
+            for index in range(len(params)):
+                params[index] = self.parse_params(params[index])
+
+        elif isinstance(params, dict):
+            for key, value in params.items():
+                params[key] = self.parse_params(value)
+
+        return params
+
+    @classmethod
+    def parse(cls, yaml_file: str):
+        """Recursively resolves the parameter values.
+
+        The function operates params only when it encounters a string; other types continue recursively.
+        """
+        process_name = cls(yaml_file).create_process_definition()
+        return process_name
+
+    def prepare_refer_process(self, content):
+        """Allow YAML files to reference process derived from other YAML files."""
+        process_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
+        for process_path in process_paths:
+            logger.info(
+                f"find special token {process_path}, load process form {process_path}"
+            )
+            possible_path = ParseTool.get_possible_path(process_path, self._base_folder)
+            process_name = YamlProcess.parse(possible_path)
+            content = content.replace('$WORKFLOW{"%s"}' % process_path, process_name)
+
+        return content
+
+    def parse_task(self, task_data: dict, name2task: Dict[str, Task]):
+        """Parse various types of tasks.
+
+        :param task_data: dict, for example:
+                {
+                    "task_type": "Shell",
+                    "name": "shell_task",
+                    "command": "echo hello"
+                }
+
+        :param name2task: Dict[str, Task], mapping of task name to task
+
+
+        Some task types have special parse functions:
+            if task type is Switch, use parse_switch;
+            if task type is Condition, use parse_condition;
+            if task type is Dependent, use parse_dependent;
+            otherwise, pass all task_params to the task class, like "task_cls(**task_params)".
+        """
+        task_type = task_data["task_type"]
+        # get params without special key
+        task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}
+
+        task_cls = get_task_cls(task_type)
+
+        # use YamlProcess._parse_rules to parse special value of yaml file
+        task_params = self.parse_params(task_params)
+
+        if task_cls == tasks.Switch:
+            task = self.parse_switch(task_params, name2task)
+
+        elif task_cls == tasks.Condition:
+            task = self.parse_condition(task_params, name2task)
+
+        elif task_cls == tasks.Dependent:
+            task = self.parse_dependent(task_params, name2task)
+
+        else:
+            task = task_cls(**task_params)
+        logger.info("%s %s", task_type, task)
+        return task
+
+    def parse_switch(self, task_params, name2task):
+        """Parse Switch Task.
+
+        This is an example YAML fragment of task_params
+
+        name: switch
+        condition:
+          - task: switch_child_1
+            condition: "${var} > 1"
+          - task: switch_child_2
+        """
+        from pydolphinscheduler.tasks.switch import (
+            Branch,
+            Default,
+            Switch,
+            SwitchCondition,
+        )
+
+        condition_datas = task_params["condition"]
+        conditions = []
+        for condition_data in condition_datas:
+            assert "task" in condition_data, "task must be in %s" % condition_data
+            task_name = condition_data["task"]
+            condition_string = condition_data.get("condition", None)
+
+            # if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch
+            if condition_string is None:
+                conditions.append(Default(task=name2task.get(task_name)))
+
+            # if condition_string is not None, for example:
+            # {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch
+            else:
+                conditions.append(
+                    Branch(condition_string, task=name2task.get(task_name))
+                )
+
+        switch = Switch(
+            name=task_params["name"], condition=SwitchCondition(*conditions)
+        )
+        return switch
+
+    def parse_condition(self, task_params, name2task):
+        """Parse Condition Task.
+
+        This is an example YAML fragment of task_params
+
+        name: condition
+        success_task: success_branch
+        failed_task: fail_branch
+        op: AND
+        groups:
+          - op: AND
+            groups:
+              - { task: pre_task_1, flag: true }
+              - { task: pre_task_2, flag: true }
+              - { task: pre_task_3, flag: false }
+          - op: AND
+            groups:
+              - { task: pre_task_1, flag: false }
+              - { task: pre_task_2, flag: true }
+              - { task: pre_task_3, flag: true }
+
+        """
+        from pydolphinscheduler.tasks.condition import (
+            FAILURE,
+            SUCCESS,
+            And,
+            Condition,
+            Or,
+        )
+
+        def get_op_cls(op):
+            cls = None
+            if op.lower() == "and":
+                cls = And
+            elif op.lower() == "or":
+                cls = Or
+            else:
+                raise Exception("OP must be in And or Or, but get: %s" % op)
+            return cls
+
+        second_cond_ops = []
+        for first_group in task_params["groups"]:
+            second_op = first_group["op"]
+            task_ops = []
+            for condition_data in first_group["groups"]:
+                assert "task" in condition_data, "task must be in %s" % condition_data
+                assert "flag" in condition_data, "flag must be in %s" % condition_data
+                task_name = condition_data["task"]
+                flag = condition_data["flag"]
+                task = name2task[task_name]
+
+                # for example: task = pre_task_1, flag = true
+                if flag:
+                    task_ops.append(SUCCESS(task))
+                else:
+                    task_ops.append(FAILURE(task))
+
+            second_cond_ops.append(get_op_cls(second_op)(*task_ops))
+
+        first_op = task_params["op"]
+        cond_operator = get_op_cls(first_op)(*second_cond_ops)
+
+        condition = Condition(
+            name=task_params["name"],
+            condition=cond_operator,
+            success_task=name2task[task_params["success_task"]],
+            failed_task=name2task[task_params["failed_task"]],
+        )
+        return condition
+
+    def parse_dependent(self, task_params, name2task):
+        """Parse Dependent Task.
+
+        This is an example YAML fragment of task_params
+
+        name: dependent
+        op: and
+        groups:
+          - op: or
+            groups:
+              - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_1 }
+              - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_2 }
+          - op: and
+            groups:
+              - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_1, dependent_date: LAST_WEDNESDAY }
+              - { project_name: pydolphin, process_definition_name: task_dependent_external, dependent_task_name: task_2, dependent_date: last24Hours }
+
+        """
+        from pydolphinscheduler.tasks.dependent import (
+            And,
+            Dependent,
+            DependentDate,
+            DependentItem,
+            Or,
+        )
+
+        def process_dependent_date(dependent_date):
+            """Parse dependent date (Compatible with key and value of DependentDate)."""
+            dependent_date_upper = dependent_date.upper()
+            if hasattr(DependentDate, dependent_date_upper):
+                dependent_date = getattr(DependentDate, dependent_date_upper)
+            return dependent_date
+
+        def get_op_cls(op):
+            cls = None
+            if op.lower() == "and":
+                cls = And
+            elif op.lower() == "or":
+                cls = Or
+            else:
+                raise Exception("OP must be in And or Or, but get: %s" % op)
+            return cls
+
+        def create_dependent_item(source_items):
+            """Parse dependent item.
+
+            project_name: pydolphin
+            process_definition_name: task_dependent_external
+            dependent_task_name: task_1
+            dependent_date: LAST_WEDNESDAY
+            """
+            project_name = source_items["project_name"]
+            process_definition_name = source_items["process_definition_name"]
+            dependent_task_name = source_items["dependent_task_name"]
+            dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
+            dependent_item = DependentItem(
+                project_name=project_name,
+                process_definition_name=process_definition_name,
+                dependent_task_name=dependent_task_name,
+                dependent_date=process_dependent_date(dependent_date),
+            )
+
+            return dependent_item
+
+        second_dependences = []
+        for first_group in task_params["groups"]:
+            second_op = first_group[KEY_OP]
+            dependence_items = []
+            for source_items in first_group["groups"]:
+                dependence_items.append(create_dependent_item(source_items))
+
+            second_dependences.append(get_op_cls(second_op)(*dependence_items))
+
+        first_op = task_params[KEY_OP]
+        dependence = get_op_cls(first_op)(*second_dependences)
+
+        task = Dependent(
+            name=task_params["name"],
+            dependence=dependence,
+        )
+        return task
+
+
+def create_process_definition(yaml_file):
+    """CLI."""
+    YamlProcess.parse(yaml_file)
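
The same entry point can also be driven from Python without the CLI. A minimal sketch, assuming a hypothetical my_workflow.yaml next to the script; inside it, the special tokens resolve according to the ParseTool rules above:

    from pydolphinscheduler.core.yaml_process_define import create_process_definition

    # Tokens the parser would expand while reading my_workflow.yaml:
    #   $FILE{"example_sql.sql"}          -> file content, path resolved against the YAML's folder
    #   $ENV{DS_TENANT}                   -> value of the (hypothetical) environment variable
    #   ${CONFIG.WORKFLOW_RELEASE_STATE}  -> value from pydolphinscheduler configuration
    create_process_definition("my_workflow.yaml")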
diff --git a/src/pydolphinscheduler/examples/task_dvc_example.py b/src/pydolphinscheduler/examples/task_dvc_example.py
new file mode 100644
index 0000000..2b93cd1
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_dvc_example.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""A example workflow for task dvc."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks import DVCDownload, DVCInit, DVCUpload
+
+repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+with ProcessDefinition(
+    name="task_dvc_example",
+    tenant="tenant_exists",
+) as pd:
+    init_task = DVCInit(name="init_dvc", repository=repository, store_url="~/dvc_data")
+    upload_task = DVCUpload(
+        name="upload_data",
+        repository=repository,
+        data_path_in_dvc_repository="iris",
+        data_path_in_worker="~/source/iris",
+        version="v1",
+        message="upload iris data v1",
+    )
+
+    download_task = DVCDownload(
+        name="download_data",
+        repository=repository,
+        data_path_in_dvc_repository="iris",
+        data_path_in_worker="~/target/iris",
+        version="v1",
+    )
+
+    init_task >> upload_task >> download_task
+
+    pd.run()
+
+# [end workflow_declare]
diff --git a/src/pydolphinscheduler/examples/task_mlflow_example.py b/src/pydolphinscheduler/examples/task_mlflow_example.py
new file mode 100644
index 0000000..c2734bc
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_mlflow_example.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""A example workflow for task mlflow."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.mlflow import (
+    MLflowDeployType,
+    MLflowModels,
+    MLFlowProjectsAutoML,
+    MLFlowProjectsBasicAlgorithm,
+    MLFlowProjectsCustom,
+)
+
+mlflow_tracking_uri = "http://127.0.0.1:5000"
+
+with ProcessDefinition(
+    name="task_mlflow_example",
+    tenant="tenant_exists",
+) as pd:
+
+    # run custom mlflow project to train model
+    train_custom = MLFlowProjectsCustom(
+        name="train_xgboost_native",
+        repository="https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native",
+        mlflow_tracking_uri=mlflow_tracking_uri,
+        parameters="-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9",
+        experiment_name="xgboost",
+    )
+
+    # run automl to train model
+    train_automl = MLFlowProjectsAutoML(
+        name="train_automl",
+        mlflow_tracking_uri=mlflow_tracking_uri,
+        parameters="time_budget=30;estimator_list=['lgbm']",
+        experiment_name="automl_iris",
+        model_name="iris_A",
+        automl_tool="flaml",
+        data_path="/data/examples/iris",
+    )
+
+    # Using DOCKER to deploy model from train_automl
+    deploy_docker = MLflowModels(
+        name="deploy_docker",
+        model_uri="models:/iris_A/Production",
+        mlflow_tracking_uri=mlflow_tracking_uri,
+        deploy_mode=MLflowDeployType.DOCKER,
+        port=7002,
+    )
+
+    train_automl >> deploy_docker
+
+    # run lightgbm to train model
+    train_basic_algorithm = MLFlowProjectsBasicAlgorithm(
+        name="train_basic_algorithm",
+        mlflow_tracking_uri=mlflow_tracking_uri,
+        parameters="n_estimators=200;learning_rate=0.2",
+        experiment_name="basic_algorithm_iris",
+        model_name="iris_B",
+        algorithm="lightgbm",
+        data_path="/data/examples/iris",
+        search_params="max_depth=[5, 10];n_estimators=[100, 200]",
+    )
+
+    # Using MLFLOW to deploy model from training lightgbm project
+    deploy_mlflow = MLflowModels(
+        name="deploy_mlflow",
+        model_uri="models:/iris_B/Production",
+        mlflow_tracking_uri=mlflow_tracking_uri,
+        deploy_mode=MLflowDeployType.MLFLOW,
+        port=7001,
+    )
+
+    train_basic_algorithm >> deploy_mlflow
+
+    pd.submit()
+
+# [end workflow_declare]
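Note: this example ends with pd.submit() while the DVC example above uses pd.run(); a
rough distinction, assuming the ProcessDefinition semantics of this tag (run is submit
plus an immediate start):

    pd.submit()  # create or update the workflow definition on the server
    pd.run()     # submit, then also trigger one workflow instance right away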
diff --git a/tests/core/test_resource_definition.py b/src/pydolphinscheduler/examples/task_openmldb_example.py
similarity index 54%
copy from tests/core/test_resource_definition.py
copy to src/pydolphinscheduler/examples/task_openmldb_example.py
index ebfb893..5b90091 100644
--- a/tests/core/test_resource_definition.py
+++ b/src/pydolphinscheduler/examples/task_openmldb_example.py
@@ -15,24 +15,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test resource definition."""
+# [start workflow_declare]
+"""A example workflow for task openmldb."""
 
-from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
 
+sql = """USE demo_db;
+set @@job_timeout=200000;
+LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+INTO TABLE talkingdata OPTIONS(mode='overwrite');
+"""
 
-def test_resource():
-    """Test resource set attributes which get with same type."""
-    name = "/dev/test.py"
-    content = """print("hello world")"""
-    description = "hello world"
-    expect = {
-        "name": name,
-        "content": content,
-        "description": description,
-    }
-    resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
+with ProcessDefinition(
+    name="task_openmldb_example",
+    tenant="tenant_exists",
+) as pd:
+    task_openmldb = OpenMLDB(
+        name="task_openmldb",
+        zookeeper="127.0.0.1:2181",
+        zookeeper_path="/openmldb",
+        execute_mode="offline",
+        sql=sql,
     )
-    assert resourceDefinition.get_define() == expect
+
+    pd.run()
+# [end workflow_declare]
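Note: as the task docstring explains, the execution mode can also be switched inside
the SQL itself via OpenMLDB's session variable; a hedged variant of the task above
(the online query is illustrative only):

    online_sql = """SET @@execute_mode='online';
    USE demo_db;
    SELECT * FROM talkingdata LIMIT 10;
    """
    task_openmldb_online = OpenMLDB(
        name="task_openmldb_online",
        zookeeper="127.0.0.1:2181",
        zookeeper_path="/openmldb",
        execute_mode="online",
        sql=online_sql,
    )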
diff --git a/src/pydolphinscheduler/examples/task_pytorch_example.py b/src/pydolphinscheduler/examples/task_pytorch_example.py
new file mode 100644
index 0000000..6559c9a
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_pytorch_example.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""A example workflow for task pytorch."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.pytorch import Pytorch
+
+with ProcessDefinition(
+    name="task_pytorch_example",
+    tenant="tenant_exists",
+) as pd:
+
+    # run project with existing environment
+    task_existing_env = Pytorch(
+        name="task_existing_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        python_command="/home/anaconda3/envs/pytorch/bin/python3",
+    )
+
+    # run project with creating conda environment
+    task_conda_env = Pytorch(
+        name="task_conda_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        is_create_environment=True,
+        python_env_tool="conda",
+        requirements="requirements.txt",
+        conda_python_version="3.7",
+    )
+
+    # run project with creating virtualenv environment
+    task_virtualenv_env = Pytorch(
+        name="task_virtualenv_env",
+        script="main.py",
+        script_params="--dry-run --no-cuda",
+        project_path="https://github.com/pytorch/examples#mnist",
+        is_create_environment=True,
+        python_env_tool="virtualenv",
+        requirements="requirements.txt",
+    )
+
+    pd.submit()
+# [end workflow_declare]
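Note: project_path need not be a git address; a minimal sketch running a script from a
directory already present on the worker (the path below is hypothetical):

    task_local_project = Pytorch(
        name="task_local_project",
        script="main.py",
        script_params="--dry-run --no-cuda",
        project_path="/home/dolphinscheduler/projects/mnist",  # hypothetical worker path
    )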
diff --git a/src/pydolphinscheduler/examples/task_sagemaker_example.py b/src/pydolphinscheduler/examples/task_sagemaker_example.py
new file mode 100644
index 0000000..b056f61
--- /dev/null
+++ b/src/pydolphinscheduler/examples/task_sagemaker_example.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# [start workflow_declare]
+"""A example workflow for task sagemaker."""
+import json
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_data = {
+    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+    "PipelineExecutionDescription": "test Pipeline",
+    "PipelineExecutionDisplayName": "AbalonePipeline",
+    "PipelineName": "AbalonePipeline",
+    "PipelineParameters": [
+        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+        {"Name": "ProcessingInstanceCount", "Value": "2"},
+    ],
+}
+
+with ProcessDefinition(
+    name="task_sagemaker_example",
+    tenant="tenant_exists",
+) as pd:
+    task_sagemaker = SageMaker(
+        name="task_sagemaker",
+        sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
+    )
+
+    pd.run()
+# [end workflow_declare]
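Note: since sagemaker_request_json is just the serialized StartPipelineExecution
payload, it can equally be loaded from disk; a small sketch (the file name is
hypothetical):

    with open("start_pipeline_execution.json") as f:
        request = json.load(f)

    task_from_file = SageMaker(
        name="task_sagemaker_from_file",
        sagemaker_request_json=json.dumps(request),
    )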
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 7b85902..0ff74ba 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -63,3 +63,161 @@ def gateway_result_checker(
     ):
         raise PyDSJavaGatewayException("Get result state not success.")
     return result
+
+
+class JavaGate:
+    """Launch java gateway to pydolphin scheduler."""
+
+    def __init__(
+        self,
+        address: Optional[str] = None,
+        port: Optional[int] = None,
+        auto_convert: Optional[bool] = True,
+    ):
+        self.java_gateway = launch_gateway(address, port, auto_convert)
+
+    def get_datasource_info(self, name: str):
+        """Get datasource info through java gateway."""
+        return self.java_gateway.entry_point.getDatasourceInfo(name)
+
+    def get_resources_file_info(self, program_type: str, main_package: str):
+        """Get resources file info through java gateway."""
+        return self.java_gateway.entry_point.getResourcesFileInfo(
+            program_type, main_package
+        )
+
+    def create_or_update_resource(
+        self, user_name: str, name: str, content: str, description: Optional[str] = None
+    ):
+        """Create or update resource through java gateway."""
+        return self.java_gateway.entry_point.createOrUpdateResource(
+            user_name, name, description, content
+        )
+
+    def query_resources_file_info(self, user_name: str, name: str):
+        """Get resources file info through java gateway."""
+        return self.java_gateway.entry_point.queryResourcesFileInfo(user_name, name)
+
+    def query_environment_info(self, name: str):
+        """Get environment info through java gateway."""
+        return self.java_gateway.entry_point.getEnvironmentInfo(name)
+
+    def get_code_and_version(
+        self, project_name: str, process_definition_name: str, task_name: str
+    ):
+        """Get code and version through java gateway."""
+        return self.java_gateway.entry_point.getCodeAndVersion(
+            project_name, process_definition_name, task_name
+        )
+
+    def create_or_grant_project(
+        self, user: str, name: str, description: Optional[str] = None
+    ):
+        """Create or grant project through java gateway."""
+        return self.java_gateway.entry_point.createOrGrantProject(
+            user, name, description
+        )
+
+    def create_tenant(
+        self, tenant_name: str, queue_name: str, description: Optional[str] = None
+    ):
+        """Create tenant through java gateway."""
+        return self.java_gateway.entry_point.createTenant(
+            tenant_name, description, queue_name
+        )
+
+    def create_user(
+        self,
+        name: str,
+        password: str,
+        email: str,
+        phone: str,
+        tenant: str,
+        queue: str,
+        status: int,
+    ):
+        """Create user through java gateway."""
+        return self.java_gateway.entry_point.createUser(
+            name, password, email, phone, tenant, queue, status
+        )
+
+    def get_dependent_info(
+        self,
+        project_name: str,
+        process_definition_name: str,
+        task_name: Optional[str] = None,
+    ):
+        """Get dependent info through java gateway."""
+        return self.java_gateway.entry_point.getDependentInfo(
+            project_name, process_definition_name, task_name
+        )
+
+    def get_process_definition_info(
+        self, user_name: str, project_name: str, process_definition_name: str
+    ):
+        """Get process definition info through java gateway."""
+        return self.java_gateway.entry_point.getProcessDefinitionInfo(
+            user_name, project_name, process_definition_name
+        )
+
+    def create_or_update_process_definition(
+        self,
+        user_name: str,
+        project_name: str,
+        name: str,
+        description: str,
+        global_params: str,
+        warning_type: str,
+        warning_group_id: int,
+        timeout: int,
+        worker_group: str,
+        tenant_code: str,
+        release_state: int,
+        task_relation_json: str,
+        task_definition_json: str,
+        schedule: Optional[str] = None,
+        other_params_json: Optional[str] = None,
+        execution_type: Optional[str] = None,
+    ):
+        """Create or update process definition through java gateway."""
+        return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
+            user_name,
+            project_name,
+            name,
+            description,
+            global_params,
+            schedule,
+            warning_type,
+            warning_group_id,
+            timeout,
+            worker_group,
+            tenant_code,
+            release_state,
+            task_relation_json,
+            task_definition_json,
+            other_params_json,
+            execution_type,
+        )
+
+    def exec_process_instance(
+        self,
+        user_name: str,
+        project_name: str,
+        process_definition_name: str,
+        cron_time: str,
+        worker_group: str,
+        warning_type: str,
+        warning_group_id: int,
+        timeout: int,
+    ):
+        """Exec process instance through java gateway."""
+        return self.java_gateway.entry_point.execProcessInstance(
+            user_name,
+            project_name,
+            process_definition_name,
+            cron_time,
+            worker_group,
+            warning_type,
+            warning_group_id,
+            timeout,
+        )
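Note: the new JavaGate facade wraps the raw py4j entry point so that callers no longer
touch gateway.entry_point directly; a minimal usage sketch, assuming a standalone
server on the default gateway address and port (the tenant, queue, and environment
names are illustrative):

    from pydolphinscheduler.java_gateway import JavaGate

    gate = JavaGate()
    gate.create_tenant("tenant_exists", "queue_default", "demo tenant")
    print(gate.query_environment_info("dev_env"))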
diff --git a/src/pydolphinscheduler/models/project.py b/src/pydolphinscheduler/models/project.py
index ad72211..bebdafd 100644
--- a/src/pydolphinscheduler/models/project.py
+++ b/src/pydolphinscheduler/models/project.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -36,7 +36,6 @@ class Project(BaseSide):
 
     def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createOrGrantProject(user, self.name, self.description)
+        JavaGate().create_or_grant_project(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)
diff --git a/src/pydolphinscheduler/models/queue.py b/src/pydolphinscheduler/models/queue.py
index 3f8f81d..e6da259 100644
--- a/src/pydolphinscheduler/models/queue.py
+++ b/src/pydolphinscheduler/models/queue.py
@@ -20,7 +20,6 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
 from pydolphinscheduler.models import BaseSide
 
 
@@ -33,10 +32,3 @@ class Queue(BaseSide):
         description: Optional[str] = "",
     ):
         super().__init__(name, description)
-
-    def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
-        """Create Queue if not exists."""
-        gateway = launch_gateway()
-        # Here we set Queue.name and Queue.queueName same as self.name
-        result = gateway.entry_point.createProject(user, self.name, self.name)
-        gateway_result_checker(result, None)
diff --git a/src/pydolphinscheduler/models/tenant.py b/src/pydolphinscheduler/models/tenant.py
index 148a8f6..6641d9a 100644
--- a/src/pydolphinscheduler/models/tenant.py
+++ b/src/pydolphinscheduler/models/tenant.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide
 
 
@@ -40,6 +40,5 @@ class Tenant(BaseSide):
         self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
-        gateway = launch_gateway()
-        gateway.entry_point.createTenant(self.name, self.description, queue_name)
+        JavaGate().create_tenant(self.name, queue_name, self.description)
         # gateway_result_checker(result, None)
diff --git a/src/pydolphinscheduler/models/user.py b/src/pydolphinscheduler/models/user.py
index de2d8b1..e11bb9c 100644
--- a/src/pydolphinscheduler/models/user.py
+++ b/src/pydolphinscheduler/models/user.py
@@ -20,7 +20,7 @@
 from typing import Optional
 
 from pydolphinscheduler import configuration
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models import BaseSide, Tenant
 
 
@@ -64,8 +64,7 @@ class User(BaseSide):
         """Create User if not exists."""
         # Should make sure queue already exists.
         self.create_tenant_if_not_exists()
-        gateway = launch_gateway()
-        gateway.entry_point.createUser(
+        JavaGate().create_user(
             self.name,
             self.password,
             self.email,
diff --git a/src/pydolphinscheduler/tasks/__init__.py b/src/pydolphinscheduler/tasks/__init__.py
index dd46c91..972b1b7 100644
--- a/src/pydolphinscheduler/tasks/__init__.py
+++ b/src/pydolphinscheduler/tasks/__init__.py
@@ -20,11 +20,21 @@
 from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition, Or
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 from pydolphinscheduler.tasks.dependent import Dependent
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload
 from pydolphinscheduler.tasks.flink import Flink
 from pydolphinscheduler.tasks.http import Http
 from pydolphinscheduler.tasks.map_reduce import MR
+from pydolphinscheduler.tasks.mlflow import (
+    MLflowModels,
+    MLFlowProjectsAutoML,
+    MLFlowProjectsBasicAlgorithm,
+    MLFlowProjectsCustom,
+)
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
 from pydolphinscheduler.tasks.procedure import Procedure
 from pydolphinscheduler.tasks.python import Python
+from pydolphinscheduler.tasks.pytorch import Pytorch
+from pydolphinscheduler.tasks.sagemaker import SageMaker
 from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.spark import Spark
 from pydolphinscheduler.tasks.sql import Sql
@@ -34,15 +44,26 @@ from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondi
 __all__ = [
     "Condition",
     "DataX",
+    "CustomDataX",
     "Dependent",
+    "DVCInit",
+    "DVCUpload",
+    "DVCDownload",
     "Flink",
     "Http",
     "MR",
+    "OpenMLDB",
+    "MLFlowProjectsBasicAlgorithm",
+    "MLFlowProjectsCustom",
+    "MLFlowProjectsAutoML",
+    "MLflowModels",
     "Procedure",
     "Python",
+    "Pytorch",
     "Shell",
     "Spark",
     "Sql",
     "SubProcess",
     "Switch",
+    "SageMaker",
 ]
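Note: with the widened __all__, every new task type is importable from the package
root rather than from its submodule, for example:

    from pydolphinscheduler.tasks import (
        DVCInit,
        MLFlowProjectsAutoML,
        OpenMLDB,
        Pytorch,
        SageMaker,
    )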
diff --git a/src/pydolphinscheduler/tasks/dependent.py b/src/pydolphinscheduler/tasks/dependent.py
index cc6d25b..12ec6ba 100644
--- a/src/pydolphinscheduler/tasks/dependent.py
+++ b/src/pydolphinscheduler/tasks/dependent.py
@@ -22,7 +22,7 @@ from typing import Dict, Optional, Tuple
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 from pydolphinscheduler.models.base import Base
 
 DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
@@ -165,9 +165,8 @@ class DependentItem(Base):
         if self._code:
             return self._code
         else:
-            gateway = launch_gateway()
             try:
-                self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
+                self._code = JavaGate().get_dependent_info(*self.code_parameter)
                 return self._code
             except Exception:
                 raise PyDSJavaGatewayException("Function get_code_from_gateway error.")
diff --git a/src/pydolphinscheduler/tasks/dvc.py b/src/pydolphinscheduler/tasks/dvc.py
new file mode 100644
index 0000000..c5b5cd5
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/dvc.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task dvc."""
+from copy import deepcopy
+from typing import Dict
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class DvcTaskType(str):
+    """Constants for dvc task type."""
+
+    INIT = "Init DVC"
+    DOWNLOAD = "Download"
+    UPLOAD = "Upload"
+
+
+class BaseDVC(Task):
+    """Base class for dvc task."""
+
+    dvc_task_type = None
+
+    _task_custom_attr = {
+        "dvc_task_type",
+        "dvc_repository",
+    }
+
+    _child_task_dvc_attr = set()
+
+    def __init__(self, name: str, repository: str, *args, **kwargs):
+        super().__init__(name, TaskType.DVC, *args, **kwargs)
+        self.dvc_repository = repository
+
+    @property
+    def task_params(self) -> Dict:
+        """Return task params."""
+        self._task_custom_attr = deepcopy(self._task_custom_attr)
+        self._task_custom_attr.update(self._child_task_dvc_attr)
+        return super().task_params
+
+
+class DVCInit(BaseDVC):
+    """Task DVC Init object, declare behavior for DVC Init task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.INIT
+
+    _child_task_dvc_attr = {"dvc_store_url"}
+
+    def __init__(self, name: str, repository: str, store_url: str, *args, **kwargs):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_store_url = store_url
+
+
+class DVCDownload(BaseDVC):
+    """Task DVC Download object, declare behavior for DVC Download task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.DOWNLOAD
+
+    _child_task_dvc_attr = {
+        "dvc_load_save_data_path",
+        "dvc_data_location",
+        "dvc_version",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        repository: str,
+        data_path_in_dvc_repository: str,
+        data_path_in_worker: str,
+        version: str,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_data_location = data_path_in_dvc_repository
+        self.dvc_load_save_data_path = data_path_in_worker
+        self.dvc_version = version
+
+
+class DVCUpload(BaseDVC):
+    """Task DVC Upload object, declare behavior for DVC Upload task to dolphinscheduler."""
+
+    dvc_task_type = DvcTaskType.UPLOAD
+
+    _child_task_dvc_attr = {
+        "dvc_load_save_data_path",
+        "dvc_data_location",
+        "dvc_version",
+        "dvc_message",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        repository: str,
+        data_path_in_worker: str,
+        data_path_in_dvc_repository: str,
+        version: str,
+        message: str,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, repository, *args, **kwargs)
+        self.dvc_data_location = data_path_in_dvc_repository
+        self.dvc_load_save_data_path = data_path_in_worker
+        self.dvc_version = version
+        self.dvc_message = message
diff --git a/src/pydolphinscheduler/tasks/mlflow.py b/src/pydolphinscheduler/tasks/mlflow.py
new file mode 100644
index 0000000..e86797a
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/mlflow.py
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task mlflow."""
+from copy import deepcopy
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class MLflowTaskType(str):
+    """MLflow task type."""
+
+    MLFLOW_PROJECTS = "MLflow Projects"
+    MLFLOW_MODELS = "MLflow Models"
+
+
+class MLflowJobType(str):
+    """MLflow job type."""
+
+    AUTOML = "AutoML"
+    BASIC_ALGORITHM = "BasicAlgorithm"
+    CUSTOM_PROJECT = "CustomProject"
+
+
+class MLflowDeployType(str):
+    """MLflow deploy type."""
+
+    MLFLOW = "MLFLOW"
+    DOCKER = "DOCKER"
+
+
+DEFAULT_MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
+DEFAULT_VERSION = "master"
+
+
+class BaseMLflow(Task):
+    """Base MLflow task."""
+
+    mlflow_task_type = None
+
+    _task_custom_attr = {
+        "mlflow_tracking_uri",
+        "mlflow_task_type",
+    }
+
+    _child_task_mlflow_attr = set()
+
+    def __init__(self, name: str, mlflow_tracking_uri: str, *args, **kwargs):
+        super().__init__(name, TaskType.MLFLOW, *args, **kwargs)
+        self.mlflow_tracking_uri = mlflow_tracking_uri
+
+    @property
+    def task_params(self) -> Dict:
+        """Return task params."""
+        self._task_custom_attr = deepcopy(self._task_custom_attr)
+        self._task_custom_attr.update(self._child_task_mlflow_attr)
+        return super().task_params
+
+
+class MLflowModels(BaseMLflow):
+    """Task MLflow models object, declare behavior for MLflow models task to dolphinscheduler.
+
+    Deploy machine learning models in diverse serving environments.
+
+    :param name: task name
+    :param model_uri: Model URI in MLflow; supports the models:/<model_name>/suffix format and the runs:/ format.
+        See https://mlflow.org/docs/latest/tracking.html#artifact-stores
+    :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000
+    :param deploy_mode: MLflow deploy mode, support MLFLOW, DOCKER, default is DOCKER
+    :param port: deploy port, default is 7000
+    """
+
+    mlflow_task_type = MLflowTaskType.MLFLOW_MODELS
+
+    _child_task_mlflow_attr = {
+        "deploy_type",
+        "deploy_model_key",
+        "deploy_port",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        model_uri: str,
+        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
+        deploy_mode: Optional[str] = MLflowDeployType.DOCKER,
+        port: Optional[int] = 7000,
+        *args,
+        **kwargs
+    ):
+        """Init mlflow models task."""
+        super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
+        self.deploy_type = deploy_mode.upper()
+        self.deploy_model_key = model_uri
+        self.deploy_port = port
+
+
+class MLFlowProjectsCustom(BaseMLflow):
+    """Task MLflow projects object, declare behavior for MLflow Custom projects task to dolphinscheduler.
+
+    :param name: task name
+    :param repository: Repository URL of the MLflow Project; supports a git address or a directory on the worker.
+        If the project is in a subdirectory, append # and the path (same as mlflow run),
+        for example https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native.
+    :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000
+    :param experiment_name: MLflow experiment name, default is empty
+    :param parameters: MLflow project parameters, default is empty
+    :param version: MLflow project version, default is master
+
+    """
+
+    mlflow_task_type = MLflowTaskType.MLFLOW_PROJECTS
+    mlflow_job_type = MLflowJobType.CUSTOM_PROJECT
+
+    _child_task_mlflow_attr = {
+        "mlflow_job_type",
+        "experiment_name",
+        "params",
+        "mlflow_project_repository",
+        "mlflow_project_version",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        repository: str,
+        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: Optional[str] = "",
+        parameters: Optional[str] = "",
+        version: Optional[str] = "master",
+        *args,
+        **kwargs
+    ):
+        """Init mlflow projects task."""
+        super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
+        self.mlflow_project_repository = repository
+        self.experiment_name = experiment_name
+        self.params = parameters
+        self.mlflow_project_version = version
+
+
+class MLFlowProjectsAutoML(BaseMLflow):
+    """Task MLflow projects object, declare behavior for AutoML task to dolphinscheduler.
+
+    :param name: task name
+    :param data_path: Data path of the MLflow Project; supports a git address or a directory on the worker.
+    :param automl_tool: The AutoML tool used, currently supports autosklearn and flaml.
+    :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000
+    :param experiment_name: MLflow experiment name, default is empty
+    :param model_name: MLflow model name, default is empty
+    :param parameters: MLflow project parameters, default is empty
+
+    """
+
+    mlflow_task_type = MLflowTaskType.MLFLOW_PROJECTS
+    mlflow_job_type = MLflowJobType.AUTOML
+
+    _child_task_mlflow_attr = {
+        "mlflow_job_type",
+        "experiment_name",
+        "model_name",
+        "register_model",
+        "data_path",
+        "params",
+        "automl_tool",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        data_path: str,
+        automl_tool: Optional[str] = "flaml",
+        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: Optional[str] = "",
+        model_name: Optional[str] = "",
+        parameters: Optional[str] = "",
+        *args,
+        **kwargs
+    ):
+        """Init mlflow projects task."""
+        super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
+        self.data_path = data_path
+        self.experiment_name = experiment_name
+        self.model_name = model_name
+        self.params = parameters
+        self.automl_tool = automl_tool.lower()
+        self.register_model = bool(model_name)
+
+
+class MLFlowProjectsBasicAlgorithm(BaseMLflow):
+    """Task MLflow projects object, declare behavior for BasicAlgorithm task to dolphinscheduler.
+
+    :param name: task name
+    :param data_path: Data path of the MLflow Project; supports a git address or a directory on the worker.
+    :param algorithm: The selected algorithm; currently supports LR, SVM, LightGBM and XGBoost
+            in their scikit-learn form.
+    :param mlflow_tracking_uri: MLflow tracking server uri, default is http://127.0.0.1:5000
+    :param experiment_name: MLflow experiment name, default is empty
+    :param model_name: MLflow model name, default is empty
+    :param parameters: MLflow project parameters, default is empty
+    :param search_params: Parameter search space for tuning; default is empty (no search).
+
+    """
+
+    mlflow_job_type = MLflowJobType.BASIC_ALGORITHM
+    mlflow_task_type = MLflowTaskType.MLFLOW_PROJECTS
+
+    _child_task_mlflow_attr = {
+        "mlflow_job_type",
+        "experiment_name",
+        "model_name",
+        "register_model",
+        "data_path",
+        "params",
+        "algorithm",
+        "search_params",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        data_path: str,
+        algorithm: Optional[str] = "lightgbm",
+        mlflow_tracking_uri: Optional[str] = DEFAULT_MLFLOW_TRACKING_URI,
+        experiment_name: Optional[str] = "",
+        model_name: Optional[str] = "",
+        parameters: Optional[str] = "",
+        search_params: Optional[str] = "",
+        *args,
+        **kwargs
+    ):
+        """Init mlflow projects task."""
+        super().__init__(name, mlflow_tracking_uri, *args, **kwargs)
+        self.data_path = data_path
+        self.experiment_name = experiment_name
+        self.model_name = model_name
+        self.params = parameters
+        self.algorithm = algorithm.lower()
+        self.search_params = search_params
+        self.register_model = bool(model_name)
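Note: two parameter conventions are used by these task types, both visible in the
example above: custom projects take an mlflow-run style string, while the AutoML and
BasicAlgorithm presets take semicolon-separated pairs:

    custom_params = "-P learning_rate=0.2 -P colsample_bytree=0.8"  # mlflow run -P style
    preset_params = "n_estimators=200;learning_rate=0.2"            # key=value;key=value style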
diff --git a/src/pydolphinscheduler/tasks/openmldb.py b/src/pydolphinscheduler/tasks/openmldb.py
new file mode 100644
index 0000000..5dad36e
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/openmldb.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task OpenMLDB."""
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class OpenMLDB(Task):
+    """Task OpenMLDB object, declare behavior for OpenMLDB task to dolphinscheduler.
+
+    :param name: task name
+    :param zookeeper: OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181.
+    :param zookeeper_path: OpenMLDB cluster zookeeper path, e.g. /openmldb.
+    :param execute_mode: Determine the initial execution mode, offline or online. You can also switch it inside the SQL statement itself.
+    :param sql: SQL statement.
+    """
+
+    _task_custom_attr = {
+        "zk",
+        "zk_path",
+        "execute_mode",
+        "sql",
+    }
+
+    def __init__(
+        self, name, zookeeper, zookeeper_path, execute_mode, sql, *args, **kwargs
+    ):
+        super().__init__(name, TaskType.OPENMLDB, *args, **kwargs)
+        self.zk = zookeeper
+        self.zk_path = zookeeper_path
+        self.execute_mode = execute_mode
+        self.sql = sql
diff --git a/src/pydolphinscheduler/tasks/pytorch.py b/src/pydolphinscheduler/tasks/pytorch.py
new file mode 100644
index 0000000..4767f7e
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/pytorch.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Pytorch."""
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+
+
+class DEFAULT:
+    """Default values for Pytorch."""
+
+    is_create_environment = False
+    project_path = "."
+    python_command = "${PYTHON_HOME}"
+
+
+class Pytorch(Task):
+    """Task Pytorch object, declare behavior for Pytorch task to dolphinscheduler.
+
+    See also: `DolphinScheduler Pytorch Task Plugin
+    <https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/pytorch.html>`_
+
+    :param name: task name
+    :param script: Entry to the Python script file that you want to run.
+    :param script_params: Input parameters at run time.
+    :param project_path: The path to the project. Default ".".
+    :param is_create_environment: Whether to create a python environment before running. Default False.
+    :param python_command: The path to the python command. Default "${PYTHON_HOME}".
+    :param python_env_tool: The python environment tool. Default "conda".
+    :param requirements: The path to the requirements.txt file. Default "requirements.txt".
+    :param conda_python_version: The python version of conda environment. Default "3.7".
+    """
+
+    _task_custom_attr = {
+        "script",
+        "script_params",
+        "other_params",
+        "python_path",
+        "is_create_environment",
+        "python_command",
+        "python_env_tool",
+        "requirements",
+        "conda_python_version",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        script: str,
+        script_params: str = "",
+        project_path: Optional[str] = DEFAULT.project_path,
+        is_create_environment: Optional[bool] = DEFAULT.is_create_environment,
+        python_command: Optional[str] = DEFAULT.python_command,
+        python_env_tool: Optional[str] = "conda",
+        requirements: Optional[str] = "requirements.txt",
+        conda_python_version: Optional[str] = "3.7",
+        *args,
+        **kwargs,
+    ):
+        """Init Pytorch task."""
+        super().__init__(name, TaskType.PYTORCH, *args, **kwargs)
+        self.script = script
+        self.script_params = script_params
+        self.is_create_environment = is_create_environment
+        self.python_path = project_path
+        self.python_command = python_command
+        self.python_env_tool = python_env_tool
+        self.requirements = requirements
+        self.conda_python_version = conda_python_version
+
+    @property
+    def other_params(self):
+        """Return other params."""
+        conds = [
+            self.is_create_environment != DEFAULT.is_create_environment,
+            self.python_path != DEFAULT.project_path,
+            self.python_command != DEFAULT.python_command,
+        ]
+        return any(conds)
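Note: other_params is a derived flag rather than a constructor argument: it becomes
True as soon as any environment-related field leaves its default, signalling that the
extra options need to be serialized; a sketch of the same check outside the class:

    from pydolphinscheduler.tasks.pytorch import DEFAULT

    python_path = "/home/user/project"  # departs from the "." default
    other_params = any([
        False != DEFAULT.is_create_environment,
        python_path != DEFAULT.project_path,
        "${PYTHON_HOME}" != DEFAULT.python_command,
    ])
    # True, because python_path is non-default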
diff --git a/src/pydolphinscheduler/core/resource.py b/src/pydolphinscheduler/tasks/sagemaker.py
similarity index 50%
copy from src/pydolphinscheduler/core/resource.py
copy to src/pydolphinscheduler/tasks/sagemaker.py
index a3aab81..30b128d 100644
--- a/src/pydolphinscheduler/core/resource.py
+++ b/src/pydolphinscheduler/tasks/sagemaker.py
@@ -15,29 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Module resource."""
+"""Task SageMaker."""
 
-from typing import Optional
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
 
-from pydolphinscheduler.models import Base
 
+class SageMaker(Task):
+    """Task SageMaker object, declare behavior for SageMaker task to dolphinscheduler.
 
-class Resource(Base):
-    """resource object, will define the resources that you want to create or update.
+    :param name: A unique, meaningful string for the SageMaker task.
+    :param sagemaker_request_json: Request parameters of StartPipelineExecution,
+        see also `AWS API
+        <https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html>`_
 
-    :param name: The fullname of resource.Includes path and suffix.
-    :param content: The description of resource.
-    :param description: The description of resource.
     """
 
-    _DEFINE_ATTR = {"name", "content", "description"}
-
-    def __init__(
-        self,
-        name: str,
-        content: str,
-        description: Optional[str] = None,
-    ):
-        super().__init__(name, description)
-        self.content = content
-        self._resource_code = None
+    _task_custom_attr = {
+        "sagemaker_request_json",
+    }
+
+    def __init__(self, name: str, sagemaker_request_json: str, *args, **kwargs):
+        super().__init__(name, TaskType.SAGEMAKER, *args, **kwargs)
+        self.sagemaker_request_json = sagemaker_request_json
diff --git a/src/pydolphinscheduler/tasks/sub_process.py b/src/pydolphinscheduler/tasks/sub_process.py
index 8ba6b4c..c7a9f8b 100644
--- a/src/pydolphinscheduler/tasks/sub_process.py
+++ b/src/pydolphinscheduler/tasks/sub_process.py
@@ -22,7 +22,7 @@ from typing import Dict
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
-from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.java_gateway import JavaGate
 
 
 class SubProcess(Task):
@@ -47,8 +47,7 @@ class SubProcess(Task):
             raise PyDSProcessDefinitionNotAssignException(
                 "ProcessDefinition must be provider for task SubProcess."
             )
-        gateway = launch_gateway()
-        return gateway.entry_point.getProcessDefinitionInfo(
+        return JavaGate().get_process_definition_info(
             self.process_definition.user.name,
             self.process_definition.project.name,
             process_definition_name,
diff --git a/src/pydolphinscheduler/tasks/switch.py b/src/pydolphinscheduler/tasks/switch.py
index 35eece8..45edaa9 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -134,6 +134,11 @@ class Switch(Task):
     if task `switch` in this workflow.
     """
 
+    _task_ignore_attr = {
+        "condition_result",
+        "dependence",
+    }
+
     def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
         super().__init__(name, TaskType.SWITCH, *args, **kwargs)
         self.condition = condition
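Note: the new _task_ignore_attr prunes keys from the serialized task params; mirroring
the test__get_attr cases added below, the effective attribute set for Switch is
roughly the default set minus the ignored keys:

    base = {"local_params", "resource_list", "dependence",
            "wait_start_timeout", "condition_result"}
    effective = base - {"condition_result", "dependence"}
    # {'local_params', 'resource_list', 'wait_start_timeout'}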
diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py
index e36c47b..ba44fad 100644
--- a/tests/core/test_engine.py
+++ b/tests/core/test_engine.py
@@ -124,6 +124,7 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
                 "flag": "YES",
                 "taskPriority": "MEDIUM",
                 "workerGroup": "default",
+                "environmentCode": None,
                 "failRetryTimes": 0,
                 "failRetryInterval": 1,
                 "timeoutFlag": "CLOSE",
diff --git a/tests/core/test_resource_definition.py b/tests/core/test_resource_definition.py
index ebfb893..07fcac3 100644
--- a/tests/core/test_resource_definition.py
+++ b/tests/core/test_resource_definition.py
@@ -16,8 +16,10 @@
 # under the License.
 
 """Test resource definition."""
+import pytest
 
 from pydolphinscheduler.core.resource import Resource
+from pydolphinscheduler.exceptions import PyDSParamException
 
 
 def test_resource():
@@ -25,14 +27,42 @@ def test_resource():
     name = "/dev/test.py"
     content = """print("hello world")"""
     description = "hello world"
+    user_name = "test_user"
     expect = {
         "name": name,
         "content": content,
         "description": description,
+        "userName": user_name,
     }
     resourceDefinition = Resource(
-        name=name,
-        content=content,
-        description=description,
+        name=name, content=content, description=description, user_name=user_name
     )
     assert resourceDefinition.get_define() == expect
+
+
+def test_empty_user_name():
+    """Tests for the exception get info from database when the user name is null."""
+    name = "/dev/test.py"
+    content = """print("hello world")"""
+    description = "hello world"
+    resourceDefinition = Resource(name=name, content=content, description=description)
+    with pytest.raises(
+        PyDSParamException,
+        match="`user_name` is required when querying resources from python gate.",
+    ):
+        resourceDefinition.get_info_from_database()
+
+
+def test_empty_content():
+    """Tests for the exception create or update resource when the user name or content is empty."""
+    name = "/dev/test.py"
+    user_name = "test_user"
+    description = "hello world"
+    resourceDefinition = Resource(
+        name=name, description=description, user_name=user_name
+    )
+    with pytest.raises(
+        PyDSParamException,
+        match="`user_name` and `content` are required when create or update resource from python gate.",
+    ):
+        resourceDefinition.create_or_update_resource()
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 65555c1..87ebc99 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -19,18 +19,86 @@
 import logging
 import re
-from unittest.mock import patch
+from typing import Set
+from unittest.mock import patch
 
 import pytest
 
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task, TaskRelation
-from tests.testing.task import Task as testTask
+from tests.testing.task import Task as TestTask
 from tests.testing.task import TaskWithCode
 
 TEST_TASK_RELATION_SET = set()
 TEST_TASK_RELATION_SIZE = 0
 
 
+@pytest.mark.parametrize(
+    "addition, ignore, expect",
+    [
+        (
+            set(),
+            set(),
+            {
+                "local_params",
+                "resource_list",
+                "dependence",
+                "wait_start_timeout",
+                "condition_result",
+            },
+        ),
+        (
+            set(),
+            {"dependence", "condition_result", "not_exists"},
+            {
+                "local_params",
+                "resource_list",
+                "wait_start_timeout",
+            },
+        ),
+        (
+            {
+                "not_exists_1",
+                "not_exists_2",
+            },
+            set(),
+            {
+                "not_exists_1",
+                "not_exists_2",
+                "local_params",
+                "resource_list",
+                "dependence",
+                "wait_start_timeout",
+                "condition_result",
+            },
+        ),
+        # test addition and ignore conflict to see behavior
+        (
+            {
+                "not_exists",
+            },
+            {"condition_result", "not_exists"},
+            {
+                "not_exists",
+                "local_params",
+                "resource_list",
+                "dependence",
+                "wait_start_timeout",
+            },
+        ),
+    ],
+)
+def test__get_attr(addition: Set, ignore: Set, expect: Set):
+    """Test task function `_get_attr`."""
+    task = TestTask(
+        name="test-get-attr",
+        task_type="test",
+    )
+    task._task_custom_attr = addition
+    task._task_ignore_attr = ignore
+    assert task._get_attr() == expect
+
+
 @pytest.mark.parametrize(
     "attr, expect",
     [
@@ -63,12 +131,16 @@ TEST_TASK_RELATION_SIZE = 0
     ],
 )
 @patch(
-    "pydolphinscheduler.core.task.Task.query_resource",
-    return_value=({"id": 1, "name": "foo"}),
+    "pydolphinscheduler.core.resource.Resource.get_id_from_database",
+    return_value=1,
 )
-def test_property_task_params(mock_resource, attr, expect):
+@patch(
+    "pydolphinscheduler.core.task.Task.user_name",
+    return_value="test_user",
+)
+def test_property_task_params(mock_resource, mock_user_name, attr, expect):
     """Test class task property."""
-    task = testTask(
+    task = TestTask(
         "test-property-task-params",
         "test-task",
         **attr,
@@ -157,6 +229,7 @@ def test_task_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -177,8 +250,8 @@ def test_two_tasks_shift(shift: str):
 
     Here we test both `>>` and `<<` bit operator.
     """
-    upstream = testTask(name="upstream", task_type=shift)
-    downstream = testTask(name="downstream", task_type=shift)
+    upstream = TestTask(name="upstream", task_type=shift)
+    downstream = TestTask(name="downstream", task_type=shift)
     if shift == "<<":
         downstream << upstream
     elif shift == ">>":
@@ -214,10 +287,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
         "downstream": "upstream",
     }
     task_type = "dep_task_and_tasks"
-    task = testTask(name="upstream", task_type=task_type)
+    task = TestTask(name="upstream", task_type=task_type)
     tasks = [
-        testTask(name="downstream1", task_type=task_type),
-        testTask(name="downstream2", task_type=task_type),
+        TestTask(name="downstream1", task_type=task_type),
+        TestTask(name="downstream2", task_type=task_type),
     ]
 
     # Use build-in function eval to simply test case and reduce duplicate code
@@ -265,10 +338,16 @@ def test_add_duplicate(caplog):
     return_value=(123, 1),
 )
 @patch(
-    "pydolphinscheduler.core.task.Task.query_resource",
-    return_value=({"id": 1, "name": "/dev/test.py"}),
+    "pydolphinscheduler.core.resource.Resource.get_id_from_database",
+    return_value=1,
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.user_name",
+    return_value="test_user",
 )
-def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
+def test_python_resource_list(
+    mock_code_version, mock_resource, mock_user_name, resources, expect
+):
     """Test python task resource list."""
     task = Task(
         name="python_resource_list.",
diff --git a/tests/core/test_yaml_process_define.py b/tests/core/test_yaml_process_define.py
new file mode 100644
index 0000000..99ad179
--- /dev/null
+++ b/tests/core/test_yaml_process_define.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test YAML process."""
+
+import os
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler import configuration, tasks
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.yaml_process_define import (
+    ParseTool,
+    create_process_definition,
+    get_task_cls,
+)
+from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
+from tests.testing.path import path_yaml_example
+from tests.testing.task import Task
+
+
+@pytest.mark.parametrize(
+    "string_param, expect",
+    [
+        ("$ENV{PROJECT_NAME}", "~/pydolphinscheduler"),
+    ],
+)
+def test_parse_tool_env_exist(string_param, expect):
+    """Test parsing the environment variable."""
+    os.environ["PROJECT_NAME"] = expect
+    assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
+def test_parse_tool_env_not_exist():
+    """Test parsing the not exist environment variable."""
+    key = "THIS_ENV_NOT_EXIST_0000000"
+    string_param = "$ENV{%s}" % key
+    expect = "$" + key
+    assert expect == ParseTool.parse_string_param_if_env(string_param)
+
+
+@pytest.mark.parametrize(
+    "string_param, expect_key",
+    [
+        ("${CONFIG.java_gateway.address}", "java_gateway.address"),
+        ("${CONFIG.WORKFLOW_PROJECT}", "default.workflow.project"),
+    ],
+)
+def test_parse_tool_config(string_param, expect_key):
+    """Test parsing configuration."""
+    expect = configuration.get_single_config(expect_key)
+    assert expect == ParseTool.parse_string_param_if_config(string_param)
+
+
+def test_parse_possible_yaml_file():
+    """Test parsing possible path."""
+    folder = Path(path_yaml_example)
+    file_name = "Shell.yaml"
+    path = folder.joinpath(file_name)
+
+    with open(path, "r") as f:
+        expect = "".join(f)
+
+    string_param = '$FILE{"%s"}' % file_name
+    content_ = ParseTool.parse_string_param_if_file(string_param, base_folder=folder)
+
+    assert expect == content_
+
+
+def test_parse_tool_parse_possible_path_file():
+    """Test parsing possible path."""
+    folder = Path(path_yaml_example)
+    file_name = "Shell.yaml"
+    path = folder.joinpath(file_name)
+
+    possible_path = ParseTool.get_possible_path(path, base_folder=folder)
+    assert path == possible_path
+
+    possible_path = ParseTool.get_possible_path(file_name, base_folder=folder)
+    assert path == possible_path
+
+    possible_path = ParseTool.get_possible_path(file_name, base_folder=".")
+    assert path != possible_path
+
+
+@pytest.mark.parametrize(
+    "task_type, expect",
+    [
+        ("shell", tasks.Shell),
+        ("Shell", tasks.Shell),
+        ("ShEll", tasks.Shell),
+        ("Condition", tasks.Condition),
+        ("DataX", tasks.DataX),
+        ("CustomDataX", tasks.CustomDataX),
+        ("Dependent", tasks.Dependent),
+        ("Flink", tasks.Flink),
+        ("Http", tasks.Http),
+        ("MR", tasks.MR),
+        ("Procedure", tasks.Procedure),
+        ("Python", tasks.Python),
+        ("Shell", tasks.Shell),
+        ("Spark", tasks.Spark),
+        ("Sql", tasks.Sql),
+        ("SubProcess", tasks.SubProcess),
+        ("Switch", tasks.Switch),
+        ("SageMaker", tasks.SageMaker),
+    ],
+)
+def test_get_task(task_type, expect):
+    """Test get task function."""
+    assert expect == get_task_cls(task_type)
+
+
+@pytest.mark.parametrize(
+    "task_type",
+    [
+        ("MYSQL"),
+    ],
+)
+def test_get_error(task_type):
+    """Test get task cls error."""
+    with pytest.raises(
+        PyDSTaskNoFoundException,
+        match=f"not find task {task_type}",
+    ):
+        get_task_cls(task_type)
+
+
+@pytest.mark.parametrize(
+    "yaml_file",
+    [
+        ("Condition.yaml"),
+        ("DataX.yaml"),
+        ("Dependent.yaml"),
+        ("Flink.yaml"),
+        ("Procedure.yaml"),
+        ("Http.yaml"),
+        ("MapReduce.yaml"),
+        ("Python.yaml"),
+        ("Shell.yaml"),
+        ("Spark.yaml"),
+        ("Sql.yaml"),
+        ("SubProcess.yaml"),
+        # ("Switch.yaml"),
+        ("MoreConfiguration.yaml"),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+@patch(
+    "pydolphinscheduler.core.database.Database.get_database_info",
+    return_value=({"id": 1, "type": "mock_type"}),
+)
+@patch(
+    "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
+    return_value={
+        "projectCode": 0,
+        "processDefinitionCode": 0,
+        "taskDefinitionCode": 0,
+    },
+)
+@patch.object(ProcessDefinition, "run")
+@patch.object(ProcessDefinition, "submit")
+def test_get_create_process_definition(
+    prun, psubmit, dep_item, db_info, resource_info, yaml_file
+):
+    """Test create_process_definition function to parse example YAML file."""
+    yaml_file_path = Path(path_yaml_example).joinpath(yaml_file)
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+    ):
+        create_process_definition(yaml_file_path)
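
Note: a hedged usage sketch of the YAML define feature exercised above. It assumes
create_process_definition and ParseTool are imported as in this test module, and that a
DolphinScheduler API server is reachable, since the call ends in a submit/run (which the
tests above patch out):

    from pathlib import Path

    # Build a workflow from one of the bundled YAML example files.
    yaml_path = Path("examples/yaml_define").joinpath("Shell.yaml")
    create_process_definition(yaml_path)

    # ParseTool resolves the special string markers used inside those YAML
    # files, e.g. the ${CONFIG.key} and $FILE{"name"} forms asserted above.
    content = ParseTool.parse_string_param_if_file(
        '$FILE{"Shell.yaml"}', base_folder="examples/yaml_define"
    )
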
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index a9cd352..236956b 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -17,6 +17,8 @@
 
 """py.test conftest.py file for package integration test."""
 
+import os
+
 import pytest
 
 from tests.testing.docker_wrapper import DockerWrapper
@@ -33,14 +35,17 @@ def docker_setup_teardown():
         For more information about conftest.py see:
         https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
     """
-    docker_wrapper = DockerWrapper(
-        image="apache/dolphinscheduler-standalone-server:ci",
-        container_name="ci-dolphinscheduler-standalone-server",
-    )
-    ports = {"25333/tcp": 25333}
-    container = docker_wrapper.run_until_log(
-        log="Started StandaloneServer in", tty=True, ports=ports
-    )
-    assert container is not None
-    yield
-    docker_wrapper.remove_container()
+    if os.environ.get("skip_launch_docker") == "true":
+        yield True
+    else:
+        docker_wrapper = DockerWrapper(
+            image="apache/dolphinscheduler-standalone-server:ci",
+            container_name="ci-dolphinscheduler-standalone-server",
+        )
+        ports = {"25333/tcp": 25333}
+        container = docker_wrapper.run_until_log(
+            log="Started StandaloneServer in", tty=True, ports=ports
+        )
+        assert container is not None
+        yield
+        docker_wrapper.remove_container()
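
Note: the conftest change above lets the integration tests reuse an already-running
standalone server instead of launching a container; the new tox environment
local-integrate-test (see the tox.ini hunk below) sets skip_launch_docker for exactly that
purpose. A self-contained sketch of the same env-gated fixture pattern, with illustrative
names:

    import os

    import pytest

    @pytest.fixture(autouse=True)
    def maybe_external_service():
        """Skip expensive setup when the environment already provides it."""
        if os.environ.get("skip_launch_docker") == "true":
            yield True  # reuse whatever server is already running
        else:
            service = {"started": True}  # stand-in for the DockerWrapper setup
            yield service
            service["started"] = False  # stand-in for container removal
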
diff --git a/tests/tasks/test_condition.py b/tests/tasks/test_condition.py
index 5232640..72eec28 100644
--- a/tests/tasks/test_condition.py
+++ b/tests/tasks/test_condition.py
@@ -381,6 +381,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_datax.py b/tests/tasks/test_datax.py
index 9473f57..5d1890e 100644
--- a/tests/tasks/test_datax.py
+++ b/tests/tasks/test_datax.py
@@ -67,6 +67,7 @@ def test_datax_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -108,6 +109,7 @@ def test_custom_datax_get_define(json_template):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_dependent.py b/tests/tasks/test_dependent.py
index f16e291..f55700e 100644
--- a/tests/tasks/test_dependent.py
+++ b/tests/tasks/test_dependent.py
@@ -782,6 +782,7 @@ def test_dependent_get_define(mock_code_version, mock_dep_code):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_dvc.py b/tests/tasks/test_dvc.py
new file mode 100644
index 0000000..815d896
--- /dev/null
+++ b/tests/tasks/test_dvc.py
@@ -0,0 +1,173 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Dvc."""
+from unittest.mock import patch
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DvcTaskType, DVCUpload
+
+repository = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"
+
+
+def test_dvc_init_get_define():
+    """Test task dvc init function get_define."""
+    name = "test_dvc_init"
+    dvc_store_url = "~/dvc_data"
+
+    code = 123
+    version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.INIT,
+            "dvcRepository": repository,
+            "dvcStoreUrl": dvc_store_url,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        dvc_init = DVCInit(name, repository, dvc_store_url)
+        assert dvc_init.get_define() == expect
+
+
+def test_dvc_upload_get_define():
+    """Test task dvc upload function get_define."""
+    name = "test_dvc_upload"
+    data_path_in_dvc_repository = "iris"
+    data_path_in_worker = "~/source/iris"
+    version = "v1"
+    message = "upload iris data v1"
+
+    code = 123
+    task_version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.UPLOAD,
+            "dvcRepository": repository,
+            "dvcDataLocation": data_path_in_dvc_repository,
+            "dvcLoadSaveDataPath": data_path_in_worker,
+            "dvcVersion": version,
+            "dvcMessage": message,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, task_version),
+    ):
+        dvc_upload = DVCUpload(
+            name,
+            repository=repository,
+            data_path_in_dvc_repository=data_path_in_dvc_repository,
+            data_path_in_worker=data_path_in_worker,
+            version=version,
+            message=message,
+        )
+        assert dvc_upload.get_define() == expect
+
+
+def test_dvc_download_get_define():
+    """Test task dvc download function get_define."""
+    name = "test_dvc_download"
+    data_path_in_dvc_repository = "iris"
+    data_path_in_worker = "~/target/iris"
+    version = "v1"
+
+    code = 123
+    task_version = 1
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": TaskType.DVC,
+        "taskParams": {
+            "resourceList": [],
+            "localParams": [],
+            "dvcTaskType": DvcTaskType.DOWNLOAD,
+            "dvcRepository": repository,
+            "dvcDataLocation": data_path_in_dvc_repository,
+            "dvcLoadSaveDataPath": data_path_in_worker,
+            "dvcVersion": version,
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "environmentCode": None,
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, task_version),
+    ):
+        dvc_download = DVCDownload(
+            name,
+            repository=repository,
+            data_path_in_dvc_repository=data_path_in_dvc_repository,
+            data_path_in_worker=data_path_in_worker,
+            version=version,
+        )
+        assert dvc_download.get_define() == expect
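
Note: a sketch of wiring the three DVC tasks into one workflow, using the constructor
arguments pinned down by the tests above. The ProcessDefinition import path, tenant name,
and workflow wiring follow the usual pydolphinscheduler tutorial style and are assumptions
here, not part of this patch:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.dvc import DVCDownload, DVCInit, DVCUpload

    repo = "git@github.com:<YOUR-NAME-OR-ORG>/dvc-data-repository-example.git"

    with ProcessDefinition(name="dvc-demo", tenant="tenant_exists") as pd:
        init = DVCInit("init", repo, "~/dvc_data")
        upload = DVCUpload(
            "upload",
            repository=repo,
            data_path_in_dvc_repository="iris",
            data_path_in_worker="~/source/iris",
            version="v1",
            message="upload iris data v1",
        )
        download = DVCDownload(
            "download",
            repository=repo,
            data_path_in_dvc_repository="iris",
            data_path_in_worker="~/target/iris",
            version="v1",
        )
        init >> upload >> download  # init, then upload, then download
        pd.submit()
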
diff --git a/tests/tasks/test_flink.py b/tests/tasks/test_flink.py
index 92ae3ba..2f30a49 100644
--- a/tests/tasks/test_flink.py
+++ b/tests/tasks/test_flink.py
@@ -68,6 +68,7 @@ def test_flink_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_http.py b/tests/tasks/test_http.py
index 060cdec..399829b 100644
--- a/tests/tasks/test_http.py
+++ b/tests/tasks/test_http.py
@@ -130,6 +130,7 @@ def test_http_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_map_reduce.py b/tests/tasks/test_map_reduce.py
index dbe9e51..5d38e93 100644
--- a/tests/tasks/test_map_reduce.py
+++ b/tests/tasks/test_map_reduce.py
@@ -61,6 +61,7 @@ def test_mr_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_mlflow.py b/tests/tasks/test_mlflow.py
new file mode 100644
index 0000000..af0a324
--- /dev/null
+++ b/tests/tasks/test_mlflow.py
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task MLflow."""
+from copy import deepcopy
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.mlflow import (
+    MLflowDeployType,
+    MLflowJobType,
+    MLflowModels,
+    MLFlowProjectsAutoML,
+    MLFlowProjectsBasicAlgorithm,
+    MLFlowProjectsCustom,
+    MLflowTaskType,
+)
+
+CODE = 123
+VERSION = 1
+MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"
+
+EXPECT = {
+    "code": CODE,
+    "version": VERSION,
+    "description": None,
+    "delayTime": 0,
+    "taskType": "MLFLOW",
+    "taskParams": {
+        "resourceList": [],
+        "localParams": [],
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    },
+    "flag": "YES",
+    "taskPriority": "MEDIUM",
+    "workerGroup": "default",
+    "environmentCode": None,
+    "failRetryTimes": 0,
+    "failRetryInterval": 1,
+    "timeoutFlag": "CLOSE",
+    "timeoutNotifyStrategy": None,
+    "timeout": 0,
+}
+
+
+def test_mlflow_models_get_define():
+    """Test task mlflow models function get_define."""
+    name = "mlflow_models"
+    model_uri = "models:/xgboost_native/Production"
+    port = 7001
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+    task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
+    task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_MODELS
+    task_params["deployType"] = MLflowDeployType.DOCKER
+    task_params["deployModelKey"] = model_uri
+    task_params["deployPort"] = port
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = MLflowModels(
+            name=name,
+            model_uri=model_uri,
+            mlflow_tracking_uri=MLFLOW_TRACKING_URI,
+            deploy_mode=MLflowDeployType.DOCKER,
+            port=port,
+        )
+        assert task.get_define() == expect
+
+
+def test_mlflow_project_custom_get_define():
+    """Test task mlflow project custom function get_define."""
+    name = "train_xgboost_native"
+    repository = "https://github.com/mlflow/mlflow#examples/xgboost/xgboost_native"
+    mlflow_tracking_uri = MLFLOW_TRACKING_URI
+    parameters = "-P learning_rate=0.2 -P colsample_bytree=0.8 -P subsample=0.9"
+    experiment_name = "xgboost"
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+
+    task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
+    task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
+    task_params["mlflowJobType"] = MLflowJobType.CUSTOM_PROJECT
+    task_params["experimentName"] = experiment_name
+    task_params["params"] = parameters
+    task_params["mlflowProjectRepository"] = repository
+    task_params["mlflowProjectVersion"] = "dev"
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = MLFlowProjectsCustom(
+            name=name,
+            repository=repository,
+            mlflow_tracking_uri=mlflow_tracking_uri,
+            parameters=parameters,
+            experiment_name=experiment_name,
+            version="dev",
+        )
+        assert task.get_define() == expect
+
+
+def test_mlflow_project_automl_get_define():
+    """Test task mlflow project automl function get_define."""
+    name = "train_automl"
+    mlflow_tracking_uri = MLFLOW_TRACKING_URI
+    parameters = "time_budget=30;estimator_list=['lgbm']"
+    experiment_name = "automl_iris"
+    model_name = "iris_A"
+    automl_tool = "flaml"
+    data_path = "/data/examples/iris"
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+
+    task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
+    task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
+    task_params["mlflowJobType"] = MLflowJobType.AUTOML
+    task_params["experimentName"] = experiment_name
+    task_params["modelName"] = model_name
+    task_params["registerModel"] = bool(model_name)
+    task_params["dataPath"] = data_path
+    task_params["params"] = parameters
+    task_params["automlTool"] = automl_tool
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = MLFlowProjectsAutoML(
+            name=name,
+            mlflow_tracking_uri=mlflow_tracking_uri,
+            parameters=parameters,
+            experiment_name=experiment_name,
+            model_name=model_name,
+            automl_tool=automl_tool,
+            data_path=data_path,
+        )
+        assert task.get_define() == expect
+
+
+def test_mlflow_project_basic_algorithm_get_define():
+    """Test task mlflow project BasicAlgorithm function get_define."""
+    name = "train_basic_algorithm"
+    mlflow_tracking_uri = MLFLOW_TRACKING_URI
+    parameters = "n_estimators=200;learning_rate=0.2"
+    experiment_name = "basic_algorithm_iris"
+    model_name = "iris_B"
+    algorithm = "lightgbm"
+    data_path = "/data/examples/iris"
+    search_params = "max_depth=[5, 10];n_estimators=[100, 200]"
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+
+    task_params["mlflowTrackingUri"] = MLFLOW_TRACKING_URI
+    task_params["mlflowTaskType"] = MLflowTaskType.MLFLOW_PROJECTS
+    task_params["mlflowJobType"] = MLflowJobType.BASIC_ALGORITHM
+    task_params["experimentName"] = experiment_name
+    task_params["modelName"] = model_name
+    task_params["registerModel"] = bool(model_name)
+    task_params["dataPath"] = data_path
+    task_params["params"] = parameters
+    task_params["algorithm"] = algorithm
+    task_params["searchParams"] = search_params
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = MLFlowProjectsBasicAlgorithm(
+            name=name,
+            mlflow_tracking_uri=mlflow_tracking_uri,
+            parameters=parameters,
+            experiment_name=experiment_name,
+            model_name=model_name,
+            algorithm=algorithm,
+            data_path=data_path,
+            search_params=search_params,
+        )
+        assert task.get_define() == expect
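
Note: a minimal sketch of the MLflowModels deployment covered by
test_mlflow_models_get_define above; the workflow wrapper, tenant, and import path for
ProcessDefinition are illustrative assumptions:

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.mlflow import MLflowDeployType, MLflowModels

    with ProcessDefinition(name="mlflow-demo", tenant="tenant_exists") as pd:
        # Serve the Production stage of a registered model via Docker on 7001.
        MLflowModels(
            name="deploy_xgboost",
            model_uri="models:/xgboost_native/Production",
            mlflow_tracking_uri="http://127.0.0.1:5000",
            deploy_mode=MLflowDeployType.DOCKER,
            port=7001,
        )
        pd.submit()
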
diff --git a/tests/tasks/test_map_reduce.py b/tests/tasks/test_openmldb.py
similarity index 64%
copy from tests/tasks/test_map_reduce.py
copy to tests/tasks/test_openmldb.py
index dbe9e51..f580ab0 100644
--- a/tests/tasks/test_map_reduce.py
+++ b/tests/tasks/test_openmldb.py
@@ -15,45 +15,42 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test Task MR."""
-
+"""Test Task OpenMLDB."""
 from unittest.mock import patch
 
-from pydolphinscheduler.tasks.map_reduce import MR, ProgramType
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.tasks.openmldb import OpenMLDB
+
+
+def test_openmldb_get_define():
+    """Test task openmldb function get_define."""
+    zookeeper = "127.0.0.1:2181"
+    zookeeper_path = "/openmldb"
+    execute_mode = "offline"
 
+    sql = """USE demo_db;
+    set @@job_timeout=200000;
+    LOAD DATA INFILE 'file:///tmp/train_sample.csv'
+    INTO TABLE talkingdata OPTIONS(mode='overwrite');
+    """
 
-@patch(
-    "pydolphinscheduler.core.engine.Engine.get_resource_info",
-    return_value=({"id": 1, "name": "test"}),
-)
-def test_mr_get_define(mock_resource):
-    """Test task mr function get_define."""
     code = 123
     version = 1
-    name = "test_mr_get_define"
-    main_class = "org.apache.mr.test_main_class"
-    main_package = "test_main_package"
-    program_type = ProgramType.JAVA
-    main_args = "/dolphinscheduler/resources/file.txt /output/ds"
-
+    name = "test_openmldb_get_define"
     expect = {
         "code": code,
         "name": name,
         "version": 1,
         "description": None,
         "delayTime": 0,
-        "taskType": "MR",
+        "taskType": TaskType.OPENMLDB,
         "taskParams": {
-            "mainClass": main_class,
-            "mainJar": {
-                "id": 1,
-            },
-            "programType": program_type,
-            "appName": None,
-            "mainArgs": main_args,
-            "others": None,
-            "localParams": [],
             "resourceList": [],
+            "localParams": [],
+            "zk": zookeeper,
+            "zkPath": zookeeper_path,
+            "executeMode": execute_mode,
+            "sql": sql,
             "dependence": {},
             "conditionResult": {"successNode": [""], "failedNode": [""]},
             "waitStartTimeout": {},
@@ -61,6 +58,7 @@ def test_mr_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -71,5 +69,5 @@ def test_mr_get_define(mock_resource):
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         return_value=(code, version),
     ):
-        task = MR(name, main_class, main_package, program_type, main_args=main_args)
-        assert task.get_define() == expect
+        openmldb = OpenMLDB(name, zookeeper, zookeeper_path, execute_mode, sql)
+        assert openmldb.get_define() == expect
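
Note: the new test passes the OpenMLDB arguments positionally; a sketch with the same
order, wrapped in an illustrative workflow (ProcessDefinition details assumed, as in the
sketches above):

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.openmldb import OpenMLDB

    load_sql = (
        "USE demo_db; LOAD DATA INFILE 'file:///tmp/train_sample.csv' "
        "INTO TABLE talkingdata OPTIONS(mode='overwrite');"
    )

    with ProcessDefinition(name="openmldb-demo", tenant="tenant_exists") as pd:
        OpenMLDB(
            "load_data",
            "127.0.0.1:2181",  # zookeeper address
            "/openmldb",       # zookeeper path
            "offline",         # execute mode
            load_sql,
        )
        pd.submit()
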
diff --git a/tests/tasks/test_procedure.py b/tests/tasks/test_procedure.py
index 1782593..80afe7b 100644
--- a/tests/tasks/test_procedure.py
+++ b/tests/tasks/test_procedure.py
@@ -96,6 +96,7 @@ def test_sql_get_define(mock_datasource, mock_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index 1cdd85d..e8f7f10 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -132,6 +132,7 @@ def test_python_get_define(name, script_code, raw):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_pytorch.py b/tests/tasks/test_pytorch.py
new file mode 100644
index 0000000..eccb51c
--- /dev/null
+++ b/tests/tasks/test_pytorch.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Pytorch."""
+from copy import deepcopy
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.tasks.pytorch import DEFAULT, Pytorch
+from tests.testing.task import Task
+
+CODE = 123
+VERSION = 1
+
+EXPECT = {
+    "code": CODE,
+    "version": VERSION,
+    "description": None,
+    "delayTime": 0,
+    "taskType": "PYTORCH",
+    "taskParams": {
+        "resourceList": [],
+        "localParams": [],
+        "dependence": {},
+        "conditionResult": {"successNode": [""], "failedNode": [""]},
+        "waitStartTimeout": {},
+    },
+    "flag": "YES",
+    "taskPriority": "MEDIUM",
+    "workerGroup": "default",
+    "environmentCode": None,
+    "failRetryTimes": 0,
+    "failRetryInterval": 1,
+    "timeoutFlag": "CLOSE",
+    "timeoutNotifyStrategy": None,
+    "timeout": 0,
+}
+
+
+def test_pytorch_get_define():
+    """Test task pytorch function get_define."""
+    name = "task_conda_env"
+    script = "main.py"
+    script_params = "--dry-run --no-cuda"
+    project_path = "https://github.com/pytorch/examples#mnist"
+    is_create_environment = True
+    python_env_tool = "conda"
+    requirements = "requirements.txt"
+    conda_python_version = "3.7"
+
+    expect = deepcopy(EXPECT)
+    expect["name"] = name
+    task_params = expect["taskParams"]
+
+    task_params["script"] = script
+    task_params["scriptParams"] = script_params
+    task_params["pythonPath"] = project_path
+    task_params["otherParams"] = True
+    task_params["isCreateEnvironment"] = is_create_environment
+    task_params["pythonCommand"] = "${PYTHON_HOME}"
+    task_params["pythonEnvTool"] = python_env_tool
+    task_params["requirements"] = requirements
+    task_params["condaPythonVersion"] = conda_python_version
+
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(CODE, VERSION),
+    ):
+        task = Pytorch(
+            name=name,
+            script=script,
+            script_params=script_params,
+            project_path=project_path,
+            is_create_environment=is_create_environment,
+            python_env_tool=python_env_tool,
+            requirements=requirements,
+        )
+        assert task.get_define() == expect
+
+
+@pytest.mark.parametrize(
+    "is_create_environment, project_path, python_command, expect",
+    [
+        (
+            DEFAULT.is_create_environment,
+            DEFAULT.project_path,
+            DEFAULT.python_command,
+            False,
+        ),
+        (True, DEFAULT.project_path, DEFAULT.python_command, True),
+        (DEFAULT.is_create_environment, "/home", DEFAULT.python_command, True),
+        (DEFAULT.is_create_environment, DEFAULT.project_path, "/usr/bin/python", True),
+    ],
+)
+def test_other_params(is_create_environment, project_path, python_command, expect):
+    """Test task pytorch function other_params."""
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+    ):
+        task = Pytorch(
+            name="test",
+            script="",
+            script_params="",
+            project_path=project_path,
+            is_create_environment=is_create_environment,
+            python_command=python_command,
+        )
+        assert task.other_params == expect
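
Note: the parametrized test above pins down the observable rule for other_params without
showing the implementation. A sketch of the inferred rule (not the task's actual code): it
is truthy as soon as any of the three fields deviates from its packaged default.

    from pydolphinscheduler.tasks.pytorch import DEFAULT

    def other_params_flag(is_create_environment, project_path, python_command):
        """Inferred: True once any field differs from its DEFAULT value."""
        return (
            is_create_environment != DEFAULT.is_create_environment
            or project_path != DEFAULT.project_path
            or python_command != DEFAULT.python_command
        )
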
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_sagemaker.py
similarity index 65%
copy from tests/tasks/test_shell.py
copy to tests/tasks/test_sagemaker.py
index e42f6dc..20edc22 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_sagemaker.py
@@ -15,23 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test Task shell."""
-
-
+"""Test Task SageMaker."""
+import json
 from unittest.mock import patch
 
 import pytest
 
-from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.tasks.sagemaker import SageMaker
+
+sagemaker_request_json = json.dumps(
+    {
+        "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
+        "PipelineExecutionDescription": "test Pipeline",
+        "PipelineExecutionDisplayName": "AbalonePipeline",
+        "PipelineName": "AbalonePipeline",
+        "PipelineParameters": [
+            {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
+            {"Name": "ProcessingInstanceCount", "Value": "2"},
+        ],
+    },
+    indent=2,
+)
 
 
 @pytest.mark.parametrize(
     "attr, expect",
     [
         (
-            {"command": "test script"},
+            {"sagemaker_request_json": sagemaker_request_json},
             {
-                "rawScript": "test script",
+                "sagemakerRequestJson": sagemaker_request_json,
                 "localParams": [],
                 "resourceList": [],
                 "dependence": {},
@@ -46,28 +59,27 @@ from pydolphinscheduler.tasks.shell import Shell
     return_value=(123, 1),
 )
 def test_property_task_params(mock_code_version, attr, expect):
-    """Test task shell task property."""
-    task = Shell("test-shell-task-params", **attr)
+    """Test task sagemaker task property."""
+    task = SageMaker("test-sagemaker-task-params", **attr)
     assert expect == task.task_params
 
 
-def test_shell_get_define():
-    """Test task shell function get_define."""
+def test_sagemaker_get_define():
+    """Test task sagemaker function get_define."""
     code = 123
     version = 1
-    name = "test_shell_get_define"
-    command = "echo test shell"
+    name = "test_sagemaker_get_define"
     expect = {
         "code": code,
         "name": name,
         "version": 1,
         "description": None,
         "delayTime": 0,
-        "taskType": "SHELL",
+        "taskType": "SAGEMAKER",
         "taskParams": {
             "resourceList": [],
             "localParams": [],
-            "rawScript": command,
+            "sagemakerRequestJson": sagemaker_request_json,
             "dependence": {},
             "conditionResult": {"successNode": [""], "failedNode": [""]},
             "waitStartTimeout": {},
@@ -75,6 +87,7 @@ def test_shell_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
@@ -85,5 +98,5 @@ def test_shell_get_define():
         "pydolphinscheduler.core.task.Task.gen_code_and_version",
         return_value=(code, version),
     ):
-        shell = Shell(name, command)
-        assert shell.get_define() == expect
+        sagemaker = SageMaker(name, sagemaker_request_json)
+        assert sagemaker.get_define() == expect
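
Note: SageMaker takes the raw request JSON as its second argument, as the copied test
shows. A sketch of building that JSON with the standard library; the pipeline values are
illustrative, and constructing the task still needs a reachable API server to generate its
task code:

    import json

    from pydolphinscheduler.tasks.sagemaker import SageMaker

    request = json.dumps(
        {
            "PipelineName": "AbalonePipeline",
            "PipelineExecutionDisplayName": "AbalonePipeline",
            "PipelineExecutionDescription": "test Pipeline",
            "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
        },
        indent=2,
    )
    task = SageMaker("start_pipeline", request)
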
diff --git a/tests/tasks/test_shell.py b/tests/tasks/test_shell.py
index e42f6dc..e2c87d8 100644
--- a/tests/tasks/test_shell.py
+++ b/tests/tasks/test_shell.py
@@ -75,6 +75,7 @@ def test_shell_get_define():
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_spark.py b/tests/tasks/test_spark.py
index 3b0672f..ed83f9f 100644
--- a/tests/tasks/test_spark.py
+++ b/tests/tasks/test_spark.py
@@ -68,6 +68,7 @@ def test_spark_get_define(mock_resource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index 50ccd94..ba9daa9 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -152,6 +152,7 @@ def test_sql_get_define(mock_datasource):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_sub_process.py b/tests/tasks/test_sub_process.py
index 7f471a1..126ab10 100644
--- a/tests/tasks/test_sub_process.py
+++ b/tests/tasks/test_sub_process.py
@@ -99,6 +99,7 @@ def test_sub_process_get_define(mock_process_definition):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/tasks/test_switch.py b/tests/tasks/test_switch.py
index 1f6ff5b..6f9222c 100644
--- a/tests/tasks/test_switch.py
+++ b/tests/tasks/test_switch.py
@@ -236,8 +236,6 @@ def test_switch_get_define(mock_task_code_version):
         "taskParams": {
             "resourceList": [],
             "localParams": [],
-            "dependence": {},
-            "conditionResult": {"successNode": [""], "failedNode": [""]},
             "waitStartTimeout": {},
             "switchResult": {
                 "dependTaskList": [
@@ -250,6 +248,7 @@ def test_switch_get_define(mock_task_code_version):
         "flag": "YES",
         "taskPriority": "MEDIUM",
         "workerGroup": "default",
+        "environmentCode": None,
         "failRetryTimes": 0,
         "failRetryInterval": 1,
         "timeoutFlag": "CLOSE",
diff --git a/tests/testing/path.py b/tests/testing/path.py
index 68d93c4..974ab3d 100644
--- a/tests/testing/path.py
+++ b/tests/testing/path.py
@@ -24,6 +24,7 @@ project_root = Path(__file__).parent.parent.parent
 
 path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
 path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
+path_yaml_example = project_root.joinpath("examples", "yaml_define")
 path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
 path_default_config_yaml = project_root.joinpath(
     "src", "pydolphinscheduler", "default_config.yaml"
diff --git a/tox.ini b/tox.ini
index d90e8a3..4ce8043 100644
--- a/tox.ini
+++ b/tox.ini
@@ -63,6 +63,14 @@ extras = test
 commands =
     python -m pytest tests/integration/
 
+[testenv:local-integrate-test]
+extras = test
+setenv =
+    skip_launch_docker = true
+commands =
+    {[testenv:integrate-test]commands}
+
+# local-ci do not build `doc-build-multi`
 [testenv:local-ci]
 extras = dev
 commands =