You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/02/22 06:18:36 UTC

[dolphinscheduler] branch 2.0.4-prepare updated: [cherry-pick][python] Pick laster code from dev branch (#8474)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch 2.0.4-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.4-prepare by this push:
     new 97ba97e  [cherry-pick][python] Pick laster code from dev branch (#8474)
97ba97e is described below

commit 97ba97e891da6505b28c68e79ac0c27e9517466a
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Tue Feb 22 14:18:28 2022 +0800

    [cherry-pick][python] Pick laster code from dev branch (#8474)
    
    * [cherry-pick] Cherry pick python api form branch dev
    
    * Also change python gateway server code
    
    * Add .gitkeep to license ignore
---
 .licenserc.yaml                                    |   1 +
 dolphinscheduler-python/pydolphinscheduler/.flake8 |   1 +
 .../pydolphinscheduler/{README.md => DEVELOP.md}   |  85 ++--------
 .../pydolphinscheduler/README.md                   | 152 ++++--------------
 .../pydolphinscheduler/RELEASE.md                  |  35 +++++
 .../pydolphinscheduler/ROADMAP.md                  |  34 ----
 .../tasks/shell.py => docs/Makefile}               |  33 ++--
 .../pydolphinscheduler/docs/make.bat               |  54 +++++++
 .../docs/source/_static/.gitkeep                   |   0
 .../pydolphinscheduler/docs/source/api.rst         |  47 ++++++
 .../pydolphinscheduler/docs/source/concept.rst     | 151 ++++++++++++++++++
 .../pydolphinscheduler/docs/source/conf.py         |  88 +++++++++++
 .../pydolphinscheduler/docs/source/index.rst       |  42 +++++
 .../pydolphinscheduler/docs/source/start.rst       | 113 ++++++++++++++
 .../docs/source/tasks/condition.rst                |  33 ++++
 .../pydolphinscheduler/docs/source/tasks/datax.rst |  33 ++++
 .../docs/source/tasks/dependent.rst                |  33 ++++
 .../pydolphinscheduler/docs/source/tasks/flink.rst |  33 ++++
 .../pydolphinscheduler/docs/source/tasks/http.rst  |  21 +++
 .../pydolphinscheduler/docs/source/tasks/index.rst |  41 +++++
 .../docs/source/tasks/map_reduce.rst               |  34 ++++
 .../docs/source/tasks/procedure.rst                |  21 +++
 .../docs/source/tasks/python.rst                   |  21 +++
 .../pydolphinscheduler/docs/source/tasks/shell.rst |  33 ++++
 .../pydolphinscheduler/docs/source/tasks/spark.rst |  33 ++++
 .../pydolphinscheduler/docs/source/tasks/sql.rst   |  21 +++
 .../docs/source/tasks/sub_process.rst              |  21 +++
 .../docs/source/tasks/switch.rst                   |  33 ++++
 .../pydolphinscheduler/docs/source/tutorial.rst    | 150 ++++++++++++++++++
 .../pydolphinscheduler/setup.py                    |  60 +++++--
 .../src/pydolphinscheduler/__init__.py             |   4 +
 .../src/pydolphinscheduler/constants.py            |   7 +
 .../src/pydolphinscheduler/core/__init__.py        |  10 ++
 .../src/pydolphinscheduler/core/database.py        |   7 +-
 .../src/pydolphinscheduler/core/engine.py          |  95 ++++++++++++
 .../pydolphinscheduler/core/process_definition.py  |  39 ++++-
 .../src/pydolphinscheduler/core/task.py            |  14 +-
 .../pydolphinscheduler/{ => examples}/__init__.py  |   2 +-
 .../examples/bulk_create_example.py}               |   0
 .../examples/task_condition_example.py}            |  12 +-
 .../examples/task_datax_example.py                 |  51 +++---
 .../examples/task_dependent_example.py             |   4 +-
 .../__init__.py => examples/task_flink_example.py} |  19 ++-
 .../task_map_reduce_example.py}                    |  34 ++--
 .../__init__.py => examples/task_spark_example.py} |  19 ++-
 .../examples/task_switch_example.py                |  13 +-
 .../pydolphinscheduler}/examples/tutorial.py       |  24 ++-
 .../src/pydolphinscheduler/java_gateway.py         |  13 +-
 .../src/pydolphinscheduler/side/__init__.py        |  10 ++
 .../src/pydolphinscheduler/tasks/__init__.py       |  30 ++++
 .../src/pydolphinscheduler/tasks/condition.py      |   3 +-
 .../src/pydolphinscheduler/tasks/flink.py          |  93 +++++++++++
 .../tasks/{shell.py => map_reduce.py}              |  40 +++--
 .../src/pydolphinscheduler/tasks/shell.py          |  23 ++-
 .../src/pydolphinscheduler/tasks/spark.py          |  94 +++++++++++
 .../pydolphinscheduler/tests/core/test_engine.py   | 147 ++++++++++++++++++
 .../tests/core/test_process_definition.py          |  77 +++++++++
 .../example/__init__.py}                           |   2 +-
 .../tests/example/test_example.py                  | 172 +++++++++++++++++++++
 .../tests/tasks/test_condition.py                  |   8 +-
 .../pydolphinscheduler/tests/tasks/test_datax.py   |   2 -
 .../pydolphinscheduler/tests/tasks/test_flink.py   |  82 ++++++++++
 .../tests/tasks/test_map_reduce.py                 |  75 +++++++++
 .../pydolphinscheduler/tests/tasks/test_spark.py   |  82 ++++++++++
 .../pydolphinscheduler/tests/test_docs.py          |  59 +++++++
 .../testing/constants.py}                          |  21 +--
 .../pydolphinscheduler/tests/testing/path.py       |  56 +++++++
 .../server/PythonGatewayServer.java                |  77 +++++++--
 .../server/config/PythonGatewayConfig.java         |  93 +++++++++++
 .../src/main/resources/python-gateway.properties   |  38 +++++
 70 files changed, 2725 insertions(+), 383 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index aabeaf6..0bdc0ea 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -46,5 +46,6 @@ header:
     - '.github/actions/lable-on-issue/**'
     - '.github/actions/reviewdog-setup/**'
     - '.github/actions/translate-on-issue/**'
+    - '**/.gitkeep'
 
   comment: on-failure
diff --git a/dolphinscheduler-python/pydolphinscheduler/.flake8 b/dolphinscheduler-python/pydolphinscheduler/.flake8
index e676972..7f659a2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/.flake8
+++ b/dolphinscheduler-python/pydolphinscheduler/.flake8
@@ -35,3 +35,4 @@ ignore =
     W503   # W503: Line breaks before binary operators
 per-file-ignores =
     src/pydolphinscheduler/side/__init__.py:F401
+    src/pydolphinscheduler/tasks/__init__.py:F401
diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md b/dolphinscheduler-python/pydolphinscheduler/DEVELOP.md
similarity index 56%
copy from dolphinscheduler-python/pydolphinscheduler/README.md
copy to dolphinscheduler-python/pydolphinscheduler/DEVELOP.md
index b06632c..f22ab86 100644
--- a/dolphinscheduler-python/pydolphinscheduler/README.md
+++ b/dolphinscheduler-python/pydolphinscheduler/DEVELOP.md
@@ -17,93 +17,50 @@
  under the License.
 -->
 
-# pydolphinscheduler
+# Develop
 
-[![GitHub Build][ga-py-test]][ga]
-[![Code style: black][black-shield]][black-gh]
-[![Imports: isort][isort-shield]][isort-gh]
-
-pydolphinscheduler is python API for Apache DolphinScheduler, which allow you definition
-your workflow by python code, aka workflow-as-codes.
-
-## Quick Start
-
-> **_Notice:_** For now, due to pydolphinscheduler without release to any binary tarball or [PyPI][pypi], you 
-> have to clone Apache DolphinScheduler code from GitHub to ensure quick start setup
+pydolphinscheduler is python API for Apache DolphinScheduler, it just defines what workflow look like instead of
+store or execute it. We here use [py4j][py4j] to dynamically access Java Virtual Machine.
 
-Here we show you how to install and run a simple example of pydolphinscheduler
+## Setup Develop Environment
 
-### Prepare
+**PyDolphinScheduler** uses GitHub to host all source code, so you should clone the code before you make any changes.
 
 ```shell
-# Clone code from github
 git clone git@github.com:apache/dolphinscheduler.git
-
-# Install pydolphinscheduler from source
-cd dolphinscheduler-python/pydolphinscheduler
-pip install -e .
 ```
 
-### Start Server And Run Example
-
-Before you run an example, you have to start backend server. You could follow [development setup][dev-setup]
-section "DolphinScheduler Standalone Quick Start" to set up developer environment. You have to start backend
-and frontend server in this step, which mean that you could view DolphinScheduler UI in your browser with URL
-http://localhost:12345/dolphinscheduler
-
-After backend server is being start, all requests from `pydolphinscheduler` would be sent to backend server.
-And for now we could run a simple example by:
+Now, we should install all dependencies to make sure we can run tests or check code style locally
 
 ```shell
-cd dolphinscheduler-python/pydolphinscheduler
-python example/tutorial.py
+cd dolphinscheduler/dolphinscheduler-python/pydolphinscheduler
+pip install .[dev]
 ```
 
-> **_NOTICE:_** Since Apache DolphinScheduler's tenant is requests while running command, you might need to change
-> tenant value in `example/tutorial.py`. For now the value is `tenant_exists`, please change it to username exists
-> in you environment. 
-
-After command execute, you could see a new project with single process definition named *tutorial* in the [UI][ui-project].
-
-Until now, we finish quick start by an example of pydolphinscheduler and run it. If you want to inspect or join
-pydolphinscheduler develop, you could take a look at [develop](#develop)
+Next, we have to open the pydolphinscheduler project in your editor. We recommend you use [pycharm][pycharm]
+instead of [IntelliJ IDEA][idea] to open it. And you could just open directory
+`dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
 
-## Develop
-
-pydolphinscheduler is python API for Apache DolphinScheduler, it just defines what workflow look like instead of
-store or execute it. We here use [py4j][py4j] to dynamically access Java Virtual Machine.
-
-### Setup Develop Environment
-
-We already clone the code in [quick start](#quick-start), so next step we have to open pydolphinscheduler project
-in you editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you could
-just open directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
-
-Then you should add developer dependence to make sure you could run test and check code style locally
-
-```shell
-pip install -r requirements_dev.txt
-```
 
-### Brief Concept
+## Brief Concept
 
 Apache DolphinScheduler is design to define workflow by UI, and pydolphinscheduler try to define it by code. When
 define by code, user usually do not care user, tenant, or queue exists or not. All user care about is created
 a new workflow by the code his/her definition. So we have some **side object** in `pydolphinscheduler/side`
 directory, their only check object exists or not, and create them if not exists. 
 
-#### Process Definition
+### Process Definition
 
 pydolphinscheduler workflow object name, process definition is also same name as Java object(maybe would be change to
 other word for more simple).
 
-#### Tasks
+### Tasks
 
 pydolphinscheduler tasks object, we use tasks to define exact job we want DolphinScheduler do for us. For now,
 we only support `shell` task to execute shell task. [This link][all-task] list all tasks support in DolphinScheduler
 and would be implemented in the further.
 
-### Code Style
+## Code Style
 
 We use [isort][isort] to automatically keep Python imports alphabetically, and use [Black][black] for code
 formatter and [Flake8][flake8] for pep8 checker. If you use [pycharm][pycharm]or [IntelliJ IDEA][idea],
@@ -126,7 +83,7 @@ black .
 flake8
 ```
 
-### Testing
+## Testing
 
 pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action will run our test when you create
 pull request or commit to dev branch, with python version `3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`.
@@ -151,9 +108,6 @@ line show you total coverage of you code. If your CI failed with coverage you co
 this command output.
 
 <!-- content -->
-[pypi]: https://pypi.org/
-[dev-setup]: https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html
-[ui-project]: http://8.142.34.29:12345/dolphinscheduler/ui/#/projects/list
 [py4j]: https://www.py4j.org/index.html
 [pycharm]: https://www.jetbrains.com/pycharm
 [idea]: https://www.jetbrains.com/idea/
@@ -164,10 +118,3 @@ this command output.
 [black-editor]: https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea
 [coverage]: https://coverage.readthedocs.io/en/stable/
 [isort]: https://pycqa.github.io/isort/index.html
-<!-- badge -->
-[ga-py-test]: https://github.com/apache/dolphinscheduler/actions/workflows/py-ci.yml/badge.svg?branch=dev
-[ga]: https://github.com/apache/dolphinscheduler/actions
-[black-shield]: https://img.shields.io/badge/code%20style-black-000000.svg
-[black-gh]: https://github.com/psf/black
-[isort-shield]: https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336
-[isort-gh]: https://pycqa.github.io/isort/
diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md b/dolphinscheduler-python/pydolphinscheduler/README.md
index b06632c..9cc524d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/README.md
+++ b/dolphinscheduler-python/pydolphinscheduler/README.md
@@ -19,34 +19,38 @@
 
 # pydolphinscheduler
 
-[![GitHub Build][ga-py-test]][ga]
-[![Code style: black][black-shield]][black-gh]
-[![Imports: isort][isort-shield]][isort-gh]
+[![PyPi Version](https://img.shields.io/pypi/v/apache-dolphinscheduler.svg?style=flat-square&logo=PyPi)](https://pypi.org/project/apache-dolphinscheduler/)
+[![PyPi Python Versions](https://img.shields.io/pypi/pyversions/apache-dolphinscheduler.svg?style=flat-square&logo=python)](https://pypi.org/project/apache-dolphinscheduler/)
+[![PyPi License](https://img.shields.io/pypi/l/apache-dolphinscheduler.svg?style=flat-square)](https://pypi.org/project/apache-dolphinscheduler/)
+[![PyPi Status](https://img.shields.io/pypi/status/apache-dolphinscheduler.svg?style=flat-square)](https://pypi.org/project/apache-dolphinscheduler/)
+[![PyPi Downloads](https://img.shields.io/pypi/dm/apache-dolphinscheduler?style=flat-square)](https://pypi.org/project/apache-dolphinscheduler/)
 
-pydolphinscheduler is python API for Apache DolphinScheduler, which allow you definition
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg?style=flat-square)](https://github.com/psf/black)
+[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat-square&labelColor=ef8336)](https://pycqa.github.io/isort)
+[![GitHub Build](https://github.com/apache/dolphinscheduler/actions/workflows/py-ci.yml/badge.svg?branch=dev)](https://github.com/apache/dolphinscheduler/actions?query=workflow%3A%22Python+API%22)
+
+**PyDolphinScheduler** is python API for Apache DolphinScheduler, which allow you definition
 your workflow by python code, aka workflow-as-codes.
 
 ## Quick Start
 
-> **_Notice:_** For now, due to pydolphinscheduler without release to any binary tarball or [PyPI][pypi], you 
-> have to clone Apache DolphinScheduler code from GitHub to ensure quick start setup
-
-Here we show you how to install and run a simple example of pydolphinscheduler
-
-### Prepare
+### Installation
 
 ```shell
-# Clone code from github
-git clone git@github.com:apache/dolphinscheduler.git
+# Install
+$ pip install apache-dolphinscheduler
 
-# Install pydolphinscheduler from source
-cd dolphinscheduler-python/pydolphinscheduler
-pip install -e .
+# Check installation; it succeeded if you see the version output (here we use 0.1.0 as an example)
+$ python -c "import pydolphinscheduler; print(pydolphinscheduler.__version__)"
+0.1.0
 ```
 
+Here we show you how to install and run a simple example of pydolphinscheduler
+
 ### Start Server And Run Example
 
-Before you run an example, you have to start backend server. You could follow [development setup][dev-setup]
+Before you run an example, you have to start backend server. You could follow
+[development setup](https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html)
 section "DolphinScheduler Standalone Quick Start" to set up developer environment. You have to start backend
 and frontend server in this step, which mean that you could view DolphinScheduler UI in your browser with URL
 http://localhost:12345/dolphinscheduler
@@ -54,120 +58,30 @@ http://localhost:12345/dolphinscheduler
 After backend server is being start, all requests from `pydolphinscheduler` would be sent to backend server.
 And for now we could run a simple example by:
 
+<!-- TODO Add examples directory to dist package later. -->
+
 ```shell
-cd dolphinscheduler-python/pydolphinscheduler
-python example/tutorial.py
+# Please make sure your terminal can reach raw.githubusercontent.com to download the example with curl
+curl https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py -o ./tutorial.py
+python ./tutorial.py
 ```
 
 > **_NOTICE:_** Since Apache DolphinScheduler's tenant is requests while running command, you might need to change
 > tenant value in `example/tutorial.py`. For now the value is `tenant_exists`, please change it to username exists
 > in you environment. 
 
-After command execute, you could see a new project with single process definition named *tutorial* in the [UI][ui-project].
-
-Until now, we finish quick start by an example of pydolphinscheduler and run it. If you want to inspect or join
-pydolphinscheduler develop, you could take a look at [develop](#develop)
+After command execute, you could see a new project with single process definition named *tutorial* in the
+[UI-project list](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html). 
 
 ## Develop
 
-pydolphinscheduler is python API for Apache DolphinScheduler, it just defines what workflow look like instead of
-store or execute it. We here use [py4j][py4j] to dynamically access Java Virtual Machine.
-
-### Setup Develop Environment
-
-We already clone the code in [quick start](#quick-start), so next step we have to open pydolphinscheduler project
-in you editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. And you could
-just open directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`.
-
-Then you should add developer dependence to make sure you could run test and check code style locally
-
-```shell
-pip install -r requirements_dev.txt
-```
-
-### Brief Concept
-
-Apache DolphinScheduler is design to define workflow by UI, and pydolphinscheduler try to define it by code. When
-define by code, user usually do not care user, tenant, or queue exists or not. All user care about is created
-a new workflow by the code his/her definition. So we have some **side object** in `pydolphinscheduler/side`
-directory, their only check object exists or not, and create them if not exists. 
-
-#### Process Definition
-
-pydolphinscheduler workflow object name, process definition is also same name as Java object(maybe would be change to
-other word for more simple).
-
-#### Tasks
-
-pydolphinscheduler tasks object, we use tasks to define exact job we want DolphinScheduler do for us. For now,
-we only support `shell` task to execute shell task. [This link][all-task] list all tasks support in DolphinScheduler
-and would be implemented in the further.
-
-### Code Style
-
-We use [isort][isort] to automatically keep Python imports alphabetically, and use [Black][black] for code
-formatter and [Flake8][flake8] for pep8 checker. If you use [pycharm][pycharm]or [IntelliJ IDEA][idea],
-maybe you could follow [Black-integration][black-editor] to configure them in your environment.
-
-Our Python API CI would automatically run code style checker and unittest when you submit pull request in
-GitHub, you could also run static check locally.
-
-```shell
-# We recommend you run isort and Black before Flake8, because Black could auto fix some code style issue
-# but Flake8 just hint when code style not match pep8
-
-# Run Isort
-isort .
-
-# Run Black
-black .
-
-# Run Flake8
-flake8
-```
-
-### Testing
+Until now, we finish quick start by an example of pydolphinscheduler and run it. If you want to inspect or join
+pydolphinscheduler develop, you could take a look at [develop](./DEVELOP.md)
 
-pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action will run our test when you create
-pull request or commit to dev branch, with python version `3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`.
+## Release
 
-To test locally, you could directly run pytest after set `PYTHONPATH` 
+If you are interested in how to release **PyDolphinScheduler**, you could go and see at [release](./RELEASE.md)
 
-```shell
-PYTHONPATH=src/ pytest
-```
-
-We try to keep pydolphinscheduler usable through unit test coverage. 90% test coverage is our target, but for
-now, we require test coverage up to 85%, and each pull request leas than 85% would fail our CI step
-`Tests coverage`. We use [coverage][coverage] to check our test coverage, and you could check it locally by
-run command.
-
-```shell
-coverage run && coverage report
-```
+## What's more
 
-It would not only run unit test but also show each file coverage which cover rate less than 100%, and `TOTAL`
-line show you total coverage of you code. If your CI failed with coverage you could go and find some reason by
-this command output.
-
-<!-- content -->
-[pypi]: https://pypi.org/
-[dev-setup]: https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html
-[ui-project]: http://8.142.34.29:12345/dolphinscheduler/ui/#/projects/list
-[py4j]: https://www.py4j.org/index.html
-[pycharm]: https://www.jetbrains.com/pycharm
-[idea]: https://www.jetbrains.com/idea/
-[all-task]: https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html
-[pytest]: https://docs.pytest.org/en/latest/
-[black]: https://black.readthedocs.io/en/stable/index.html
-[flake8]: https://flake8.pycqa.org/en/latest/index.html
-[black-editor]: https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea
-[coverage]: https://coverage.readthedocs.io/en/stable/
-[isort]: https://pycqa.github.io/isort/index.html
-<!-- badge -->
-[ga-py-test]: https://github.com/apache/dolphinscheduler/actions/workflows/py-ci.yml/badge.svg?branch=dev
-[ga]: https://github.com/apache/dolphinscheduler/actions
-[black-shield]: https://img.shields.io/badge/code%20style-black-000000.svg
-[black-gh]: https://github.com/psf/black
-[isort-shield]: https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336
-[isort-gh]: https://pycqa.github.io/isort/
+For more detailed information, please see the **PyDolphinScheduler** [document](https://dolphinscheduler.apache.org/python/index.html)
diff --git a/dolphinscheduler-python/pydolphinscheduler/RELEASE.md b/dolphinscheduler-python/pydolphinscheduler/RELEASE.md
new file mode 100644
index 0000000..6c2b46e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/RELEASE.md
@@ -0,0 +1,35 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Release
+
+The official release of **PyDolphinScheduler** is in the [ASF Distribution Directory](https://downloads.apache.org/dolphinscheduler/),
+and it should be released together with [apache-dolphinscheduler](https://github.com/apache/dolphinscheduler).
+
+## To ASF Distribution Directory
+
+You could release to [ASF Distribution Directory](https://downloads.apache.org/dolphinscheduler/) according to
+[release guide](https://dolphinscheduler.apache.org/en-us/community/release-prepare.html) in DolphinScheduler
+website.
+
+## To PyPi
+
+[PyPI](https://pypi.org), Python Package Index, is a repository of software for the Python programming language.
+Users can install Python packages from it. Releasing to PyPI makes it easier for users to install and try PyDolphinScheduler.
+There is an official way to package a project from [PyPA](https://packaging.python.org/en/latest/tutorials/packaging-projects)
diff --git a/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
deleted file mode 100644
index 32ad5e2..0000000
--- a/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md
+++ /dev/null
@@ -1,34 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-## Roadmap
-
-### v0.0.3
-
-Add other features, tasks, parameters in DS, keep code coverage up to 90%
-
-### v0.0.2
-
-Add docs about how to use and develop package, code coverage up to 90%, add CI/CD
-for package
-
-### v0.0.1(current)
-
-Setup up POC, for defining DAG with python code, running DAG manually,
-releasing to pypi
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/docs/Makefile
similarity index 52%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
copy to dolphinscheduler-python/pydolphinscheduler/docs/Makefile
index 94ad2f4..985198a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/Makefile
@@ -15,24 +15,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Task shell."""
-
-from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+# Minimal makefile for Sphinx documentation
+#
 
+# You can set these variables from the command line, and also
+# from the environment for the first two.
 
-class Shell(Task):
-    """Task shell object, declare behavior for shell task to dolphinscheduler.
+# Add opts `turn warnings into errors` strict sphinx-build behavior
+SPHINXOPTS    ?= -W
+SPHINXBUILD   ?= sphinx-build
+SOURCEDIR     = source
+BUILDDIR      = build
 
-    TODO maybe we could use instance name to replace attribute `name`
-    which is simplify as `task_shell = Shell(command = "echo 1")` and
-    task.name assign to `task_shell`
-    """
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
 
-    _task_custom_attr = {
-        "raw_script",
-    }
+.PHONY: help Makefile
 
-    def __init__(self, name: str, command: str, *args, **kwargs):
-        super().__init__(name, TaskType.SHELL, *args, **kwargs)
-        self.raw_script = command
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/make.bat b/dolphinscheduler-python/pydolphinscheduler/docs/make.bat
new file mode 100644
index 0000000..feac4c9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/make.bat
@@ -0,0 +1,54 @@
+REM Licensed to the Apache Software Foundation (ASF) under one
+REM or more contributor license agreements.  See the NOTICE file
+REM distributed with this work for additional information
+REM regarding copyright ownership.  The ASF licenses this file
+REM to you under the Apache License, Version 2.0 (the
+REM "License"); you may not use this file except in compliance
+REM with the License.  You may obtain a copy of the License at
+REM 
+REM   http://www.apache.org/licenses/LICENSE-2.0
+REM 
+REM Unless required by applicable law or agreed to in writing,
+REM software distributed under the License is distributed on an
+REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+REM KIND, either express or implied.  See the License for the
+REM specific language governing permissions and limitations
+REM under the License.
+
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+	set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=source
+set BUILDDIR=build
+REM Add opts `turn warnings into errors` strict sphinx-build behavior
+set SPHINXOPTS=-W
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+	echo.
+	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+	echo.installed, then set the SPHINXBUILD environment variable to point
+	echo.to the full path of the 'sphinx-build' executable. Alternatively you
+	echo.may add the Sphinx directory to PATH.
+	echo.
+	echo.If you don't have Sphinx installed, grab it from
+	echo.https://www.sphinx-doc.org/
+	exit /b 1
+)
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/_static/.gitkeep b/dolphinscheduler-python/pydolphinscheduler/docs/source/_static/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/api.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/api.rst
new file mode 100644
index 0000000..8e55ea5
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/api.rst
@@ -0,0 +1,47 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+API
+===
+
+Core
+----
+
+.. automodule:: pydolphinscheduler.core
+  :inherited-members:
+
+Sides
+-----
+
+.. automodule:: pydolphinscheduler.side
+  :inherited-members:
+
+Tasks
+-----
+
+.. automodule:: pydolphinscheduler.tasks
+  :inherited-members:
+
+Constants
+---------
+
+.. automodule:: pydolphinscheduler.constants
+
+Exceptions
+----------
+
+.. automodule:: pydolphinscheduler.exceptions
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/concept.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/concept.rst
new file mode 100644
index 0000000..9a9527d
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/concept.rst
@@ -0,0 +1,151 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Concepts
+========
+
+In this section, you would know the core concepts of *PyDolphinScheduler*.
+
+Process Definition
+------------------
+
+Process definition describes everything about a workflow except `tasks`_ and `tasks dependence`_, including its
+name, schedule interval, schedule start time and end time. You can find more about scheduling in `Schedule`_.
+
+Process definition could be initialized with a normal assignment statement or in a context manager.
+
+.. code-block:: python
+
+   # Initialization with assign statement
+   pd = ProcessDefinition(name="my first process definition")
+
+   # Or context manager
+   with ProcessDefinition(name="my first process definition") as pd:
+       pd.submit()
+
+Process definition is the main object used to communicate between *PyDolphinScheduler* and the DolphinScheduler daemon.
+After the process definition and tasks are declared, you could use `submit` or `run` to notify the server of your definition.
+
+If you just want to submit your definition and create the workflow without running it, you should use method `submit`.
+But if you want to run the workflow after you submit it, you could use method `run`.
+
+.. code-block:: python
+
+   # Just submit definition, without run it
+   pd.submit()
+   
+   # Both submit and run definition
+   pd.run()
+
+Schedule
+~~~~~~~~
+
+We use parameter `schedule` to determine the schedule interval of the workflow. *PyDolphinScheduler* supports a
+crontab expression with seven fields, and the meaning of each position is shown below:
+
+.. code-block:: text
+
+    * * * * * * *
+    ┬ ┬ ┬ ┬ ┬ ┬ ┬
+    │ │ │ │ │ │ │
+    │ │ │ │ │ │ └─── year
+    │ │ │ │ │ └───── day of week (0 - 7) (0 to 6 are Sunday to Saturday, or use names; 7 is Sunday, the same as 0)
+    │ │ │ │ └─────── month (1 - 12)
+    │ │ │ └───────── day of month (1 - 31)
+    │ │ └─────────── hour (0 - 23)
+    │ └───────────── min (0 - 59)
+    └─────────────── second (0 - 59)
+
+Here we add some example crontab:
+
+- `0 0 0 * * ? *`: Workflow executes every day at 00:00:00.
+- `10 2 * * * ? *`: Workflow executes every hour, at two minutes and ten seconds past the hour.
+- `10,11 20 0 1,2 * ? *`: Workflow executes on the first and second day of each month at 00:20:10 and 00:20:11.
+
+Tenant
+~~~~~~
+
+Tenant is the user who runs the task command on a machine or in a virtual machine. It could be assigned by a simple string.
+
+.. code-block:: python
+
+   # 
+   pd = ProcessDefinition(name="process definition tenant", tenant="tenant_exists")
+
+.. note::
+
+   Make sure the tenant exists on the target machine, otherwise an error will be raised when you try to run the command
+
+Tasks
+-----
+
+Task is the minimum unit running an actual job, and it is a node of the DAG, aka directed acyclic graph. You could define
+what you want to do in the task. It has some required parameters that make it unique and define it.
+
+Here we use :py:class:`pydolphinscheduler.tasks.Shell` as example, parameters `name` and `command` are required and must be provided. Parameter
+`name` sets the name of the task, and parameter `command` declares the command you wish to run in this task.
+
+.. code-block:: python
+
+   # We named this task as "shell", and just run command `echo shell task`
+   shell_task = Shell(name="shell", command="echo shell task")
+
+If you want to see all types of tasks, you could see :doc:`tasks/index`.
+
+Tasks Dependence
+~~~~~~~~~~~~~~~~
+
+You could define many tasks in one single `Process Definition`_. If all those tasks run in parallel,
+then you could leave them alone without adding any additional information. But if some tasks should
+not be run until their preceding tasks in the workflow are done, we should set task dependence for them. There are
+two main ways to set task dependence and both of them are easy. You could use bitwise operators `>>` and `<<`, or
+task methods `set_downstream` and `set_upstream` to do it.
+
+.. code-block:: python
+
+   # Set task1 as task2 upstream
+   task1 >> task2
+   # You could use attribute `set_downstream` too, is same as `task1 >> task2`
+   task1.set_downstream(task2)
+   
+   # Set task1 as task2 downstream
+   task1 << task2
+   # It is same as attribute `set_upstream`
+   task1.set_upstream(task2)
+   
+   # Besides, we could set dependence between a task and a sequence of tasks,
+   # here we set `task1` as upstream of both `task2` and `task3`. It is useful
+   # when several tasks share the same dependence.
+   task1 >> [task2, task3]
+
+Task With Process Definition
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In most data orchestration cases, you should assign attribute `process_definition` to the task instance to
+decide the workflow of the task. You could set `process_definition` by normal assignment or in context manager mode
+
+.. code-block:: python
+
+   # Normal assignment, you have to explicitly declare the `ProcessDefinition` instance and pass it to the task
+   pd = ProcessDefinition(name="my first process definition")
+   shell_task = Shell(name="shell", command="echo shell task", process_definition=pd)
+
+   # Context manager, `ProcessDefinition` instance pd would be implicitly assigned to the task
+   with ProcessDefinition(name="my first process definition") as pd:
+       shell_task = Shell(name="shell", command="echo shell task")
+
+With both `Process Definition`_, `Tasks`_  and `Tasks Dependence`_, we could build a workflow with multiple tasks.
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
new file mode 100644
index 0000000..5ee73a5
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Configuration file for the Sphinx documentation builder.
+#
+# This file only contains a selection of the most common options. For a full
+# list see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Path setup --------------------------------------------------------------
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+# import os
+# import sys
+# sys.path.insert(0, os.path.abspath('.'))
+
+
+# -- Project information -----------------------------------------------------
+
+project = "pydolphinscheduler"
+copyright = "2022, apache"
+author = "apache"
+
+# The full version, including alpha/beta/rc tags
+release = "0.0.1"
+
+
+# -- General configuration ---------------------------------------------------
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+    # Measures durations of Sphinx processing
+    "sphinx.ext.duration",
+    # Semi-automatically generate documentation from docstrings
+    "sphinx.ext.autodoc",
+    "sphinx.ext.viewcode",
+    "sphinx.ext.autosectionlabel",
+    "sphinx_rtd_theme",
+]
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ["_templates"]
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This pattern also affects html_static_path and html_extra_path.
+exclude_patterns = []
+
+autodoc_default_options = {
+    "members": True,
+    "show-inheritance": True,
+    "private-members": True,
+    "undoc-members": True,
+    "member-order": "groupwise",
+}
+
+autosectionlabel_prefix_document = True
+
+# -- Options for HTML output -------------------------------------------------
+
+# The theme to use for HTML and HTML Help pages.  See the documentation for
+# a list of builtin themes.
+#
+html_theme = "sphinx_rtd_theme"
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ["_static"]
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
new file mode 100644
index 0000000..b04c26f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/index.rst
@@ -0,0 +1,42 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+PyDolphinScheduler
+==================
+
+**PyDolphinScheduler** is the Python API for `Apache DolphinScheduler <https://dolphinscheduler.apache.org>`_,
+which allows you to define your workflow by Python code, aka workflow-as-code.
+
+You could go and find how to :ref:`install <start:getting started>` the project. Or if you want to see a simple example
+then go and see :doc:`tutorial` for more detail.
+
+
+.. toctree::
+   :maxdepth: 2
+
+   start
+   tutorial
+   concept
+   tasks/index
+   api
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
new file mode 100644
index 0000000..0af90d5
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
@@ -0,0 +1,113 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Getting Started
+===============
+
+To get started with *PyDolphinScheduler* you must ensure that Python and pip are
+installed on your machine. If you're already set up, you can skip straight
+to `Installing PyDolphinScheduler`_, otherwise please continue with
+`Installing Python`_.
+
+Installing Python
+-----------------
+
+How to install `python` and `pip` depends on what operating system
+you're using. The python wiki provides up to date
+`instructions for all platforms here`_. When you enter the website
+and choose your operating system, you will be offered a choice of
+Python versions. *PyDolphinScheduler* recommends using a version above
+Python 3.6 and we highly recommend you install *Stable Releases* instead
+of *Pre-releases*.
+
+After you have downloaded and installed Python, you should open your terminal,
+type and run :code:`python --version` to check whether the installation
+is correct or not. If everything is fine, you will see the version in the console
+without errors (here is an example after Python 3.8.7 is installed):
+
+.. code-block:: bash
+
+    $ python --version
+    Python 3.8.7
+
+Installing PyDolphinScheduler
+-----------------------------
+
+After Python is installed on your machine following section
+`installing Python`_, it is easy to install *PyDolphinScheduler* by pip.
+
+.. code-block:: bash
+
+    $ pip install apache-dolphinscheduler
+
+The latest version of *PyDolphinScheduler* would be installed after you run the above
+command in your terminal. You could go and `start Python Gateway Server`_ to finish
+the preparation, and then go to :doc:`tutorial` to get your hands dirty. But if you
+want to install the unreleased version of *PyDolphinScheduler*, you could go and see
+section `installing PyDolphinScheduler in dev`_ for more detail.
+
+Installing PyDolphinScheduler In Dev
+------------------------------------
+
+Because the project is still under development, some of the features have not been released yet.
+If you want to try something unreleased, you could install from the source code
+which we host on GitHub
+
+.. code-block:: bash
+
+    # Clone Apache DolphinScheduler repository
+    $ git clone git@github.com:apache/dolphinscheduler.git
+    # Install PyDolphinScheduler in develop mode
+    $ cd dolphinscheduler/dolphinscheduler-python/pydolphinscheduler && pip install -e .
+
+After you have installed *PyDolphinScheduler*, please remember to `start Python Gateway Server`_,
+which waits for *PyDolphinScheduler*'s workflow definition requests.
+
+Start Python Gateway Server
+---------------------------
+
+Since **PyDolphinScheduler** is the Python API for `Apache DolphinScheduler`_, it
+could define workflow and task structures, but could not run them unless you
+`install Apache DolphinScheduler`_ and start the Python gateway server. We only
+list some key steps here, and you could go to `install Apache DolphinScheduler`_
+for more detail
+
+.. code-block:: bash
+
+    # Start pythonGatewayServer
+    $ ./bin/dolphinscheduler-daemon.sh start pythonGatewayServer
+
+To check whether the server is alive or not, you could run :code:`jps`. The
+server is healthy if keyword `PythonGatewayServer` appears in the console.
+
+.. code-block:: bash
+
+    $ jps
+    ....
+    201472 PythonGatewayServer
+    ....
+
+What's More
+-----------
+
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial`
+and see how it works. But if you already know the inside of *PyDolphinScheduler*,
+maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports.
+
+.. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
+.. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org
+.. _`install Apache DolphinScheduler`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/standalone.html
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
new file mode 100644
index 0000000..20b0350
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/condition.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Condition
+=========
+
+A condition task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_condition_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.condition
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
new file mode 100644
index 0000000..c077269
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/datax.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Datax
+=====
+
+A DataX task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_datax_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.datax
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
new file mode 100644
index 0000000..fe26d0f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/dependent.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Dependent
+=========
+
+A dependent task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_dependent_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.dependent
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
new file mode 100644
index 0000000..8db9ac2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/flink.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Flink
+=====
+
+A flink task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_flink_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.flink
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
new file mode 100644
index 0000000..4c6d8f8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/http.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+HTTP
+====
+
+.. automodule:: pydolphinscheduler.tasks.http
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
new file mode 100644
index 0000000..42dcdf9
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -0,0 +1,41 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Tasks
+=====
+
+In this section, you can find all the task types *PyDolphinScheduler* supports.
+
+.. toctree::
+   :maxdepth: 1
+   
+   shell
+   sql
+   python
+   http
+
+   switch
+   condition
+   dependent
+
+   spark
+   flink
+   map_reduce
+   procedure
+
+   datax
+   sub_process
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
new file mode 100644
index 0000000..068b8d8
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/map_reduce.rst
@@ -0,0 +1,34 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Map Reduce
+==========
+
+
+A Map Reduce task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_map_reduce_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.map_reduce
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
new file mode 100644
index 0000000..cd79eff
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/procedure.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Procedure
+=========
+
+.. automodule:: pydolphinscheduler.tasks.procedure
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
new file mode 100644
index 0000000..660e46a
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/python.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Python
+======
+
+.. automodule:: pydolphinscheduler.tasks.python
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
new file mode 100644
index 0000000..5ce16c3
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/shell.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Shell
+=====
+
+A shell task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start workflow_declare]
+   :end-before: [end task_relation_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.shell
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
new file mode 100644
index 0000000..cdb5902
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/spark.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Spark
+=====
+
+A spark task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_spark_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.spark
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
new file mode 100644
index 0000000..21eaec7
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sql.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+SQL
+===
+
+.. automodule:: pydolphinscheduler.tasks.sql
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
new file mode 100644
index 0000000..8a9f562
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/sub_process.rst
@@ -0,0 +1,21 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Sub Process
+===========
+
+.. automodule:: pydolphinscheduler.tasks.sub_process
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
new file mode 100644
index 0000000..d8b34a4
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/switch.rst
@@ -0,0 +1,33 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Switch
+======
+
+A switch task type's example and dive into information of **PyDolphinScheduler**.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/task_switch_example.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.switch
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
new file mode 100644
index 0000000..83c5746
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
@@ -0,0 +1,150 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Tutorial
+========
+
+This tutorial shows you the basic concepts of *PyDolphinScheduler* and tells you all
+the things you should know before you submit or run your first workflow. If you
+have not yet installed *PyDolphinScheduler* and started Apache DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>`.
+
+Overview of Tutorial
+--------------------
+
+Here is an overview of our tutorial. It looks a little complex, but do not
+worry about that because we explain this example below in as much detail as possible.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start tutorial]
+   :end-before: [end tutorial]
+
+Import Necessary Module
+-----------------------
+
+First of all, we should import the necessary modules which we will use later, just
+like any other Python package. We only create a minimal demo here, so we just import
+:class:`pydolphinscheduler.core.process_definition` and
+:class:`pydolphinscheduler.tasks.shell`.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start package_import]
+   :end-before: [end package_import]
+
+If you want to use other task types, you could click and
+:doc:`see all tasks we support <tasks/index>`.
+
+Process Definition Declaration
+------------------------------
+
+We should instantiate objects after we import them from `import necessary module`_.
+Here we declare basic arguments for process definition (aka, workflow). We define
+the name of the process definition using a `Python context manager`_, and it is
+**the only required argument** for object process definition. Besides that, we also
+declare three arguments: `schedule` and `start_time`, which set the workflow schedule
+interval and schedule start time, and `tenant`, which changes the user running the
+workflow's tasks in the worker. :ref:`section tenant <concept:tenant>` in the *PyDolphinScheduler*
+:doc:`concept` page has more detailed information.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :start-after: [start workflow_declare]
+   :end-before: [end workflow_declare]
+
+You could find more detail about process definition in
+:ref:`concept about process definition <concept:process definition>` if you are interested in it.
+All arguments of object process definition are described in the
+:class:`pydolphinscheduler.core.process_definition` API documentation.
+
+Task Declaration
+----------------
+
+Here we declare four tasks, and all of them are simple tasks of
+:class:`pydolphinscheduler.tasks.shell` which run the `echo` command in a terminal.
+Besides the argument `command`, we also need to set the argument `name` for each task *(not
+only shell task, `name` is required for each type of task)*.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :dedent: 0
+   :start-after: [start task_declare]
+   :end-before: [end task_declare]
+
+Besides the shell task, *PyDolphinScheduler* supports multiple task types, which you could
+find in :doc:`tasks/index`.
+
+Setting Task Dependence
+-----------------------
+
+After we declare both the process definition and the tasks, we have one workflow with
+four tasks, but all tasks are independent, so they would run in parallel.
+We should set the order and the dependence of tasks. It is useful when we need to
+run a prepare task before we run the actual task, or we need tasks to run following a
+specific rule. We support both the attributes `set_downstream` and `set_upstream`, and
+the bitwise operators `>>` and `<<`.
+
+In this example, we set task `task_parent` as the upstream task of tasks
+`task_child_one` and `task_child_two`, and task `task_union` as the downstream
+task of both of these two tasks.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :dedent: 0
+   :start-after: [start task_relation_declare]
+   :end-before: [end task_relation_declare]
+
+Please notice that we could group some tasks and set dependence if they have the
+same downstream or upstream. We declare tasks `task_child_one` and `task_child_two`
+as a group here, named `task_group`, and set task `task_parent` as upstream of
+both of them. You could see more detail in the :ref:`concept:Tasks Dependence` section of the
+concept documentation.
+
+Submit Or Run Workflow
+----------------------
+
+Now we have finished our workflow definition, with tasks and task dependence, but all
+these things are only local. We should let the Apache DolphinScheduler daemon know how
+we defined our workflow. So the last thing we have to do here is submit our workflow to
+the Apache DolphinScheduler daemon.
+
+Here in the example we use the `ProcessDefinition` attribute `run` to submit the workflow
+to the daemon, and set the schedule time we just declared in `process definition declaration`_.
+
+Now, we could run the Python code like any other Python script. For basic usage, run
+:code:`python tutorial.py` to trigger and run it.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :dedent: 0
+   :start-after: [start submit_or_run]
+   :end-before: [end submit_or_run]
+
+If you have not started your Apache DolphinScheduler server, you could find the way in
+:ref:`start:start Python gateway server`, which has more detail about starting the related
+servers. Besides attribute `run`, we have attribute `submit` for object `ProcessDefinition`,
+and it just submits the workflow to the daemon without setting the schedule information. For
+more detail you could see :ref:`concept:process definition`.
+
+DAG Graph After Tutorial Run
+----------------------------
+
+After we run the tutorial code, you could log in to the Apache DolphinScheduler web UI,
+and go and see the `DolphinScheduler project page`_. There is a new process definition
+created and named "Tutorial". It is created by *PyDolphinScheduler* and its DAG graph is shown below.
+
+.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+   :language: text
+   :lines: 24-28
+
+.. _`DolphinScheduler project page`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project.html
+.. _`Python context manager`: https://docs.python.org/3/library/stdtypes.html#context-manager-types
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py
index 4a6c045..cd6eb3e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/setup.py
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -22,13 +22,41 @@ from os.path import dirname, join
 
 from setuptools import find_packages, setup
 
-version = "0.0.1.dev0"
-
 if sys.version_info[0] < 3:
     raise Exception(
         "pydolphinscheduler does not support Python 2. Please upgrade to Python 3."
     )
 
+version = "0.1.0"
+
+# Start package required
+prod = [
+    "py4j~=0.10",
+]
+
+doc = [
+    "sphinx>=4.3",
+    "sphinx_rtd_theme>=1.0",
+]
+
+test = [
+    "pytest>=6.2",
+    "freezegun>=1.1",
+    "coverage>=6.1",
+]
+
+style = [
+    "flake8>=4.0",
+    "flake8-docstrings>=1.6",
+    "flake8-black>=0.2",
+    "isort>=5.10",
+]
+
+dev = style + test + doc
+
+all_dep = prod + dev
+# End package required
+
 
 def read(*names, **kwargs):
     """Read file content from given file path."""
@@ -38,10 +66,10 @@ def read(*names, **kwargs):
 
 
 setup(
-    name="pydolphinscheduler",
+    name="apache-dolphinscheduler",
     version=version,
     license="Apache License 2.0",
-    description="Apache DolphinScheduler python SDK",
+    description="Apache DolphinScheduler Python API",
     long_description=read("README.md"),
     # Make sure pypi is expecting markdown
     long_description_content_type="text/markdown",
@@ -57,8 +85,8 @@ setup(
     ],
     project_urls={
         "Homepage": "https://dolphinscheduler.apache.org",
-        "Documentation": "https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/quick-start.html",
-        "Source": "https://github.com/apache/dolphinscheduler",
+        "Documentation": "https://dolphinscheduler.apache.org/python/index.html",
+        "Source": "https://github.com/apache/dolphinscheduler/dolphinscheduler-python/pydolphinscheduler",
         "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
         "Discussion": "https://github.com/apache/dolphinscheduler/discussions",
         "Twitter": "https://twitter.com/dolphinschedule",
@@ -66,9 +94,13 @@ setup(
     packages=find_packages(where="src"),
     package_dir={"": "src"},
     include_package_data=True,
+    package_data={
+        "examples": ["examples.tutorial.py"],
+    },
+    platforms=["any"],
     classifiers=[
         # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers
-        "Development Status :: 1 - Planning",
+        "Development Status :: 3 - Alpha",
         "Environment :: Console",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: Apache Software License",
@@ -85,10 +117,12 @@ setup(
         "Programming Language :: Python :: Implementation :: PyPy",
         "Topic :: Software Development :: User Interfaces",
     ],
-    install_requires=[
-        # Core
-        "py4j~=0.10",
-        # Dev
-        "pytest~=6.2",
-    ],
+    install_requires=prod,
+    extras_require={
+        "all": all_dep,
+        "dev": dev,
+        "style": style,
+        "test": test,
+        "doc": doc,
+    },
 )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
index 701b4cc..2a7b554 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
@@ -16,3 +16,7 @@
 # under the License.
 
 """Init root of pydolphinscheduler."""
+
+from pkg_resources import get_distribution
+
+__version__ = get_distribution("apache-dolphinscheduler").version
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 84becbc..65bf6c5 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -77,6 +77,9 @@ class TaskType(str):
     DEPENDENT = "DEPENDENT"
     CONDITIONS = "CONDITIONS"
     SWITCH = "SWITCH"
+    FLINK = "FLINK"
+    SPARK = "SPARK"
+    MR = "MR"
 
 
 class DefaultTaskCodeNum(str):
@@ -96,6 +99,10 @@ class JavaGatewayDefault(str):
 
     RESULT_DATA = "data"
 
+    SERVER_ADDRESS = "127.0.0.1"
+    SERVER_PORT = 25333
+    AUTO_CONVERT = True
+
 
 class Delimiter(str):
     """Constants for delimiter."""
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
index 3fbddf3..31dc944 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -16,3 +16,13 @@
 # under the License.
 
 """Init pydolphinscheduler.core package."""
+
+from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.task import Task
+
+__all__ = [
+    "ProcessDefinition",
+    "Task",
+    "Database",
+]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
index d4addf9..b6602a6 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/database.py
@@ -17,11 +17,11 @@
 
 """Module database."""
 
-import logging
 from typing import Dict
 
 from py4j.protocol import Py4JJavaError
 
+from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.java_gateway import launch_gateway
 
 
@@ -58,7 +58,6 @@ class Database(dict):
             try:
                 self._database = gateway.entry_point.getDatasourceInfo(name)
             # Handler database source do not exists error, for now we just terminate the process.
-            except Py4JJavaError:
-                logging.error("Datasource name `%s` do not exists.", name)
-                exit(1)
+            except Py4JJavaError as ex:
+                raise PyDSParamException(str(ex.java_exception))
             return self._database
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
new file mode 100644
index 0000000..df84b5b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module engine."""
+
+from typing import Dict, Optional
+
+from py4j.protocol import Py4JJavaError
+
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+    """Type of program engine runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`."""
+
+    JAVA = "JAVA"
+    SCALA = "SCALA"
+    PYTHON = "PYTHON"
+
+
+class Engine(Task):
+    """Task engine object, declare behavior for engine task to dolphinscheduler.
+
+    This is the parent class of spark, flink and mr tasks,
+    and is used to provide the programType, mainClass and mainJar task parameters for reuse.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        task_type: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, task_type, *args, **kwargs)
+        self.main_class = main_class
+        self.main_package = main_package
+        self.program_type = program_type
+        self._resource = {}
+
+    def get_resource_info(self, program_type, main_package):
+        """Get resource info from java gateway, contains resource id, name."""
+        if self._resource:
+            return self._resource
+        else:
+            gateway = launch_gateway()
+            try:
+                self._resource = gateway.entry_point.getResourcesFileInfo(
+                    program_type, main_package
+                )
+            # Handle the resource-does-not-exist error by re-raising it as PyDSParamException.
+            except Py4JJavaError as ex:
+                raise PyDSParamException(str(ex.java_exception))
+            return self._resource
+
+    def get_jar_id(self) -> int:
+        """Get jar id from java gateway, a wrapper for :func:`get_resource_info`."""
+        return self.get_resource_info(self.program_type, self.main_package).get("id")
+
+    @property
+    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+        """Override Task.task_params for engine children task.
+
+        children task have some specials attribute for task_params, and is odd if we
+        directly set as python property, so we Override Task.task_params here.
+        """
+        params = super().task_params
+        custom_params = {
+            "programType": self.program_type,
+            "mainClass": self.main_class,
+            "mainJar": {
+                "id": self.get_jar_id(),
+            },
+        }
+        params.update(custom_params)
+        return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index f6fab01..1c123fc 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional, Set
 from pydolphinscheduler.constants import (
     ProcessDefinitionDefault,
     ProcessDefinitionReleaseState,
+    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
@@ -97,7 +98,7 @@ class ProcessDefinition(Base):
         worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
-        param: Optional[List] = None,
+        param: Optional[Dict] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -190,6 +191,22 @@ class ProcessDefinition(Base):
         self._end_time = val
 
     @property
+    def param_json(self) -> Optional[List[Dict]]:
+        """Return param json base on self.param."""
+        # Handle empty dict and None value
+        if not self.param:
+            return []
+        return [
+            {
+                "prop": k,
+                "direct": "IN",
+                "type": "VARCHAR",
+                "value": v,
+            }
+            for k, v in self.param.items()
+        ]
+
+    @property
     def task_definition_json(self) -> List[Dict]:
         """Return all tasks definition in list of dict."""
         if not self.tasks:
@@ -323,16 +340,33 @@ class ProcessDefinition(Base):
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
 
+    def _pre_submit_check(self):
+        """Check specific condition satisfy before.
+
+        This method should be called before process definition submit to java gateway
+        For now, we have below checker:
+        * `self.param` should be set if task `switch` in this workflow.
+        """
+        if (
+            any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
+            and self.param is None
+        ):
+            raise PyDSParamException(
+                "Parameter param must be provider if task Switch in process definition."
+            )
+
     def submit(self) -> int:
         """Submit ProcessDefinition instance to java gateway."""
         self._ensure_side_model_exists()
+        self._pre_submit_check()
+
         gateway = launch_gateway()
         self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
             self._user,
             self._project,
             self.name,
             str(self.description) if self.description else "",
-            json.dumps(self.param) if self.param else None,
+            json.dumps(self.param_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
             json.dumps(self.task_location),
             self.timeout,
@@ -341,6 +375,7 @@ class ProcessDefinition(Base):
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
+            None,
         )
         return self._process_definition_code
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 89fda57..693f508 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -26,7 +26,6 @@ from pydolphinscheduler.constants import (
     TaskFlag,
     TaskPriority,
     TaskTimeoutFlag,
-    TaskType,
 )
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.core.process_definition import (
@@ -157,8 +156,7 @@ class Task(Base):
         self.resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
-        if task_type != TaskType.CONDITIONS:
-            self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
+        self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
 
     @property
     def process_definition(self) -> Optional[ProcessDefinition]:
@@ -171,6 +169,16 @@ class Task(Base):
         self._process_definition = process_definition
 
     @property
+    def condition_result(self) -> Dict:
+        """Get attribute condition_result."""
+        return self._condition_result
+
+    @condition_result.setter
+    def condition_result(self, condition_result: Optional[Dict]):
+        """Set attribute condition_result."""
+        self._condition_result = condition_result
+
+    @property
     def task_params(self) -> Optional[Dict]:
         """Get task parameter object.
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/__init__.py
similarity index 90%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/__init__.py
index 701b4cc..37b2e5b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/__init__.py
@@ -15,4 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init root of pydolphinscheduler."""
+"""Init examples package which provides users with pydolphinscheduler examples."""
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/bulk_create_example.py
similarity index 100%
rename from dolphinscheduler-python/pydolphinscheduler/examples/bulk_create.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/bulk_create_example.py
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_condition_example.py
similarity index 87%
rename from dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_condition_example.py
index 1415206..2d73df4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_condition_example.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# [start workflow_declare]
 r"""
 A example workflow for task condition.
 
@@ -31,10 +32,10 @@ pre_task_3 ->                     -> fail_branch
 """
 
 from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions
+from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
 from pydolphinscheduler.tasks.shell import Shell
 
-with ProcessDefinition(name="task_conditions_example_1", tenant="tenant_exists") as pd:
+with ProcessDefinition(name="task_condition_example", tenant="tenant_exists") as pd:
     pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
     pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
     pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
@@ -45,13 +46,14 @@ with ProcessDefinition(name="task_conditions_example_1", tenant="tenant_exists")
         ),
     )
 
-    success_branch = Shell(name="success_branch", command="success_branch parent")
+    success_branch = Shell(name="success_branch", command="echo success_branch")
     fail_branch = Shell(name="fail_branch", command="echo fail_branch")
 
-    condition = Conditions(
-        name="conditions",
+    condition = Condition(
+        name="condition",
         condition=cond_operator,
         success_task=success_branch,
         failed_task=fail_branch,
     )
     pd.submit()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_datax_example.py
similarity index 73%
rename from dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_datax_example.py
index 9b4254a..94bd449 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_datax_example.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# [start workflow_declare]
 """
 A example workflow for task datax.
 
@@ -24,7 +25,6 @@ You can create data sources `first_mysql` and `first_mysql` through UI.
 It creates a task to synchronize datax from the source database to the target database.
 """
 
-
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
@@ -38,24 +38,15 @@ JSON_TEMPLATE = {
                     "parameter": {
                         "username": "usr",
                         "password": "pwd",
-                        "column": [
-                            "id",
-                            "name",
-                            "code",
-                            "description"
-                        ],
+                        "column": ["id", "name", "code", "description"],
                         "splitPk": "id",
                         "connection": [
                             {
-                                "table": [
-                                    "source_table"
-                                ],
-                                "jdbcUrl": [
-                                    "jdbc:mysql://127.0.0.1:3306/source_db"
-                                ]
+                                "table": ["source_table"],
+                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
                             }
-                        ]
-                    }
+                        ],
+                    },
                 },
                 "writer": {
                     "name": "mysqlwriter",
@@ -63,31 +54,32 @@ JSON_TEMPLATE = {
                         "writeMode": "insert",
                         "username": "usr",
                         "password": "pwd",
-                        "column": [
-                            "id",
-                            "name"
-                        ],
+                        "column": ["id", "name"],
                         "connection": [
                             {
                                 "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
-                                "table": [
-                                    "target_table"
-                                ]
+                                "table": ["target_table"],
                             }
-                        ]
-                    }
-                }
+                        ],
+                    },
+                },
             }
-        ]
+        ],
+        "setting": {
+            "errorLimit": {"percentage": 0, "record": 0},
+            "speed": {"channel": 1, "record": 1000},
+        },
     }
 }
 
 with ProcessDefinition(
-    name="task_datax_1",
+    name="task_datax_example",
     tenant="tenant_exists",
 ) as pd:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
+    # You have to make sure data sources named `first_mysql` and `second_mysql` exist
+    # in your environment.
     task1 = DataX(
         name="task_datax",
         datasource_name="first_mysql",
@@ -96,7 +88,8 @@ with ProcessDefinition(
         target_table="target_table",
     )
 
-    # you can custom json_template of datax to sync data. This task create job
-    # same as task1 do
+    # You can customize json_template of datax to sync data. This task creates a new
+    # datax job same as task1, moving records from `first_mysql` to `second_mysql`
     task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
     pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dependent_example.py
similarity index 96%
rename from dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dependent_example.py
index 68f2a57..ae19d95 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# [start workflow_declare]
 r"""
 A example workflow for task dependent.
 
@@ -49,7 +50,7 @@ with ProcessDefinition(
     pd.submit()
 
 with ProcessDefinition(
-    name="task_dependent",
+    name="task_dependent_example",
     tenant="tenant_exists",
 ) as pd:
     task = Dependent(
@@ -70,3 +71,4 @@ with ProcessDefinition(
         ),
     )
     pd.submit()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_flink_example.py
similarity index 57%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_flink_example.py
index de5188c..1e8a040 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_flink_example.py
@@ -15,8 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init Side package, Side package keep object related to DolphinScheduler but not in the Core part."""
+# [start workflow_declare]
+"""A example workflow for task flink."""
 
-from pydolphinscheduler.side.project import Project
-from pydolphinscheduler.side.tenant import Tenant
-from pydolphinscheduler.side.user import User
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
+
+with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
+    task = Flink(
+        name="task_flink",
+        main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
+        main_package="WordCount.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_map_reduce_example.py
similarity index 55%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_map_reduce_example.py
index 94ad2f4..39b204f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_map_reduce_example.py
@@ -15,24 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Task shell."""
+# [start workflow_declare]
+"""A example workflow for task mr."""
 
-from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.core.engine import ProgramType
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.map_reduce import MR
 
-
-class Shell(Task):
-    """Task shell object, declare behavior for shell task to dolphinscheduler.
-
-    TODO maybe we could use instance name to replace attribute `name`
-    which is simplify as `task_shell = Shell(command = "echo 1")` and
-    task.name assign to `task_shell`
-    """
-
-    _task_custom_attr = {
-        "raw_script",
-    }
-
-    def __init__(self, name: str, command: str, *args, **kwargs):
-        super().__init__(name, TaskType.SHELL, *args, **kwargs)
-        self.raw_script = command
+with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd:
+    task = MR(
+        name="task_mr",
+        main_class="wordcount",
+        main_package="hadoop-mapreduce-examples-3.3.1.jar",
+        program_type=ProgramType.JAVA,
+        main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
+    )
+    pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_spark_example.py
similarity index 57%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_spark_example.py
index de5188c..594d95f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_spark_example.py
@@ -15,8 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init Side package, Side package keep object related to DolphinScheduler but not in the Core part."""
+# [start workflow_declare]
+"""A example workflow for task spark."""
 
-from pydolphinscheduler.side.project import Project
-from pydolphinscheduler.side.tenant import Tenant
-from pydolphinscheduler.side.user import User
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
+
+with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
+    task = Spark(
+        name="task_spark",
+        main_class="org.apache.spark.examples.SparkPi",
+        main_package="spark-examples_2.12-3.2.0.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_switch_example.py
similarity index 90%
rename from dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_switch_example.py
index 9a13854..7966af3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/task_switch_example.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# [start workflow_declare]
 r"""
 A example workflow for task switch.
 
@@ -34,16 +35,7 @@ from pydolphinscheduler.tasks.shell import Shell
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 
 with ProcessDefinition(
-    name="task_switch_example",
-    tenant="tenant_exists",
-    param=[
-        {
-            "prop": "var",
-            "direct": "IN",
-            "type": "VARCHAR",
-            "value": "1"
-        }
-    ]
+    name="task_switch_example", tenant="tenant_exists", param={"var": "1"}
 ) as pd:
     parent = Shell(name="parent", command="echo parent")
     switch_child_1 = Shell(name="switch_child_1", command="echo switch_child_1")
@@ -56,3 +48,4 @@ with ProcessDefinition(
     switch = Switch(name="switch", condition=switch_condition)
     parent >> switch
     pd.submit()
+# [end workflow_declare]
diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
similarity index 70%
rename from dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
rename to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
index 52a8e97..0478e68 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
@@ -24,25 +24,45 @@ and workflow DAG graph as below:
                   --> task_child_one
                 /                    \
 task_parent -->                        -->  task_union
-                \                   /
+                \                    /
                   --> task_child_two
 
 it will instantiate and run all the task it have.
 """
 
+# [start tutorial]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
 from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import task Shell object because we will create some shell tasks later
 from pydolphinscheduler.tasks.shell import Shell
 
+# [end package_import]
+
+# [start workflow_declare]
 with ProcessDefinition(
-    name="aklsfkkalsfjkol",
+    name="tutorial",
+    schedule="0 0 0 * * ? *",
+    start_time="2021-01-01",
     tenant="tenant_exists",
 ) as pd:
+    # [end workflow_declare]
+    # [start task_declare]
+    task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
     task_child_one = Shell(name="task_child_one", command="echo 'child one'")
     task_child_two = Shell(name="task_child_two", command="echo 'child two'")
     task_union = Shell(name="task_union", command="echo union")
+    # [end task_declare]
 
+    # [start task_relation_declare]
     task_group = [task_child_one, task_child_two]
+    task_parent.set_downstream(task_group)
 
     task_union << task_group
+    # [end task_relation_declare]
 
+    # [start submit_or_run]
     pd.run()
+    # [end submit_or_run]
+# [end tutorial]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
index d0b4c05..2876ed5 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py
@@ -26,14 +26,23 @@ from pydolphinscheduler.constants import JavaGatewayDefault
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException
 
 
-def launch_gateway() -> JavaGateway:
+def launch_gateway(
+    address: Optional[str] = None,
+    port: Optional[int] = None,
+    auto_convert: Optional[bool] = True,
+) -> JavaGateway:
     """Launch java gateway to pydolphinscheduler.
 
     TODO Note that automatic conversion makes calling Java methods slightly less efficient because
     in the worst case, Py4J needs to go through all registered converters for all parameters.
     This is why automatic conversion is disabled by default.
     """
-    gateway = JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))
+    gateway_parameters = GatewayParameters(
+        address=address or JavaGatewayDefault.SERVER_ADDRESS,
+        port=port or JavaGatewayDefault.SERVER_PORT,
+        auto_convert=auto_convert or JavaGatewayDefault.AUTO_CONVERT,
+    )
+    gateway = JavaGateway(gateway_parameters=gateway_parameters)
     return gateway
 
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
index de5188c..c4479dd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py
@@ -18,5 +18,15 @@
 """Init Side package, Side package keep object related to DolphinScheduler but not in the Core part."""
 
 from pydolphinscheduler.side.project import Project
+from pydolphinscheduler.side.queue import Queue
 from pydolphinscheduler.side.tenant import Tenant
 from pydolphinscheduler.side.user import User
+from pydolphinscheduler.side.worker_group import WorkerGroup
+
+__all__ = [
+    "Project",
+    "Tenant",
+    "User",
+    "Queue",
+    "WorkerGroup",
+]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
index 9eded99..dd46c91 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py
@@ -16,3 +16,33 @@
 # under the License.
 
 """Init pydolphinscheduler.tasks package."""
+
+from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition, Or
+from pydolphinscheduler.tasks.datax import CustomDataX, DataX
+from pydolphinscheduler.tasks.dependent import Dependent
+from pydolphinscheduler.tasks.flink import Flink
+from pydolphinscheduler.tasks.http import Http
+from pydolphinscheduler.tasks.map_reduce import MR
+from pydolphinscheduler.tasks.procedure import Procedure
+from pydolphinscheduler.tasks.python import Python
+from pydolphinscheduler.tasks.shell import Shell
+from pydolphinscheduler.tasks.spark import Spark
+from pydolphinscheduler.tasks.sql import Sql
+from pydolphinscheduler.tasks.sub_process import SubProcess
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
+
+__all__ = [
+    "Condition",
+    "DataX",
+    "Dependent",
+    "Flink",
+    "Http",
+    "MR",
+    "Procedure",
+    "Python",
+    "Shell",
+    "Spark",
+    "Sql",
+    "SubProcess",
+    "Switch",
+]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
index a5773b2..50aac25 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py
@@ -154,7 +154,7 @@ class Or(ConditionOperator):
         super().__init__(*args)
 
 
-class Conditions(Task):
+class Condition(Task):
     """Task condition object, declare behavior for condition task to dolphinscheduler."""
 
     def __init__(
@@ -201,5 +201,4 @@ class Conditions(Task):
         """
         params = super().task_params
         params["dependence"] = self.condition.get_define()
-        params["conditionResult"] = self.condition_result
         return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
new file mode 100644
index 0000000..83cae95
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Flink."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class FlinkVersion(str):
+    """Flink version, for now it just contain `HIGHT` and `LOW`."""
+
+    LOW_VERSION = "<1.10"
+    HIGHT_VERSION = ">=1.10"
+
+
+class DeployMode(str):
+    """Flink deploy mode, for now it just contain `LOCAL` and `CLUSTER`."""
+
+    LOCAL = "local"
+    CLUSTER = "cluster"
+
+
+class Flink(Engine):
+    """Task flink object, declare behavior for flink task to dolphinscheduler."""
+
+    _task_custom_attr = {
+        "deploy_mode",
+        "flink_version",
+        "slot",
+        "task_manager",
+        "job_manager_memory",
+        "task_manager_memory",
+        "app_name",
+        "parallelism",
+        "main_args",
+        "others",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+        flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
+        app_name: Optional[str] = None,
+        job_manager_memory: Optional[str] = "1G",
+        task_manager_memory: Optional[str] = "2G",
+        slot: Optional[int] = 1,
+        task_manager: Optional[int] = 2,
+        parallelism: Optional[int] = 1,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(
+            name,
+            TaskType.FLINK,
+            main_class,
+            main_package,
+            program_type,
+            *args,
+            **kwargs
+        )
+        self.deploy_mode = deploy_mode
+        self.flink_version = flink_version
+        self.app_name = app_name
+        self.job_manager_memory = job_manager_memory
+        self.task_manager_memory = task_manager_memory
+        self.slot = slot
+        self.task_manager = task_manager
+        self.parallelism = parallelism
+        self.main_args = main_args
+        self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
similarity index 52%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
copy to dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
index 94ad2f4..5050bd3 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py
@@ -15,24 +15,38 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Task shell."""
+"""Task MR."""
 
-from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task
+from typing import Optional
 
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
 
-class Shell(Task):
-    """Task shell object, declare behavior for shell task to dolphinscheduler.
 
-    TODO maybe we could use instance name to replace attribute `name`
-    which is simplify as `task_shell = Shell(command = "echo 1")` and
-    task.name assign to `task_shell`
-    """
+class MR(Engine):
+    """Task mr object, declare behavior for mr task to dolphinscheduler."""
 
     _task_custom_attr = {
-        "raw_script",
+        "app_name",
+        "main_args",
+        "others",
     }
 
-    def __init__(self, name: str, command: str, *args, **kwargs):
-        super().__init__(name, TaskType.SHELL, *args, **kwargs)
-        self.raw_script = command
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        app_name: Optional[str] = None,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(
+            name, TaskType.MR, main_class, main_package, program_type, *args, **kwargs
+        )
+        self.app_name = app_name
+        self.main_args = main_args
+        self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index 94ad2f4..9a73535 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -24,11 +24,28 @@ from pydolphinscheduler.core.task import Task
 class Shell(Task):
     """Task shell object, declare behavior for shell task to dolphinscheduler.
 
-    TODO maybe we could use instance name to replace attribute `name`
-    which is simplify as `task_shell = Shell(command = "echo 1")` and
-    task.name assign to `task_shell`
+    :param name: A unique, meaningful string for the shell task.
+    :param command: One or more commands to run in this task.
+
+        It could be a simple command::
+
+            Shell(name=..., command="echo task shell")
+
+        or several commands that together perform a complex task::
+
+            command = '''echo task shell step 1;
+            echo task shell step 2;
+            echo task shell step 3
+            '''
+
+            Shell(name=..., command=command)
+
     """
 
+    # TODO maybe we could use instance name to replace attribute `name`
+    #  which is simplify as `task_shell = Shell(command = "echo 1")` and
+    #  task.name assign to `task_shell`
+
     _task_custom_attr = {
         "raw_script",
     }
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
new file mode 100644
index 0000000..565daad
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Spark."""
+
+from typing import Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+
+class SparkVersion(str):
+    """Spark version, for now it just contain `SPARK1` and `SPARK2`."""
+
+    SPARK1 = "SPARK1"
+    SPARK2 = "SPARK2"
+
+
+class DeployMode(str):
+    """SPARK deploy mode, for now it just contain `LOCAL`, `CLIENT` and `CLUSTER`."""
+
+    LOCAL = "local"
+    CLIENT = "client"
+    CLUSTER = "cluster"
+
+
+class Spark(Engine):
+    """Task spark object, declare behavior for spark task to dolphinscheduler."""
+
+    _task_custom_attr = {
+        "deploy_mode",
+        "spark_version",
+        "driver_cores",
+        "driver_memory",
+        "num_executors",
+        "executor_memory",
+        "executor_cores",
+        "app_name",
+        "main_args",
+        "others",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+        spark_version: Optional[SparkVersion] = SparkVersion.SPARK2,
+        app_name: Optional[str] = None,
+        driver_cores: Optional[int] = 1,
+        driver_memory: Optional[str] = "512M",
+        num_executors: Optional[int] = 2,
+        executor_memory: Optional[str] = "2G",
+        executor_cores: Optional[int] = 2,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(
+            name,
+            TaskType.SPARK,
+            main_class,
+            main_package,
+            program_type,
+            *args,
+            **kwargs
+        )
+        self.deploy_mode = deploy_mode
+        self.spark_version = spark_version
+        self.app_name = app_name
+        self.driver_cores = driver_cores
+        self.driver_memory = driver_memory
+        self.num_executors = num_executors
+        self.executor_memory = executor_memory
+        self.executor_cores = executor_cores
+        self.main_args = main_args
+        self.others = others
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
new file mode 100644
index 0000000..e36c47b
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Engine."""
+
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.engine import Engine, ProgramType
+
+TEST_ENGINE_TASK_TYPE = "ENGINE"
+TEST_MAIN_CLASS = "org.apache.examples.mock.Mock"
+TEST_MAIN_PACKAGE = "Mock.jar"
+TEST_PROGRAM_TYPE = ProgramType.JAVA
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_get_jar_detail(mock_resource, mock_code_version):
+    """Test :func:`get_jar_id` can return expect value."""
+    name = "test_get_jar_detail"
+    task = Engine(
+        name,
+        TEST_ENGINE_TASK_TYPE,
+        TEST_MAIN_CLASS,
+        TEST_MAIN_PACKAGE,
+        TEST_PROGRAM_TYPE,
+    )
+    assert 1 == task.get_jar_id()
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-task-params",
+                "task_type": "test-engine",
+                "main_class": "org.apache.examples.mock.Mock",
+                "main_package": "TestMock.jar",
+                "program_type": ProgramType.JAVA,
+            },
+            {
+                "mainClass": "org.apache.examples.mock.Mock",
+                "mainJar": {
+                    "id": 1,
+                },
+                "programType": ProgramType.JAVA,
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+                "waitStartTimeout": {},
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_property_task_params(mock_resource, mock_code_version, attr, expect):
+    """Test task engine task property."""
+    task = Engine(**attr)
+    assert expect == task.task_params
+
+
+@pytest.mark.parametrize(
+    "attr, expect",
+    [
+        (
+            {
+                "name": "test-task-test_engine_get_define",
+                "task_type": "test-engine",
+                "main_class": "org.apache.examples.mock.Mock",
+                "main_package": "TestMock.jar",
+                "program_type": ProgramType.JAVA,
+            },
+            {
+                "code": 123,
+                "name": "test-task-test_engine_get_define",
+                "version": 1,
+                "description": None,
+                "delayTime": 0,
+                "taskType": "test-engine",
+                "taskParams": {
+                    "mainClass": "org.apache.examples.mock.Mock",
+                    "mainJar": {
+                        "id": 1,
+                    },
+                    "programType": ProgramType.JAVA,
+                    "localParams": [],
+                    "resourceList": [],
+                    "dependence": {},
+                    "conditionResult": {"successNode": [""], "failedNode": [""]},
+                    "waitStartTimeout": {},
+                },
+                "flag": "YES",
+                "taskPriority": "MEDIUM",
+                "workerGroup": "default",
+                "failRetryTimes": 0,
+                "failRetryInterval": 1,
+                "timeoutFlag": "CLOSE",
+                "timeoutNotifyStrategy": None,
+                "timeout": 0,
+            },
+        )
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "mock_name"}),
+)
+def test_engine_get_define(mock_resource, mock_code_version, attr, expect):
+    """Test task engine function get_define."""
+    task = Engine(**attr)
+    assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 8491878..f51338d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -19,6 +19,7 @@
 
 from datetime import datetime
 from typing import Any
+from unittest.mock import patch
 
 import pytest
 from freezegun import freeze_time
@@ -30,10 +31,12 @@ from pydolphinscheduler.constants import (
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
+from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
 from pydolphinscheduler.utils.date import conv_to_schedule
 from tests.testing.task import Task
 
 TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
+TEST_TASK_TYPE = "test-task-type"
 
 
 @pytest.mark.parametrize("func", ["run", "submit", "start"])
@@ -151,6 +154,80 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "param, expect",
+    [
+        (
+            None,
+            [],
+        ),
+        (
+            {},
+            [],
+        ),
+        (
+            {"key1": "val1"},
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                }
+            ],
+        ),
+        (
+            {
+                "key1": "val1",
+                "key2": "val2",
+            },
+            [
+                {
+                    "prop": "key1",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val1",
+                },
+                {
+                    "prop": "key2",
+                    "direct": "IN",
+                    "type": "VARCHAR",
+                    "value": "val2",
+                },
+            ],
+        ),
+    ],
+)
+def test_property_param_json(param, expect):
+    """Test ProcessDefinition's property param_json."""
+    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, param=param)
+    assert pd.param_json == expect
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_without_param(mock_code_version):
+    """Test :func:`_pre_submit_check` if process definition with switch but without attribute param."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        parent = Task(name="parent", task_type=TEST_TASK_TYPE)
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        with pytest.raises(
+            PyDSParamException,
+            match="Parameter param must be provider if task Switch in process definition.",
+        ):
+            pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements.txt b/dolphinscheduler-python/pydolphinscheduler/tests/example/__init__.py
similarity index 95%
rename from dolphinscheduler-python/pydolphinscheduler/requirements.txt
rename to dolphinscheduler-python/pydolphinscheduler/tests/example/__init__.py
index cdec3ca..49323e7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/requirements.txt
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/__init__.py
@@ -15,4 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-py4j~=0.10.9.2
\ No newline at end of file
+"""Init example package tests."""
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
new file mode 100644
index 0000000..5bf897f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
@@ -0,0 +1,172 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test example."""
+
+import ast
+import importlib
+from unittest.mock import patch
+
+import pytest
+
+from tests.testing.constants import task_without_example
+from tests.testing.path import get_all_examples, get_tasks
+from tests.testing.task import Task
+
+process_definition_name = set()
+
+
+def import_module(script_name, script_path):
+    """Import and run example module in examples directory."""
+    spec = importlib.util.spec_from_file_location(script_name, script_path)
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+def test_task_without_example():
+    """Test which tasks have no example.
+
+    Guards against adding a new task type without adding an example describing how to use it.
+    """
+    # The shell task is covered by examples/tutorial.py, so it is ignored here
+    ignore_name = {"__init__.py", "shell.py"}
+    all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
+
+    have_example_tasks = set()
+    start = "task_"
+    end = "_example"
+    for ex in get_all_examples():
+        stem = ex.stem
+        if stem.startswith(start) and stem.endswith(end):
+            task_name = stem.replace(start, "").replace(end, "")
+            have_example_tasks.add(task_name)
+
+    assert all_tasks.difference(have_example_tasks) == task_without_example
+
+
+@pytest.fixture(autouse=True)
+def setup_and_teardown_for_stuff():
+    """Reset the recorded process definition names after each test in this module."""
+    yield
+    global process_definition_name
+    process_definition_name = set()
+
+
+def submit_check_without_same_name(self):
+    """Side effect for verifying process definition name and adding it to global variable."""
+    if self.name in process_definition_name:
+        # Exceptions do not %-interpolate extra arguments, so build the message eagerly.
+        raise ValueError(
+            f"Example process definition should not have same name, but get duplicate name: {self.name}"
+        )
+    submit_add_process_definition(self)
+
+
+def submit_add_process_definition(self):
+    """Side effect for adding process definition name to global variable."""
+    process_definition_name.add(self.name)
+
+
+def test_example_basic():
+    """Test example basic information.
+
+    Which including:
+    * File extension name is `.py`
+    * All example except `tutorial.py` is end with keyword "_example"
+    * All example must have not empty `__doc__`.
+    """
+    for ex in get_all_examples():
+        # All files in example is python script
+        assert (
+            ex.suffix == ".py"
+        ), f"We expect all examples is python script, but get {ex.name}."
+
+        # All except tutorial and __init__ is end with keyword "_example"
+        if ex.stem != "tutorial" and ex.stem != "__init__":
+            assert ex.stem.endswith(
+                "_example"
+            ), f"We expect all examples script end with keyword '_example', but get {ex.stem}."
+
+        # All files have __doc__
+        tree = ast.parse(ex.read_text())
+        example_doc = ast.get_docstring(tree, clean=False)
+        assert (
+            example_doc is not None
+        ), f"We expect all examples have __doc__, but {ex.name} do not."
+
+
+@patch("pydolphinscheduler.core.process_definition.ProcessDefinition.start")
+@patch(
+    "pydolphinscheduler.core.process_definition.ProcessDefinition.submit",
+    side_effect=submit_check_without_same_name,
+    autospec=True,
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    # Example bulk_create_example.py creates workflows dynamically via :func:`get_one_task_by_name`,
+    # which would raise an error if we returned a constant value
+    # using :arg:`return_value`
+    side_effect=Task("test_example", "test_example").gen_code_and_version,
+)
+def test_example_process_definition_without_same_name(
+    mock_code_version, mock_submit, mock_start
+):
+    """Test that no two example files define process definitions with the same name.
+
+    Process definitions sharing a name would compete with each other, making the actual workflow
+    differ from our workflow-as-code file, which would confuse users.
+    """
+    for ex in get_all_examples():
+        # The side_effect `submit_check_without_same_name` replaces :func:`submit`
+        # and raises if a process definition name has already been seen
+        import_module(ex.name, str(ex))
+    assert True
+
+
+@patch("pydolphinscheduler.core.process_definition.ProcessDefinition.start")
+@patch(
+    "pydolphinscheduler.core.process_definition.ProcessDefinition.submit",
+    side_effect=submit_add_process_definition,
+    autospec=True,
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    # Example bulk_create_example.py would create workflow dynamic by :func:`get_one_task_by_name`
+    # and would raise error in :func:`get_one_task_by_name` if we return constant value
+    # using :arg:`return_value`
+    side_effect=Task("test_example", "test_example").gen_code_and_version,
+)
+def test_file_name_in_process_definition(mock_code_version, mock_submit, mock_start):
+    """Test example file name in example definition name.
+
+    We should not directly assert equal, because some of the examples contain
+    more than one process definition.
+    """
+    global process_definition_name
+    for ex in get_all_examples():
+        # Skip __init__ file
+        if ex.stem == "__init__":
+            continue
+        # Skip bulk_create_example check, cause it contain multiple workflow and
+        # without one named bulk_create_example
+        if ex.stem == "bulk_create_example":
+            continue
+        process_definition_name = set()
+        assert ex.stem not in process_definition_name
+        import_module(ex.name, str(ex))
+        assert ex.stem in process_definition_name
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
index 6085de2..5232640 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py
@@ -27,8 +27,8 @@ from pydolphinscheduler.tasks.condition import (
     FAILURE,
     SUCCESS,
     And,
+    Condition,
     ConditionOperator,
-    Conditions,
     Or,
     Status,
 )
@@ -321,7 +321,7 @@ def test_condition_operator_set_define_attr_mix_operator(
     return_value=(12345, 1),
 )
 @patch(
-    "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version",
+    "pydolphinscheduler.tasks.condition.Condition.gen_code_and_version",
     return_value=(123, 1),
 )
 def test_condition_get_define(mock_condition_code_version, mock_task_code_version):
@@ -388,7 +388,7 @@ def test_condition_get_define(mock_condition_code_version, mock_task_code_versio
         "timeout": 0,
     }
 
-    task = Conditions(
+    task = Condition(
         name, condition=cond_operator, success_task=common_task, failed_task=common_task
     )
     assert task.get_define() == expect
@@ -414,7 +414,7 @@ def test_condition_set_dep_workflow(mock_task_code_version):
         success_branch = Task(name="success_branch", task_type=TEST_TYPE)
         fail_branch = Task(name="fail_branch", task_type=TEST_TYPE)
 
-        condition = Conditions(
+        condition = Condition(
             name="conditions",
             condition=cond_operator,
             success_task=success_branch,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
index 7fa4569..9473f57 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py
@@ -119,6 +119,4 @@ def test_custom_datax_get_define(json_template):
         return_value=(code, version),
     ):
         task = CustomDataX(name, json_template)
-        print(task.get_define())
-        print(expect)
         assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
new file mode 100644
index 0000000..92ae3ba
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Flink."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+
+
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_flink_get_define(mock_resource):
+    """Test task flink function get_define."""
+    code = 123
+    version = 1
+    name = "test_flink_get_define"
+    main_class = "org.apache.flink.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    deploy_mode = DeployMode.LOCAL
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "FLINK",
+        "taskParams": {
+            "mainClass": main_class,
+            "mainJar": {
+                "id": 1,
+            },
+            "programType": program_type,
+            "deployMode": deploy_mode,
+            "flinkVersion": FlinkVersion.LOW_VERSION,
+            "slot": 1,
+            "parallelism": 1,
+            "taskManager": 2,
+            "jobManagerMemory": "1G",
+            "taskManagerMemory": "2G",
+            "appName": None,
+            "mainArgs": None,
+            "others": None,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = Flink(name, main_class, main_package, program_type, deploy_mode)
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
new file mode 100644
index 0000000..dbe9e51
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task MR."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.map_reduce import MR, ProgramType
+
+
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_mr_get_define(mock_resource):
+    """Test task mr function get_define."""
+    code = 123
+    version = 1
+    name = "test_mr_get_define"
+    main_class = "org.apache.mr.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    main_args = "/dolphinscheduler/resources/file.txt /output/ds"
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "MR",
+        "taskParams": {
+            "mainClass": main_class,
+            "mainJar": {
+                "id": 1,
+            },
+            "programType": program_type,
+            "appName": None,
+            "mainArgs": main_args,
+            "others": None,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = MR(name, main_class, main_package, program_type, main_args=main_args)
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
new file mode 100644
index 0000000..3b0672f
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Spark."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion
+
+
+@patch(
+    "pydolphinscheduler.core.engine.Engine.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_spark_get_define(mock_resource):
+    """Test task spark function get_define."""
+    code = 123
+    version = 1
+    name = "test_spark_get_define"
+    main_class = "org.apache.spark.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    deploy_mode = DeployMode.LOCAL
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "SPARK",
+        "taskParams": {
+            "mainClass": main_class,
+            "mainJar": {
+                "id": 1,
+            },
+            "programType": program_type,
+            "deployMode": deploy_mode,
+            "sparkVersion": SparkVersion.SPARK2,
+            "driverCores": 1,
+            "driverMemory": "512M",
+            "numExecutors": 2,
+            "executorMemory": "2G",
+            "executorCores": 2,
+            "appName": None,
+            "mainArgs": None,
+            "others": None,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = Spark(name, main_class, main_package, program_type, deploy_mode)
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/test_docs.py b/dolphinscheduler-python/pydolphinscheduler/tests/test_docs.py
new file mode 100644
index 0000000..930e4f7
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/test_docs.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler docs."""
+
+import re
+
+from tests.testing.constants import task_without_example
+from tests.testing.path import get_doc_tasks, get_tasks
+
+ignore_code_file = {"__init__.py"}
+ignore_doc_file = {"index.rst"}
+
+
+def test_without_missing_task_rst():
+    """Test that task modules and task documents match one-to-one by file stem.
+
+    Guards against adding a new task type without adding a document about it.
+    """
+    code_files = {p.stem for p in get_tasks(ignore_name=ignore_code_file)}
+    doc_files = {p.stem for p in get_doc_tasks(ignore_name=ignore_doc_file)}
+    assert code_files == doc_files
+
+
+def test_task_without_example():
+    """Test which task documents have no example section.
+
+    Guards against adding a new task type without adding example content describing how to use it.
+    """
+    task_without_example_detected = set()
+    pattern = re.compile("Example\n-------")
+
+    for doc in get_doc_tasks(ignore_name=ignore_doc_file):
+        search_result = pattern.search(doc.read_text())
+        if not search_result:
+            task_without_example_detected.add(doc.stem)
+    assert task_without_example == task_without_example_detected
+
+
+def test_doc_automodule_directive_name():
+    """Test each task document names its own module in the automodule directive (and has one)."""
+    pattern = re.compile(r"\.\. automodule:: (.*)")
+    for doc in get_doc_tasks(ignore_name=ignore_doc_file):
+        match = pattern.search(doc.read_text())
+        assert match and match.group(1) == f"pydolphinscheduler.tasks.{doc.stem}"
diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt b/dolphinscheduler-python/pydolphinscheduler/tests/testing/constants.py
similarity index 70%
rename from dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
rename to dolphinscheduler-python/pydolphinscheduler/tests/testing/constants.py
index d31a56d..1552192 100644
--- a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/constants.py
@@ -15,13 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# testting
-pytest~=6.2.5
-freezegun
-# Test coverage
-coverage
-# code linting and formatting
-flake8
-flake8-docstrings
-flake8-black
-isort
+"""Constants variables for test module."""
+
+# Record tasks that have no example in directory `examples`. Some of them may not be able to have one,
+# but most are simply missing by mistake, and we should add them later.
+task_without_example = {
+    "sql",
+    "http",
+    "sub_process",
+    "python",
+    "procedure",
+}
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
new file mode 100644
index 0000000..2e75be2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/path.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Handle path related issue in test module."""
+
+from pathlib import Path
+from typing import Any, Generator
+
+path_code_tasks = Path(__file__).parent.parent.parent.joinpath(
+    "src", "pydolphinscheduler", "tasks"
+)
+path_example = Path(__file__).parent.parent.parent.joinpath(
+    "src", "pydolphinscheduler", "examples"
+)
+path_doc_tasks = Path(__file__).parent.parent.parent.joinpath("docs", "source", "tasks")
+
+
+def get_all_examples() -> Generator[Path, Any, None]:
+    """Get all examples files path in examples directory."""
+    return (ex for ex in path_example.iterdir() if ex.is_file())
+
+
+def get_tasks(ignore_name: set = None) -> Generator[Path, Any, None]:
+    """Get all tasks files path in src/pydolphinscheduler/tasks directory."""
+    if not ignore_name:
+        ignore_name = set()
+    return (
+        ex
+        for ex in path_code_tasks.iterdir()
+        if ex.is_file() and ex.name not in ignore_name
+    )
+
+
+def get_doc_tasks(ignore_name: set = None) -> Generator[Path, Any, None]:
+    """Get all tasks document path in docs/source/tasks directory."""
+    if not ignore_name:
+        ignore_name = set()
+    return (
+        ex
+        for ex in path_doc_tasks.iterdir()
+        if ex.is_file() and ex.name not in ignore_name
+    )
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 1cec8b1..a12ff5b 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server;
 
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.service.TenantService;
@@ -30,6 +32,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -49,11 +52,16 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.server.config.PythonGatewayConfig;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import javax.annotation.PostConstruct;
 
@@ -67,6 +75,8 @@ import org.springframework.context.annotation.FilterType;
 
 import py4j.GatewayServer;
 
+import org.apache.commons.collections.CollectionUtils;
+
 @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
     @ComponentScan.Filter(type = FilterType.REGEX, pattern = {
         "org.apache.dolphinscheduler.server.master.*",
@@ -77,7 +87,7 @@ import py4j.GatewayServer;
     })
 })
 public class PythonGatewayServer extends SpringBootServletInitializer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+    private static final Logger logger = LoggerFactory.getLogger(PythonGatewayServer.class);
 
     private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
     private static final int DEFAULT_WARNING_GROUP_ID = 0;
@@ -114,6 +124,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     private QueueService queueService;
 
     @Autowired
+    private ResourcesService resourceService;
+
+    @Autowired
     private ProjectMapper projectMapper;
 
     @Autowired
@@ -128,6 +141,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     @Autowired
     private DataSourceMapper dataSourceMapper;
 
+    @Autowired
+    private PythonGatewayConfig pythonGatewayConfig;
+
     // TODO replace this user to build in admin user if we make sure build in one could not be change
     private final User dummyAdminUser = new User() {
         {
@@ -249,7 +265,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
             processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
         } else if (verifyStatus != Status.SUCCESS) {
             String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
-            LOGGER.error(msg);
+            logger.error(msg);
             throw new RuntimeException(msg);
         }
 
@@ -387,12 +403,12 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     public Map<String, Object> getDatasourceInfo(String datasourceName) {
         Map<String, Object> result = new HashMap<>();
         List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(datasourceName);
-        if (dataSourceList.size() > 1) {
-            String msg = String.format("Get more than one datasource by name %s", datasourceName);
+        if (dataSourceList == null || dataSourceList.isEmpty()) {
+            String msg = String.format("Can not find any datasource by name %s", datasourceName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
-        } else if (dataSourceList.size() == 0) {
-            String msg = String.format("Can not find any datasource by name %s", datasourceName);
+        } else if (dataSourceList.size() > 1) {
+            String msg = String.format("Get more than one datasource by name %s", datasourceName);
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         } else {
@@ -470,13 +486,52 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
         return result;
     }
 
+    /**
+     * Get a resource by the given program type and full name. Returns a map containing the resource id and name.
+     * Useful when the Python API creates Flink or Spark tasks, which need this resource information.
+     *
+     * @param programType program type one of SCALA, JAVA and PYTHON
+     * @param fullName    full name of the resource
+     */
+    public Map<String, Object> getResourcesFileInfo(String programType, String fullName) {
+        Map<String, Object> result = new HashMap<>();
+
+        Map<String, Object> resources = resourceService.queryResourceByProgramType(dummyAdminUser, ResourceType.FILE, ProgramType.valueOf(programType));
+        List<ResourceComponent> resourcesComponent = (List<ResourceComponent>) resources.get(Constants.DATA_LIST);
+        List<ResourceComponent> namedResources = resourcesComponent.stream().filter(s -> fullName.equals(s.getFullName())).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(namedResources)) {
+            String msg = String.format("Can not find valid resource by program type %s and name %s", programType, fullName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        result.put("id", namedResources.get(0).getId());
+        result.put("name", namedResources.get(0).getName());
+        return result;
+    }
+
     @PostConstruct
     public void run() {
-        GatewayServer server = new GatewayServer(this);
-        GatewayServer.turnLoggingOn();
-        // Start server to accept python client socket
-        LOGGER.info("Start python gateway server.");
-        server.start();
+        GatewayServer server;
+        try {
+            InetAddress gatewayHost = InetAddress.getByName(pythonGatewayConfig.getGatewayServerAddress());
+            InetAddress pythonHost = InetAddress.getByName(pythonGatewayConfig.getPythonAddress());
+            server = new GatewayServer(
+                this,
+                pythonGatewayConfig.getGatewayServerPort(),
+                pythonGatewayConfig.getPythonPort(),
+                gatewayHost,
+                pythonHost,
+                pythonGatewayConfig.getConnectTimeout(),
+                pythonGatewayConfig.getReadTimeout(),
+                null
+            );
+            GatewayServer.turnLoggingOn();
+            logger.info("PythonGatewayServer started on: " + gatewayHost.toString());
+            server.start();
+        } catch (UnknownHostException e) {
+            logger.error("exception occurred while constructing PythonGatewayServer().", e);
+        }
     }
 
     public static void main(String[] args) {
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/config/PythonGatewayConfig.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/config/PythonGatewayConfig.java
new file mode 100644
index 0000000..e4ab09d
--- /dev/null
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/config/PythonGatewayConfig.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.config;
+
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.stereotype.Component;
+import org.springframework.beans.factory.annotation.Value;
+
+@Component
+@PropertySource(value = "python-gateway.properties")
+public class PythonGatewayConfig {
+
+    @Value("${gateway.server.address:0.0.0.0}")
+    private String gatewayServerAddress;
+
+    @Value("${gateway.server.port:25333}")
+    private int gatewayServerPort;
+
+    @Value("${python.address:127.0.0.1}")
+    private String pythonAddress;
+
+    @Value("${python.port:25334}")
+    private int pythonPort;
+
+    @Value("${connect.timeout:0}")
+    private int connectTimeout;
+
+    @Value("${read.timeout:0}")
+    private int readTimeout;
+
+    public String getGatewayServerAddress() {
+        return gatewayServerAddress;
+    }
+
+    public void setGatewayServerAddress(String gatewayServerAddress) {
+        this.gatewayServerAddress = gatewayServerAddress;
+    }
+
+    public int getGatewayServerPort() {
+        return gatewayServerPort;
+    }
+
+    public void setGatewayServerPort(int gatewayServerPort) {
+        this.gatewayServerPort = gatewayServerPort;
+    }
+
+    public String getPythonAddress() {
+        return pythonAddress;
+    }
+
+    public void setPythonAddress(String pythonAddress) {
+        this.pythonAddress = pythonAddress;
+    }
+
+    public int getPythonPort() {
+        return pythonPort;
+    }
+
+    public void setPythonPort(int pythonPort) {
+        this.pythonPort = pythonPort;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+}
diff --git a/dolphinscheduler-python/src/main/resources/python-gateway.properties b/dolphinscheduler-python/src/main/resources/python-gateway.properties
new file mode 100644
index 0000000..86ad059
--- /dev/null
+++ b/dolphinscheduler-python/src/main/resources/python-gateway.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The address the Python gateway server binds to. Set it to `0.0.0.0` if your Python API runs on a different
+# host than the Python gateway server. It can also be set to a specific address like `127.0.0.1` or `localhost`.
+#gateway.server.address=0.0.0.0
+
+# The port the Python gateway server listens on. This is the port the Python API side connects to in order to
+# reach the Python gateway server.
+#gateway.server.port=25333
+
+# The address of Python callback client.
+#python.address=127.0.0.1
+
+# The port of Python callback client.
+#python.port=25334
+
+# Close the server socket if no connection request is received within x milliseconds. The default value is 0
+# (0 = infinite), meaning the socket server never closes even when no requests are received.
+#connect.timeout=0
+
+# Close an active connection if the connected Python program sends no request within x milliseconds. The default
+# value is 0 (0 = infinite), meaning an active connection is never closed even when no requests are received.
+#read.timeout=0