You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/11/09 03:38:29 UTC

[dolphinscheduler-sdk-python] branch main updated: [migrate] Add tag 3.0.0 source code (#7)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ed3bf9  [migrate] Add tag 3.0.0 source code (#7)
7ed3bf9 is described below

commit 7ed3bf9c44e285bc901ea4d133c7bdccaa3e0116
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Wed Nov 9 11:38:25 2022 +0800

    [migrate] Add tag 3.0.0 source code (#7)
    
    Add version 3.0.0 code from https://github.com/apache/dolphinscheduler/tree/3.0.0/dolphinscheduler-python/pydolphinscheduler
---
 .coveragerc                                        |   6 +-
 .flake8                                            |   8 +-
 .pre-commit-config.yaml                            |  56 +++++
 DEVELOP.md                                         | 139 ++++++++++-
 LICENSE                                            | 228 +++++++++++++++++
 NOTICE                                             |   5 +
 README.md                                          |   8 +-
 UPDATING.md                                        |  37 +++
 docs/source/{tasks/index.rst => cli.rst}           |  43 ++--
 docs/source/conf.py                                |   5 +
 docs/source/config.rst                             | 218 +++++++++++++++++
 docs/source/{tasks => howto}/index.rst             |  29 +--
 docs/source/howto/remote-submit.rst                |  51 ++++
 docs/source/index.rst                              |   3 +
 docs/source/start.rst                              | 112 +++++++--
 docs/source/tasks/{index.rst => func_wrap.rst}     |  40 ++-
 docs/source/tasks/index.rst                        |   1 +
 docs/source/tutorial.rst                           | 233 ++++++++++++------
 pytest.ini                                         |   5 +-
 setup.py                                           |  25 +-
 src/pydolphinscheduler/{core => cli}/__init__.py   |  12 +-
 src/pydolphinscheduler/cli/commands.py             |  92 +++++++
 src/pydolphinscheduler/constants.py                |  33 +--
 src/pydolphinscheduler/core/__init__.py            |   2 +
 src/pydolphinscheduler/core/base_side.py           |   4 +-
 src/pydolphinscheduler/core/configuration.py       | 193 +++++++++++++++
 src/pydolphinscheduler/core/default_config.yaml    |  58 +++++
 src/pydolphinscheduler/core/process_definition.py  |  99 ++++++--
 .../core/{base_side.py => resource.py}             |  31 +--
 src/pydolphinscheduler/core/task.py                |  38 ++-
 .../examples/task_dependent_example.py             |   6 +-
 .../examples/tutorial_decorator.py                 |  91 +++++++
 src/pydolphinscheduler/exceptions.py               |  12 +-
 src/pydolphinscheduler/java_gateway.py             |   7 +-
 src/pydolphinscheduler/side/project.py             |   8 +-
 src/pydolphinscheduler/side/queue.py               |   6 +-
 src/pydolphinscheduler/side/tenant.py              |   8 +-
 src/pydolphinscheduler/side/user.py                |  21 +-
 src/pydolphinscheduler/tasks/func_wrap.py          |  61 +++++
 src/pydolphinscheduler/tasks/python.py             |  71 +++++-
 src/pydolphinscheduler/tasks/sql.py                |  30 ++-
 src/pydolphinscheduler/tasks/switch.py             |   6 +-
 src/pydolphinscheduler/utils/file.py               |  57 +++++
 src/pydolphinscheduler/utils/yaml_parser.py        | 159 ++++++++++++
 .../core => tests/cli}/__init__.py                 |  12 +-
 tests/cli/test_config.py                           | 198 +++++++++++++++
 tests/cli/test_version.py                          |  59 +++++
 tests/core/test_configuration.py                   | 272 +++++++++++++++++++++
 .../task.py => core/test_default_config_yaml.py}   |  25 +-
 tests/core/test_process_definition.py              | 141 ++++++++---
 .../task.py => core/test_resource_definition.py}   |  30 ++-
 tests/core/test_task.py                            |  60 ++++-
 tests/example/test_example.py                      |   4 +-
 .../core => tests/integration}/__init__.py         |  12 +-
 tests/integration/conftest.py                      |  46 ++++
 tests/integration/test_java_gateway.py             |  53 ++++
 .../integration/test_process_definition.py         |  38 ++-
 tests/integration/test_submit_examples.py          |  56 +++++
 tests/tasks/test_func_wrap.py                      | 169 +++++++++++++
 tests/tasks/test_python.py                         |  40 ++-
 tests/tasks/test_sql.py                            |  50 ++--
 tests/testing/cli.py                               |  87 +++++++
 tests/testing/constants.py                         |  20 ++
 .../core/__init__.py => tests/testing/decorator.py |  22 +-
 tests/testing/docker_wrapper.py                    |  98 ++++++++
 tests/testing/{task.py => file.py}                 |  22 +-
 tests/testing/path.py                              |  13 +-
 tests/testing/task.py                              |  15 ++
 tests/utils/test_file.py                           |  85 +++++++
 tests/utils/test_yaml_parser.py                    | 255 +++++++++++++++++++
 tox.ini                                            |  61 +++++
 71 files changed, 3826 insertions(+), 474 deletions(-)

diff --git a/.coveragerc b/.coveragerc
index 524cb73..1620509 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -20,8 +20,10 @@ command_line = -m pytest
 omit = 
     # Ignore all test cases in tests/
     tests/*
+    # Ignore examples directory
+    */pydolphinscheduler/examples/*
     # TODO. Temporary ignore java_gateway file, because we could not find good way to test it.
-    src/pydolphinscheduler/java_gateway.py
+    */pydolphinscheduler/java_gateway.py
 
 [report]
 # Don’t report files that are 100% covered
@@ -29,4 +31,4 @@ skip_covered = True
 show_missing = True
 precision = 2
 # Report will fail when coverage under 90.00%
-fail_under = 85
+fail_under = 90
diff --git a/.flake8 b/.flake8
index 7f659a2..120b42f 100644
--- a/.flake8
+++ b/.flake8
@@ -26,7 +26,9 @@ exclude =
     old,
     build,
     dist,
-    htmlcov
+    htmlcov,
+    .tox,
+    dist,
 ignore = 
     # It's clear and not need to add docstring
     D107,  # D107: Don't require docstrings on __init__
@@ -34,5 +36,5 @@ ignore =
     # Conflict to Black
     W503   # W503: Line breaks before binary operators
 per-file-ignores =
-    src/pydolphinscheduler/side/__init__.py:F401
-    src/pydolphinscheduler/tasks/__init__.py:F401
+    */pydolphinscheduler/side/__init__.py:F401
+    */pydolphinscheduler/tasks/__init__.py:F401
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..9e817a5
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+
+default_stages: [commit, push]
+default_language_version:
+  # force all python hooks to run python3
+  python: python3
+repos:
+  - repo: https://github.com/pycqa/isort
+    rev: 5.10.1
+    hooks:
+      - id: isort
+        name: isort (python)
+  - repo: https://github.com/psf/black
+    rev: 22.1.0
+    hooks:
+      - id: black
+  - repo: https://github.com/pycqa/flake8
+    rev: 4.0.1
+    hooks:
+      - id: flake8
+        additional_dependencies: [
+            'flake8-docstrings>=1.6',
+            'flake8-black>=0.2',
+        ]
+        # pre-commit run in the root, so we have to point out the full path of configuration
+        args: [
+            --config, 
+            dolphinscheduler-python/pydolphinscheduler/.flake8
+        ]
+  - repo: https://github.com/pycqa/autoflake
+    rev: v1.4
+    hooks:
+      - id: autoflake
+        args: [
+            --remove-all-unused-imports,
+            --ignore-init-module-imports,
+            --in-place
+        ]
diff --git a/DEVELOP.md b/DEVELOP.md
index f22ab86..bdd0416 100644
--- a/DEVELOP.md
+++ b/DEVELOP.md
@@ -34,7 +34,7 @@ Now, we should install all dependence to make sure we could run test or check co
 
 ```shell
 cd dolphinscheduler/dolphinscheduler-python/pydolphinscheduler
-pip install .[dev]
+python -m pip install .[dev]
 ```
 
 Next, we have to open pydolphinscheduler project in you editor. We recommend you use [pycharm][pycharm]
@@ -60,7 +60,57 @@ pydolphinscheduler tasks object, we use tasks to define exact job we want Dolphi
 we only support `shell` task to execute shell task. [This link][all-task] list all tasks support in DolphinScheduler
 and would be implemented in the further.
 
-## Code Style
+## Test Your Code
+
+Linting and tests are very important for an open source project, so we pay close attention to them. We have continuous
+integration service run by GitHub Action to test whether the patch is good or not, which you could jump to
+section [With GitHub Action](#with-github-action) see more detail.
+
+And to make local testing more convenient, we also provide a way to run your [tests automated with tox](#automated-testing-with-tox)
+locally (*running all tests except the integration tests, which need a Docker environment*). It is helpful when you try to find out
+the details when continuous integration in GitHub Action failed, or when you have a great patch and want to test it locally first.
+
+Besides [automated testing with tox](#automated-testing-with-tox) locally, we also have a [manual way](#manually)
+run tests. And it is scattered commands to reproduce each step of the integration test we told about.
+
+* Remote
+  * [With GitHub Action](#with-github-action)
+* Local
+  * [Automated Testing With tox](#automated-testing-with-tox)(including all but integrate test)
+  * [Manually](#manually)(with integrate test)
+
+### With GitHub Action
+
+GitHub Action test in various environment for pydolphinscheduler, including different python version in
+`3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`. It will trigger and run automatically when you
+submit pull requests to `apache/dolphinscheduler`.
+
+### Automated Testing With tox
+
+[tox](https://tox.wiki) is a package aims to automate and standardize testing in Python, both our continuous
+integration and local test use it to run actual task. To use it, you should install it first
+
+```shell
+python -m pip install --upgrade tox
+```
+
+After installation, you could run a single command to run all the tests, it is almost like test in GitHub Action
+but not so much different environment.
+
+```shell
+tox -e local-ci
+```
+
+It will take a while when you run it the first time, because it has to install dependencies and do some preparation,
+and the next time you run it, it will be faster.
+
+If the `lint` section fails when you run command `tox -e local-ci`, you could try to run command `tox -e auto-lint`,
+which we provide to fix as many lint issues as possible. When it finishes, you could run command `tox -e local-ci` to see
+whether the linter passes or not; you have to fix the remaining issues by yourself if the linter still fails.
+
+### Manually
+
+#### Code Style
 
 We use [isort][isort] to automatically keep Python imports alphabetically, and use [Black][black] for code
 formatter and [Flake8][flake8] for pep8 checker. If you use [pycharm][pycharm]or [IntelliJ IDEA][idea],
@@ -69,18 +119,49 @@ maybe you could follow [Black-integration][black-editor] to configure them in yo
 Our Python API CI would automatically run code style checker and unittest when you submit pull request in
 GitHub, you could also run static check locally.
 
+We recommend [pre-commit](https://pre-commit.com/) to do the checker mentioned above before you develop locally. 
+You should install `pre-commit` by running
+
+```shell
+python -m pip install pre-commit 
+```
+
+in your development environment and then run `pre-commit install` to set up the git hooks scripts. After finish
+above steps, each time you run `git commit` or `git push` would run pre-commit check to make basic check before
+you create pull requests in GitHub.
+
 ```shell
 # We recommend you run isort and Black before Flake8, because Black could auto fix some code style issue
 # but Flake8 just hint when code style not match pep8
 
 # Run Isort
-isort .
+python -m isort .
 
 # Run Black
-black .
+python -m black .
 
 # Run Flake8
-flake8
+python -m flake8
+```
+
+#### Testing
+
+## Build Docs
+
+We use [sphinx][sphinx] to build docs. The DolphinScheduler Python API CI will automatically build docs when you submit a pull request in
+GitHub. You may locally ensure docs can be built successfully in case a failure blocks CI.
+
+To build docs locally, install sphinx and related python modules first via:
+
+```shell
+python -m pip install '.[doc]'
+``` 
+
+Then 
+
+```shell
+cd pydolphinscheduler/docs/
+make clean && make html
 ```
 
 ## Testing
@@ -88,25 +169,58 @@ flake8
 pydolphinscheduler using [pytest][pytest] to test our codebase. GitHub Action will run our test when you create
 pull request or commit to dev branch, with python version `3.6|3.7|3.8|3.9` and operating system `linux|macOS|windows`.
 
-To test locally, you could directly run pytest after set `PYTHONPATH` 
+pydolphinscheduler using [pytest][pytest] to run all tests in directory `tests`. You could run tests by the commands
 
 ```shell
-PYTHONPATH=src/ pytest
+python -m pytest --cov=pydolphinscheduler --cov-config=.coveragerc tests/
 ```
 
-We try to keep pydolphinscheduler usable through unit test coverage. 90% test coverage is our target, but for
-now, we require test coverage up to 85%, and each pull request leas than 85% would fail our CI step
-`Tests coverage`. We use [coverage][coverage] to check our test coverage, and you could check it locally by
-run command.
+Besides running tests, it will also check the unit test [coverage][coverage] threshold; for now, when test coverage is less
+than 90% the coverage check will fail, as will our GitHub Action.
+
+The command above will check test coverage automatically, and you could also test the coverage by command.
 
 ```shell
-coverage run && coverage report
+python -m coverage run && python -m coverage report
 ```
 
 It would not only run unit test but also show each file coverage which cover rate less than 100%, and `TOTAL`
 line show you total coverage of you code. If your CI failed with coverage you could go and find some reason by
 this command output.
 
+#### Integrate Test
+
+Integrate Test can not run when you execute command `tox -e local-ci` because it needs external environment
+including [Docker](https://docs.docker.com/get-docker/) and specific image build by [maven](https://maven.apache.org/install.html).
+Here we would show you the step to run integrate test in directory `dolphinscheduler-python/pydolphinscheduler/tests/integration`.
+
+```shell
+# Go to project root directory and build Docker image
+cd ../../
+
+# Build Docker image
+./mvnw -B clean install \
+    -Dmaven.test.skip \
+    -Dmaven.javadoc.skip \
+    -Dmaven.checkstyle.skip \
+    -Pdocker,release -Ddocker.tag=ci \
+    -pl dolphinscheduler-standalone-server -am
+
+# Go to pydolphinscheduler root directory and run integrate tests
+tox -e integrate-test
+```
+
+## Add LICENSE When New Dependencies Adding
+
+When you add a new package in pydolphinscheduler, you should also add the package's LICENSE to directory
+`dolphinscheduler-dist/release-docs/licenses/python-api-licenses`, and also add a short description to
+`dolphinscheduler-dist/release-docs/LICENSE`.
+
+## Update `UPDATING.md` when public class, method or interface is be changed
+
+When you change public class, method or interface, you should change the [UPDATING.md](./UPDATING.md) to notice
+users who may use it in other way.
+
 <!-- content -->
 [py4j]: https://www.py4j.org/index.html
 [pycharm]: https://www.jetbrains.com/pycharm
@@ -118,3 +232,4 @@ this command output.
 [black-editor]: https://black.readthedocs.io/en/stable/integrations/editors.html#pycharm-intellij-idea
 [coverage]: https://coverage.readthedocs.io/en/stable/
 [isort]: https://pycqa.github.io/isort/index.html
+[sphinx]: https://www.sphinx-doc.org/en/master/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..a7359fa
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,228 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+============================================================================
+Apache DolphinScheduler Python API SUBCOMPONENTS:
+
+The Apache DolphinScheduler Python API project contains subcomponents
+with separate copyright notices and license terms. Your use of the source
+code for the these subcomponents is subject to the terms and conditions
+of the following licenses.
+
+========================================================================
+BSD licenses
+========================================================================
+
+The following components are provided under a BSD license. See project link for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
+
+    py4j v0.10 (https://github.com/py4j/py4j)
+    click v8.0 (https://github.com/pallets/click)
+
+========================================================================
+MIT licenses
+========================================================================
+
+The following components are provided under the MIT License. See project link for details.
+The text of each license is also included at licenses/LICENSE-[project].txt.
+
+    ruamel.yaml v0.17 (https://sourceforge.net/projects/ruamel-yaml/)
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..61acdab
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,5 @@
+Apache DolphinScheduler
+Copyright 2017-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 9cc524d..316d604 100644
--- a/README.md
+++ b/README.md
@@ -38,11 +38,11 @@ your workflow by python code, aka workflow-as-codes.
 
 ```shell
 # Install
-$ pip install apache-dolphinscheduler
+python -m pip install apache-dolphinscheduler
 
-# Check installation, it is success if you see version output, here we use 0.1.0 as example
-$ python -c "import pydolphinscheduler; print(pydolphinscheduler.__version__)"
-0.1.0
+# Verify installation is successful, it will show the version of apache-dolphinscheduler, here we use 0.1.0 as example
+pydolphinscheduler version
+# 0.1.0
 ```
 
 Here we show you how to install and run a simple example of pydolphinscheduler
diff --git a/UPDATING.md b/UPDATING.md
new file mode 100644
index 0000000..d772c6f
--- /dev/null
+++ b/UPDATING.md
@@ -0,0 +1,37 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# UPDATING
+
+This document tries to record non-backward compatible updates, to notify users of the detailed changes in pydolphinscheduler.
+It was started after version 2.0.5 was released.
+
+## dev
+
+* Change variable about where to keep pydolphinscheduler configuration from ``PYDOLPHINSCHEDULER_HOME`` to
+  ``PYDS_HOME`` which is same as other environment variable name.
+
+## 3.0.0a0
+
+* Integrate Python gateway server into Dolphinscheduler API server, and you could start Python gateway service by command
+  `./bin/dolphinscheduler-daemon.sh start api-server` instead of independent command
+  `./bin/dolphinscheduler-daemon.sh start python-gateway-server`.
+* Remove parameter `queue` from class `ProcessDefinition` to avoid confuse user when it change but not work
+* Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic.
+* Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.
diff --git a/docs/source/tasks/index.rst b/docs/source/cli.rst
similarity index 63%
copy from docs/source/tasks/index.rst
copy to docs/source/cli.rst
index 42dcdf9..60e8231 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/cli.rst
@@ -15,27 +15,22 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
+Command Line Interface
+======================
+
+*PyDolphinScheduler* has a mechanism called CLI (command line interface) to help users control it from the shell.
+
+Prepare
+-------
+
+You have to :ref:`install PyDolphinScheduler <start:installing pydolphinscheduler>` first before using
+its CLI
+
+Usage
+-----
+
+Here is basic usage about the command line of *PyDolphinScheduler*
+
+.. click:: pydolphinscheduler.cli.commands:cli
+   :prog: pydolphinscheduler
+   :nested: full
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 5ee73a5..b162e0c 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -55,6 +55,11 @@ extensions = [
     "sphinx.ext.viewcode",
     "sphinx.ext.autosectionlabel",
     "sphinx_rtd_theme",
+    # Documenting command line interface
+    "sphinx_click.ext",
+    # Add inline tabbed content
+    "sphinx_inline_tabs",
+    "sphinx_copybutton",
 ]
 
 # Add any paths that contain templates here, relative to this directory.
diff --git a/docs/source/config.rst b/docs/source/config.rst
new file mode 100644
index 0000000..feb147a
--- /dev/null
+++ b/docs/source/config.rst
@@ -0,0 +1,218 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Configuration
+=============
+
+pydolphinscheduler has a built-in module setting the necessary configuration to start and run your workflow code.
+You could use it directly if you only want to run a quick start or a simple job like a POC. But if you
+want to use pydolphinscheduler in depth, or even use it in production, you should probably modify and
+change the built-in configuration.
+
+We have two ways to modify the configuration:
+
+- `Using Environment Variables`_: The more lightweight way to modify the configuration. it is useful in
+  containerization scenarios, like docker and k8s, or when you like to temporarily override configs in the
+  configuration file.
+- `Using Configuration File`_: The more general way to modify the configuration. It is useful when you want
+  to persist and manage configuration files in one single file.
+
+Using Environment Variables
+---------------------------
+
+You could change the configuration by adding or modifying the operating system's environment variables. Any
+way works, as long as you can successfully modify the environment variables. We use two common
+ways, `Bash <by bash>`_ and `Python OS Module <by python os module>`_, as examples:
+
+By Bash
+^^^^^^^
+
+Setting environment variables via `Bash` is the most straightforward and easiest way. We give some examples about
+how to change them by Bash.
+
+.. code-block:: bash
+
+   # Modify Java Gateway Address
+   export PYDS_JAVA_GATEWAY_ADDRESS="192.168.1.1"
+
+   # Modify Workflow Default User
+   export PYDS_WORKFLOW_USER="custom-user"
+
+After executing the commands above, both ``PYDS_JAVA_GATEWAY_ADDRESS`` and ``PYDS_WORKFLOW_USER`` will be changed.
+The next time you execute and submit your workflow, it will submit to host `192.168.1.1`, and with workflow's user
+named `custom-user`.
+
+By Python OS Module
+^^^^^^^^^^^^^^^^^^^
+
+pydolphinscheduler is a Python API for Apache DolphinScheduler, and you could modify or add system environment
+variables via Python ``os`` module. In this example, we change variables as the same value as we change in
+`Bash <by bash>`_. It will take effect the next time you run your workflow, and call workflow ``run`` or ``submit``
+method next to ``os.environ`` statement.
+
+.. code-block:: python
+
+   import os
+   # Modify Java Gateway Address
+   os.environ["PYDS_JAVA_GATEWAY_ADDRESS"] = "192.168.1.1"
+
+   # Modify Workflow Default User
+   os.environ["PYDS_WORKFLOW_USER"] = "custom-user"
+
+All Configurations in Environment Variables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_
+
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Variable Section | Variable Name                      | description                                                                                                        |
++==================+====================================+====================================================================================================================+
+|                  | ``PYDS_JAVA_GATEWAY_ADDRESS``      | Default Java gateway address, will use its value when it is set.                                                   |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|   Java Gateway   | ``PYDS_JAVA_GATEWAY_PORT``         | Default Java gateway port, will use its value when it is set.                                                      |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set.                                      |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_NAME``                 | Default user name, will use when user's ``name`` when does not specify.                                            |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_PASSWORD``             | Default user password, will use when user's ``password`` when does not specify.                                    |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|   Default User   | ``PYDS_USER_EMAIL``                | Default user email, will use when user's ``email`` when does not specify.                                          |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_PHONE``                | Default user phone, will use when user's ``phone`` when does not specify.                                          |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_USER_STATE``                | Default user state, will use when user's ``state`` when does not specify.                                          |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_PROJECT``          | Default workflow project name, will use its value when workflow does not specify the attribute ``project``.        |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_TENANT``           | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``.               |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+| Default Workflow | ``PYDS_WORKFLOW_USER``             | Default workflow user, will use its value when workflow does not specify the attribute ``user``.                   |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_QUEUE``            | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``.                 |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_WORKER_GROUP``     | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``.   |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_RELEASE_STATE``    | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_TIME_ZONE``        | Default workflow time zone, will use its value when workflow does not specify the attribute ``timezone``.          |
++                  +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_WARNING_TYPE``     | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``.   |
++------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+
+.. note::
+
+   The scope of setting configuration via environment variable is in the workflow, and it will not change the
+   value of the configuration file. The :doc:`CLI <cli>` command ``config --get`` and ``config --set`` operate
+   the value of the configuration file, so the command ``config --get`` may return a different value from what
+   you set in the environment variable, and command ``config --get`` will never change your environment variable.
+
+Using Configuration File
+------------------------
+
+If you want to persist and manage configuration in a file instead of environment variables, or maybe you
+want to save your configuration file to a version control system, like Git or SVN, then the way to change
+configuration by file is the best choice.
+
+Export Configuration File
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+pydolphinscheduler allows you to change the built-in configurations via CLI or editor you like. pydolphinscheduler
+integrated built-in configurations in its package, but you could also export it locally by CLI
+
+.. code-block:: bash
+
+   pydolphinscheduler config --init
+
+And it will create a new YAML file in the path `~/pydolphinscheduler/config.yaml` by default. If you want to export
+it to another path, you should set `PYDS_HOME` before you run command :code:`pydolphinscheduler config --init`.
+
+.. code-block:: bash
+
+    export PYDS_HOME=<CUSTOM_PATH>
+    pydolphinscheduler config --init
+
+After that, your configuration file will export into `<CUSTOM_PATH>/config.yaml` instead of the default path.
+
+Change Configuration
+^^^^^^^^^^^^^^^^^^^^
+
+In section `export configuration file`_ you export the configuration file locally, and as a local file, you could
+edit it with any editor you like. After you save your change in your editor, the latest configuration will work
+when you run your workflow code.
+
+You could also query or change the configuration via CLI :code:`config --get <config>` or :code:`config --set <config> <val>`.
+Both `--get` and `--set` could be called one or more times in a single command, and you could only set the leaf
+node of the configuration but could get the parent configuration, there are simple examples below:
+
+.. code-block:: bash
+
+   # Get single configuration in the leaf node,
+   # The output look like below:
+   # java_gateway.address = 127.0.0.1
+   pydolphinscheduler config --get java_gateway.address
+
+   # Get multiple configuration in the leaf node,
+   # The output look like below:
+   # java_gateway.address = 127.0.0.1
+   # java_gateway.port = 25333
+   pydolphinscheduler config --get java_gateway.address --get java_gateway.port
+
+
+   # Get parent configuration which contain multiple leaf nodes,
+   # The output look like below:
+   # java_gateway = ordereddict([('address', '127.0.0.1'), ('port', 25333), ('auto_convert', True)])
+   pydolphinscheduler config --get java_gateway
+
+   # Set single configuration,
+   # The output look like below:
+   # Set configuration done.
+   pydolphinscheduler config --set java_gateway.address 192.168.1.1
+
+   # Set multiple configuration
+   # The output look like below:
+   # Set configuration done.
+   pydolphinscheduler config --set java_gateway.address 192.168.1.1 --set java_gateway.port 25334
+
+   # Set configuration not in leaf node will fail
+   # The output look like below:
+   # Raise error.
+   pydolphinscheduler config --set java_gateway 192.168.1.1,25334,True
+
+For more information about our CLI, you could see document :doc:`cli`.
+
+All Configurations in File
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Here are all our configurations for pydolphinscheduler.
+
+.. literalinclude:: ../../src/pydolphinscheduler/core/default_config.yaml
+   :language: yaml
+   :lines: 18-
+
+Priority
+--------
+
+We have two ways to modify the configuration and there is a built-in config in pydolphinscheduler too. It is
+very important to understand the priority of the configuration when you use them. The overview of configuration
+priority is.
+
+``Environment Variables > Configurations File > Built-in Configurations``
+
+This means that your setting in environment variables or configurations file will overwrite the built-in one.
+And you could temporarily modify configurations by setting environment variables without modifying the global
+config in the configuration file.
diff --git a/docs/source/tasks/index.rst b/docs/source/howto/index.rst
similarity index 71%
copy from docs/source/tasks/index.rst
copy to docs/source/howto/index.rst
index 42dcdf9..a0b3c29 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/howto/index.rst
@@ -15,27 +15,16 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
+HOWTOs
+======
 
-In this section 
+pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly
+completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept`
+and :doc:`../tutorial`.
+
+Currently, the HOWTOs are:
 
 .. toctree::
-   :maxdepth: 1
+   :maxdepth: 2
    
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
+   remote-submit
diff --git a/docs/source/howto/remote-submit.rst b/docs/source/howto/remote-submit.rst
new file mode 100644
index 0000000..b7efdf4
--- /dev/null
+++ b/docs/source/howto/remote-submit.rst
@@ -0,0 +1,51 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+Submit Your Code from a Different Machine
+=========================================
+
+Generally, we use pydolphinscheduler as a client to DolphinScheduler, and considering we may change our workflow
+code frequently, the best practice is running :ref:`python gateway service <start:start python gateway service>`
+on your server machine and submitting the workflow code from your development machine, like a laptop or PC. This behavior
+is supported by pydolphinscheduler out of the box with one or two single command lines.
+
+Export Configuration File
+-------------------------
+
+.. code-block:: bash
+
+   pydolphinscheduler config --init
+
+you could find more detail in :ref:`configuration exporting <config:export configuration file>`
+
+Run API Server in Other Host
+----------------------------
+
+.. code-block:: bash
+
+   pydolphinscheduler config --set java_gateway.address <your-api-server-ip-or-hostname>
+
+you could find more detail in :ref:`configuration setting <config:change configuration>`
+
+Run API Server in Other Port
+----------------------------
+
+.. code-block:: bash
+
+   pydolphinscheduler config --set java_gateway.port <your-python-gateway-service-port>
+
+you could find more detail in :ref:`configuration setting <config:change configuration>`
diff --git a/docs/source/index.rst b/docs/source/index.rst
index b04c26f..24ad107 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -32,6 +32,9 @@ then go and see :doc:`tutorial` for more detail.
    tutorial
    concept
    tasks/index
+   howto/index
+   cli
+   config
    api
 
 Indices and tables
diff --git a/docs/source/start.rst b/docs/source/start.rst
index 0af90d5..270b5b8 100644
--- a/docs/source/start.rst
+++ b/docs/source/start.rst
@@ -41,8 +41,9 @@ without error(here is a example after Python 3.8.7 installed)
 
 .. code-block:: bash
 
-    $ python --version
-    Python 3.8.7
+   python --version
+
+Will see detail of Python version, such as *Python 3.8.7*
 
 Installing PyDolphinScheduler
 -----------------------------
@@ -52,16 +53,24 @@ After Python is already installed on your machine following section
 
 .. code-block:: bash
 
-    $ pip install apache-dolphinscheduler
+   python -m pip install apache-dolphinscheduler
 
 The latest version of *PyDolphinScheduler* would be installed after you run above
-command in your terminal. You could go and `start Python Gateway Server`_ to finish
+command in your terminal. You could go and `start Python Gateway Service`_ to finish
 the prepare, and then go to :doc:`tutorial` to make your hand dirty. But if you
 want to install the unreleased version of *PyDolphinScheduler*, you could go and see
-section `installing PyDolphinScheduler in dev`_ for more detail.
+section `installing PyDolphinScheduler in dev branch`_ for more detail.
+
+.. note::
 
-Installing PyDolphinScheduler In Dev
-------------------------------------
+   Currently, we have released multiple pre-release packages on PyPI; you can see all released packages,
+   including pre-releases, in the `release history <https://pypi.org/project/apache-dolphinscheduler/#history>`_.
+   You can pin the package version if you want to install a pre-release package. For example, if
+   you want to install the version `3.0.0-beta-2` package, you can run the command
+   :code:`python -m pip install apache-dolphinscheduler==3.0.0b2`.
+
+Installing PyDolphinScheduler In DEV Branch
+-------------------------------------------
 
 Because the project is developing and some of the features still not release.
 If you want to try some thing unreleased you could install from the source code
@@ -69,45 +78,94 @@ which we hold in GitHub
 
 .. code-block:: bash
 
-    # Clone Apache DolphinScheduler repository
-    $ git clone git@github.com:apache/dolphinscheduler.git
-    # Install PyDolphinScheduler in develop mode
-    $ cd dolphinscheduler-python/pydolphinscheduler && pip install -e .
+   # Clone Apache DolphinScheduler repository
+   git clone git@github.com:apache/dolphinscheduler.git
+   # Install PyDolphinScheduler in develop mode
+   cd dolphinscheduler-python/pydolphinscheduler && python -m pip install -e .
 
-After you installed *PyDolphinScheduler*, please remember `start Python Gateway Server`_
+After you installed *PyDolphinScheduler*, please remember `start Python Gateway Service`_
 which waiting for *PyDolphinScheduler*'s workflow definition require.
 
-Start Python Gateway Server
----------------------------
+The above command will clone the whole dolphinscheduler source code locally. Maybe you want to install the latest
+pydolphinscheduler package directly and do not care about other code (including Python gateway service code); then you can execute the command
+
+.. code-block:: bash
+
+   # Must escape the '&' character by adding '\' 
+   pip install -e "git+https://github.com/apache/dolphinscheduler.git#egg=apache-dolphinscheduler&subdirectory=dolphinscheduler-python/pydolphinscheduler"
+
+Start Python Gateway Service
+----------------------------
 
 Since **PyDolphinScheduler** is Python API for `Apache DolphinScheduler`_, it
 could define workflow and tasks structure, but could not run it unless you
-`install Apache DolphinScheduler`_ and start Python gateway server. We only
-and some key steps here and you could go `install Apache DolphinScheduler`_
-for more detail
+`install Apache DolphinScheduler`_ and start its API server, which includes the
+Python gateway service in it. We only list some key steps here and you could
+go `install Apache DolphinScheduler`_ for more detail
 
 .. code-block:: bash
 
-    # Start pythonGatewayServer
-    $ ./bin/dolphinscheduler-daemon.sh start pythonGatewayServer
+   # Start DolphinScheduler api-server which including python gateway service
+   ./bin/dolphinscheduler-daemon.sh start api-server
 
 To check whether the server is alive or not, you could run :code:`jps`. And
-the server is health if keyword `PythonGatewayServer` in the console. 
+the server is health if keyword `ApiApplicationServer` in the console.
+
+.. code-block:: bash
+
+   jps
+   # ....
+   # 201472 ApiApplicationServer
+   # ....
+
+.. note::
+
+   Please make sure you have enabled the Python gateway service when starting the `api-server`. The configuration
+   option is `python-gateway.enabled: true` in the api-server's configuration file `api-server/conf/application.yaml`.
+   The default value is true, and the Python gateway service starts when the api-server is started.
+
+Run an Example
+--------------
+
+Before running an example for pydolphinscheduler, you should get the example code from its source code. You could run
+a single bash command to get it
 
 .. code-block:: bash
 
-    $ jps
-    ....
-    201472 PythonGatewayServer
-    ....
+   wget https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
+
+or you could copy-paste the content from `tutorial source code`_. And then you could run the example in your
+terminal
+
+.. code-block:: bash
+
+   python tutorial.py
+
+If you want to submit your workflow to a remote API server, which means that your workflow script runs on a different
+machine from the API server, you should first change the pydolphinscheduler configuration and then submit the workflow script
+
+.. code-block:: bash
+
+   pydolphinscheduler config --init
+   pydolphinscheduler config --set java_gateway.address <YOUR-API-SERVER-IP-OR-HOSTNAME>
+   python tutorial.py
+
+.. note::
+
+   You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported.
+
+After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
+and the path of web UI is `Project -> Workflow -> Workflow Definition`.
+
 
 What's More
 -----------
 
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial`
-and see how it work. But if you already know the inside of *PyDolphinScheduler*,
-maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports.
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
+:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
 
 .. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
 .. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org
 .. _`install Apache DolphinScheduler`: https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/installation/standalone.html
+.. _`tutorial source code`: https://raw.githubusercontent.com/apache/dolphinscheduler/dev/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial.py
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/func_wrap.rst
similarity index 69%
copy from docs/source/tasks/index.rst
copy to docs/source/tasks/func_wrap.rst
index 42dcdf9..5f41b80 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/func_wrap.rst
@@ -15,27 +15,19 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
+Python Function Wrapper
+=======================
+
+A decorator that converts a Python function into a pydolphinscheduler task.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py
+   :start-after: [start tutorial]
+   :end-before: [end tutorial]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
diff --git a/docs/source/tasks/index.rst b/docs/source/tasks/index.rst
index 42dcdf9..d6bbb96 100644
--- a/docs/source/tasks/index.rst
+++ b/docs/source/tasks/index.rst
@@ -23,6 +23,7 @@ In this section
 .. toctree::
    :maxdepth: 1
    
+   func_wrap
    shell
    sql
    python
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
index 83c5746..6366c80 100644
--- a/docs/source/tutorial.rst
+++ b/docs/source/tutorial.rst
@@ -18,129 +18,202 @@
 Tutorial
 ========
 
-This tutorial show you the basic concept of *PyDolphinScheduler* and tell all
+This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
 things you should know before you submit or run your first workflow. If you
-still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>`
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>` first.
 
 Overview of Tutorial
 --------------------
 
-Here have an overview of our tutorial, and it look a little complex but do not
-worry about that because we explain this example below as detailed as possible.
+Here is an overview of our tutorial, and it looks a little complex but do not
+worry about that because we explain this example below in as much detail as possible.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start tutorial]
-   :end-before: [end tutorial]
+There are two types of tutorials: traditional and task decorator.
+
+- **Traditional Way**: More general, support many :doc:`built-in task types <tasks/index>`, it is convenient
+  when you build your workflow at the beginning.
+- **Task Decorator**: A Python decorator that allows you to wrap your function into pydolphinscheduler's task. Less
+  versatile than the traditional way because it only supports Python functions and does not support the built-in
+  tasks. But it is helpful if your workflow is all built with Python or if you already have some Python
+  workflow code and want to migrate it to pydolphinscheduler.
+
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start tutorial]
+      :end-before: [end tutorial]
+
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start tutorial]
+      :end-before: [end tutorial]
 
 Import Necessary Module
 -----------------------
 
-First of all, we should importing necessary module which we would use later just
-like other Python package. We just create a minimum demo here, so we just import
-:class:`pydolphinscheduler.core.process_definition` and
-:class:`pydolphinscheduler.tasks.shell`.
+First of all, we should import the necessary module which we would use later just like other Python packages.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start package_import]
-   :end-before: [end package_import]
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start package_import]
+      :end-before: [end package_import]
 
-If you want to use other task type you could click and
-:doc:`see all tasks we support <tasks/index>`
+   In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+   :class:`pydolphinscheduler.tasks.shell.Shell`.
+
+   If you want to use other task type you could click and :doc:`see all tasks we support <tasks/index>`
+
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start package_import]
+      :end-before: [end package_import]
+
+   In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+   :func:`pydolphinscheduler.tasks.func_wrap.task`.
 
 Process Definition Declaration
 ------------------------------
 
-We should instantiate object after we import them from `import necessary module`_.
-Here we declare basic arguments for process definition(aka, workflow). We define
-the name of process definition, using `Python context manager`_ and it
-**the only required argument** for object process definition. Beside that we also
-declare three arguments named `schedule`, `start_time` which setting workflow schedule
-interval and schedule start_time, and argument `tenant` which changing workflow's
-task running user in the worker, :ref:`section tenant <concept:tenant>` in *PyDolphinScheduler*
-:doc:`concept` page have more detail information.
+We should instantiate :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
+import them from `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow).
+We define the name of :code:`ProcessDefinition`, using `Python context manager`_ and it is **the only required argument**
+for `ProcessDefinition`. Besides, we also declare three arguments named :code:`schedule` and :code:`start_time`
+which setting workflow schedule interval and schedule start_time, and argument :code:`tenant` defines which tenant
+will be running this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
+*PyDolphinScheduler* :doc:`concept` for more information.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start workflow_declare]
-   :end-before: [end workflow_declare]
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start workflow_declare]
+      :end-before: [end workflow_declare]
+
+.. tab:: Task Decorator
 
-We could find more detail about process definition in
-:ref:`concept about process definition <concept:process definition>` if you interested in it.
-For all arguments of object process definition, you could find in the
-:class:`pydolphinscheduler.core.process_definition` api documentation.
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start workflow_declare]
+      :end-before: [end workflow_declare]
+
+We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
+if you are interested in it. For all arguments of object process definition, you could find in the
+:class:`pydolphinscheduler.core.process_definition` API documentation.
 
 Task Declaration
 ----------------
 
-Here we declare four tasks, and bot of them are simple task of
-:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal.
-Beside the argument `command`, we also need setting argument `name` for each task *(not
-only shell task, `name` is required for each type of task)*.
+.. tab:: Tradition
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start task_declare]
-   :end-before: [end task_declare]
+   We declare four tasks to show how to create tasks, and all of them are simple tasks of
+   :class:`pydolphinscheduler.tasks.shell` which runs `echo` command in the terminal. Besides the argument
+   `command` with :code:`echo` command, we also need to set the argument `name` for each task
+   *(not only shell task, `name` is required for each type of task)*.
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start task_declare]
+      :end-before: [end task_declare]
+
+   Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`.
+
+.. tab:: Task Decorator
 
-Beside shell task, *PyDolphinScheduler* support multiple tasks and you could
-find in :doc:`tasks/index`.
+   We declare four tasks to show how to create tasks, and all of them are created by the task decorator
+   using :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
+   :code:`@task` to existing Python function, and then use them inside :class:`pydolphinscheduler.core.process_definition`
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start task_declare]
+      :end-before: [end task_declare]
+
+   It makes our workflow more Pythonic, but be careful that using task decorator mode means we can only use
+   Python functions as tasks and could not use the :doc:`built-in tasks <tasks/index>` in most cases.
 
 Setting Task Dependence
 -----------------------
 
-After we declare both process definition and task, we have one workflow with
-four tasks, both all tasks is independent so that they would run in parallel.
-We should reorder the sort and the dependence of tasks. It useful when we need
-run prepare task before we run actual task or we need tasks running is specific
-rule. We both support attribute `set_downstream` and `set_upstream`, or bitwise
-operators `>>` and `<<`.
+After we declare both process definition and task, we have four tasks that are independent and will be running
+in parallel. If you want to start one task until some task is finished, you have to set dependence on those
+tasks.
 
-In this example, we set task `task_parent` is the upstream task of task
-`task_child_one` and `task_child_two`, and task `task_union` is the downstream
-task of both these two task.
+Setting task dependence is quite easy with the task's attributes :code:`set_downstream` and :code:`set_upstream` or with the
+bitwise operators :code:`>>` and :code:`<<`.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start task_relation_declare]
-   :end-before: [end task_relation_declare]
+In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and
+task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one`
+and task `task_child_two` are done, because both tasks are `task_union`'s upstream.
+
+.. tab:: Tradition
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start task_relation_declare]
+      :end-before: [end task_relation_declare]
 
-Please notice that we could grouping some tasks and set dependence if they have
-same downstream or upstream. We declare task `task_child_one` and `task_child_two`
-as a group here, named as `task_group` and set task `task_parent` as upstream of
-both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept
-documentation.
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start task_relation_declare]
+      :end-before: [end task_relation_declare]
+
+.. note::
+
+   We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
+   tasks as task groups. In this tutorial, we declare task `task_child_one` and `task_child_two` as a task group
+   named `task_group`, then set `task_group` as downstream of task `task_parent`. You could see
+   :ref:`concept:Tasks Dependence` for more detail about how to set task dependence.
 
 Submit Or Run Workflow
 ----------------------
 
-Now we finish our workflow definition, with task and task dependence, but all
-these things are in local, we should let Apache DolphinScheduler daemon know what we
-define our workflow. So the last thing we have to do here is submit our workflow to
-Apache DolphinScheduler daemon.
+After that, we finish our workflow definition, with four tasks and task dependence, but all these things are
+local, we should let the DolphinScheduler daemon know the definition of our workflow. So the last thing we
+have to do is submit the workflow to the DolphinScheduler daemon.
 
-We here in the example using `ProcessDefinition` attribute `run` to submit workflow
-to the daemon, and set the schedule time we just declare in `process definition declaration`_.
+Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
+will create workflow definition as well as workflow schedule.
 
-Now, we could run the Python code like other Python script, for the basic usage run
-:code:`python tutorial.py` to trigger and run it.
+.. tab:: Tradition
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start submit_or_run]
+      :end-before: [end submit_or_run]
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start submit_or_run]
-   :end-before: [end submit_or_run]
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start submit_or_run]
+      :end-before: [end submit_or_run]
+
+At last, we could execute this workflow code in your terminal like other Python scripts, running
+:code:`python tutorial.py` to trigger and execute it.
+
+.. note::
 
-If you not start your Apache DolphinScheduler server, you could find the way in
-:ref:`start:start Python gateway server` and it would have more detail about related server
-start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition`
-and it just submit workflow to the daemon but not setting the schedule information. For
-more detail you could see :ref:`concept:process definition`.
+   If you do not start your DolphinScheduler API server, you could find how to start it in
+   :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute
+   :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
+   the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
 
 DAG Graph After Tutorial Run
 ----------------------------
 
-After we run the tutorial code, you could login Apache DolphinScheduler web UI,
-go and see the `DolphinScheduler project page`_. they is a new process definition be
-created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below
+After we run the tutorial code, you could log in to the DolphinScheduler web UI, go and see the
+`DolphinScheduler project page`_. There is a new process definition created by *PyDolphinScheduler* and it is
+named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like below:
 
 .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
    :language: text
diff --git a/pytest.ini b/pytest.ini
index f2c7ae6..b1aa850 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -14,9 +14,8 @@
 # limitations under the License.
 
 [pytest]
-# Do not test test_java_gateway.py due to we can not mock java gateway for now
-addopts = --ignore=tests/test_java_gateway.py
-
 # add path here to skip pytest scan it
 norecursedirs =
     tests/testing
+    # Integration tests run separately and do not calculate coverage; they will run in `tox -e integrate-test`
+    tests/integration
diff --git a/setup.py b/setup.py
index 81e1f4f..b1e3742 100644
--- a/setup.py
+++ b/setup.py
@@ -32,11 +32,13 @@ if sys.version_info[0] < 3:
 
 logger = logging.getLogger(__name__)
 
-version = "2.0.7"
+version = "3.0.0"
 
 # Start package required
 prod = [
+    "click>=8.0.0",
     "py4j~=0.10",
+    "ruamel.yaml",
 ]
 
 build = [
@@ -48,12 +50,17 @@ build = [
 doc = [
     "sphinx>=4.3",
     "sphinx_rtd_theme>=1.0",
+    "sphinx-click>=3.0",
+    "sphinx-inline-tabs",
+    "sphinx-copybutton>=0.4.0",
 ]
 
 test = [
     "pytest>=6.2",
     "freezegun>=1.1",
     "coverage>=6.1",
+    "pytest-cov>=3.0",
+    "docker>=5.0.3",
 ]
 
 style = [
@@ -61,6 +68,7 @@ style = [
     "flake8-docstrings>=1.6",
     "flake8-black>=0.2",
     "isort>=5.10",
+    "autoflake>=1.4",
 ]
 
 dev = style + test + doc + build
@@ -91,11 +99,9 @@ class CleanCommand(Command):
 
     def initialize_options(self) -> None:
         """Set default values for options."""
-        pass
 
     def finalize_options(self) -> None:
         """Set final values for options."""
-        pass
 
     def run(self) -> None:
         """Run and remove temporary files."""
@@ -130,8 +136,10 @@ setup(
     project_urls={
         "Homepage": "https://dolphinscheduler.apache.org",
         "Documentation": "https://dolphinscheduler.apache.org/python/index.html",
-        "Source": "https://github.com/apache/dolphinscheduler/dolphinscheduler-python/pydolphinscheduler",
-        "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues",
+        "Source": "https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-python/"
+        "pydolphinscheduler",
+        "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues?"
+        "q=is%3Aissue+is%3Aopen+label%3APython",
         "Discussion": "https://github.com/apache/dolphinscheduler/discussions",
         "Twitter": "https://twitter.com/dolphinschedule",
     },
@@ -139,7 +147,7 @@ setup(
     package_dir={"": "src"},
     include_package_data=True,
     package_data={
-        "examples": ["examples.tutorial.py"],
+        "pydolphinscheduler": ["core/default_config.yaml"],
     },
     platforms=["any"],
     classifiers=[
@@ -173,4 +181,9 @@ setup(
     cmdclass={
         "pre_clean": CleanCommand,
     },
+    entry_points={
+        "console_scripts": [
+            "pydolphinscheduler = pydolphinscheduler.cli.commands:cli",
+        ],
+    },
 )
diff --git a/src/pydolphinscheduler/core/__init__.py b/src/pydolphinscheduler/cli/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to src/pydolphinscheduler/cli/__init__.py
index 31dc944..5f30c83 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/cli/__init__.py
@@ -15,14 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+"""Commands line interface of pydolphinscheduler."""
diff --git a/src/pydolphinscheduler/cli/commands.py b/src/pydolphinscheduler/cli/commands.py
new file mode 100644
index 0000000..e2ca86b
--- /dev/null
+++ b/src/pydolphinscheduler/cli/commands.py
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Commands line interface's command of pydolphinscheduler."""
+
+import click
+from click import echo
+
+from pydolphinscheduler import __version__
+from pydolphinscheduler.core.configuration import (
+    get_single_config,
+    init_config_file,
+    set_single_config,
+)
+
+version_option_val = ["major", "minor", "micro"]
+
+
+@click.group()
+def cli():
+    """Apache DolphinScheduler Python API's command line interface."""
+
+
+@cli.command()
+@click.option(
+    "--part",
+    "-p",
+    required=False,
+    type=click.Choice(version_option_val, case_sensitive=False),
+    multiple=False,
+    help="The part of version your want to get.",
+)
+def version(part: str) -> None:
+    """Show current version of pydolphinscheduler."""
+    if part:
+        idx = version_option_val.index(part)
+        echo(f"{__version__.split('.')[idx]}")
+    else:
+        echo(f"{__version__}")
+
+
+@cli.command()
+@click.option(
+    "--init",
+    "-i",
+    is_flag=True,
+    help="Initialize and create configuration file to `PYDS_HOME`.",
+)
+@click.option(
+    "--set",
+    "-s",
+    "setter",
+    multiple=True,
+    type=click.Tuple([str, str]),
+    help="Set specific setting to config file."
+    "Use multiple ``--set <KEY> <VAL>`` options to set multiple configs",
+)
+@click.option(
+    "--get",
+    "-g",
+    "getter",
+    multiple=True,
+    type=str,
+    help="Get specific setting from config file."
+    "Use multiple ``--get <KEY>`` options to get multiple configs",
+)
+def config(getter, setter, init) -> None:
+    """Manage the configuration for pydolphinscheduler."""
+    if init:
+        init_config_file()
+    elif getter:
+        click.echo("The configuration query as below:\n")
+        configs_kv = [f"{key} = {get_single_config(key)}" for key in getter]
+        click.echo("\n".join(configs_kv))
+    elif setter:
+        for key, val in setter:
+            set_single_config(key, val)
+        click.echo("Set configuration done.")
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index 65bf6c5..a5089ac 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -18,29 +18,6 @@
 """Constants for pydolphinscheduler."""
 
 
-class ProcessDefinitionReleaseState:
-    """Constants for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` release state."""
-
-    ONLINE: str = "ONLINE"
-    OFFLINE: str = "OFFLINE"
-
-
-class ProcessDefinitionDefault:
-    """Constants default value for :class:`pydolphinscheduler.core.process_definition.ProcessDefinition`."""
-
-    PROJECT: str = "project-pydolphin"
-    TENANT: str = "tenant_pydolphin"
-    USER: str = "userPythonGateway"
-    # TODO simple set password same as username
-    USER_PWD: str = "userPythonGateway"
-    USER_EMAIL: str = "userPythonGateway@dolphinscheduler.com"
-    USER_PHONE: str = "11111111111"
-    USER_STATE: int = 1
-    QUEUE: str = "queuePythonGateway"
-    WORKER_GROUP: str = "default"
-    TIME_ZONE: str = "Asia/Shanghai"
-
-
 class TaskPriority(str):
     """Constants for task priority."""
 
@@ -99,10 +76,6 @@ class JavaGatewayDefault(str):
 
     RESULT_DATA = "data"
 
-    SERVER_ADDRESS = "127.0.0.1"
-    SERVER_PORT = 25333
-    AUTO_CONVERT = True
-
 
 class Delimiter(str):
     """Constants for delimiter."""
@@ -127,3 +100,9 @@ class Time(str):
 
     FMT_STD_TIME = "%H:%M:%S"
     FMT_NO_COLON_TIME = "%H%M%S"
+
+
+class ResourceKey(str):
+    """Constants for key of resource."""
+
+    ID = "id"
diff --git a/src/pydolphinscheduler/core/__init__.py b/src/pydolphinscheduler/core/__init__.py
index 31dc944..7497d1f 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/src/pydolphinscheduler/core/__init__.py
@@ -18,10 +18,12 @@
 """Init pydolphinscheduler.core package."""
 
 from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.engine import Engine
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task
 
 __all__ = [
+    "Engine",
     "ProcessDefinition",
     "Task",
     "Database",
diff --git a/src/pydolphinscheduler/core/base_side.py b/src/pydolphinscheduler/core/base_side.py
index ed20d70..dca1f12 100644
--- a/src/pydolphinscheduler/core/base_side.py
+++ b/src/pydolphinscheduler/core/base_side.py
@@ -19,7 +19,7 @@
 
 from typing import Optional
 
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base import Base
 
 
@@ -34,7 +34,7 @@ class BaseSide(Base):
         cls,
         # TODO comment for avoiding cycle import
         # user: Optional[User] = ProcessDefinitionDefault.USER
-        user=ProcessDefinitionDefault.USER,
+        user=configuration.WORKFLOW_USER,
     ):
         """Create Base if not exists."""
         raise NotImplementedError
diff --git a/src/pydolphinscheduler/core/configuration.py b/src/pydolphinscheduler/core/configuration.py
new file mode 100644
index 0000000..860f986
--- /dev/null
+++ b/src/pydolphinscheduler/core/configuration.py
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Configuration module for pydolphinscheduler."""
+import os
+from pathlib import Path
+from typing import Any
+
+from pydolphinscheduler.exceptions import PyDSConfException
+from pydolphinscheduler.utils import file
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+
+BUILD_IN_CONFIG_PATH = Path(__file__).resolve().parent.joinpath("default_config.yaml")
+
+
+def config_path() -> Path:
+    """Get the path of pydolphinscheduler configuration file."""
+    pyds_home = os.environ.get("PYDS_HOME", "~/pydolphinscheduler")
+    config_file_path = Path(pyds_home).joinpath("config.yaml").expanduser()
+    return config_file_path
+
+
+def get_configs() -> YamlParser:
+    """Get all configuration settings from configuration file.
+
+    Will use custom configuration file first if it exists, otherwise default configuration file in
+    default path.
+    """
+    path = str(config_path()) if config_path().exists() else BUILD_IN_CONFIG_PATH
+    with open(path, mode="r") as f:
+        return YamlParser(f.read())
+
+
+def init_config_file() -> None:
+    """Initialize configuration file by default configs."""
+    if config_path().exists():
+        raise PyDSConfException(
+            "Initialize configuration false to avoid overwrite configure by accident, file already exists "
+            "in %s, if you wan to overwrite the exists configure please remove the exists file manually.",
+            str(config_path()),
+        )
+    file.write(content=str(get_configs()), to_path=str(config_path()))
+
+
+def get_single_config(key: str) -> Any:
+    """Get single config to configuration file.
+
+    Support get from nested keys by delimiter ``.``.
+
+    For example, yaml config as below:
+
+    .. code-block:: yaml
+
+        one:
+          two1:
+            three: value1
+          two2: value2
+
+    you could get ``value1`` and ``value2`` by nested path
+
+    .. code-block:: python
+
+        value1 = get_single_config("one.two1.three")
+        value2 = get_single_config("one.two2")
+
+    :param key: The config key want to get it value.
+    """
+    config = get_configs()
+    if key not in config:
+        raise PyDSConfException(
+            "Configuration path %s do not exists. Can not get configuration.", key
+        )
+    return config[key]
+
+
+def set_single_config(key: str, value: Any) -> None:
+    """Change single config to configuration file.
+
+    For example, yaml config as below:
+
+    .. code-block:: yaml
+
+        one:
+          two1:
+            three: value1
+          two2: value2
+
+    you could change ``value1`` to ``value3``, also change ``value2`` to ``value4`` by nested path assigned
+
+    .. code-block:: python
+
+        set_single_config["one.two1.three"] = "value3"
+        set_single_config["one.two2"] = "value4"
+
+    :param key: The config key want change.
+    :param value: The new value want to set.
+    """
+    config = get_configs()
+    if key not in config:
+        raise PyDSConfException(
+            "Configuration path %s do not exists. Can not set configuration.", key
+        )
+    config[key] = value
+    file.write(content=str(config), to_path=str(config_path()), overwrite=True)
+
+
+def get_int(val: Any) -> int:
+    """Covert value to int."""
+    return int(val)
+
+
+def get_bool(val: Any) -> bool:
+    """Covert value to boolean."""
+    if isinstance(val, str):
+        return val.lower() in {"true", "t"}
+    elif isinstance(val, int):
+        return val == 1
+    else:
+        return bool(val)
+
+
+# Start Common Configuration Settings
+
+# Add configs as module variables to avoid read configuration multiple times when
+#  Get common configuration setting
+#  Set or get multiple configs in single time
+configs: YamlParser = get_configs()
+
+# Java Gateway Settings
+JAVA_GATEWAY_ADDRESS = os.environ.get(
+    "PYDS_JAVA_GATEWAY_ADDRESS", configs.get("java_gateway.address")
+)
+JAVA_GATEWAY_PORT = get_int(
+    os.environ.get("PYDS_JAVA_GATEWAY_PORT", configs.get("java_gateway.port"))
+)
+JAVA_GATEWAY_AUTO_CONVERT = get_bool(
+    os.environ.get(
+        "PYDS_JAVA_GATEWAY_AUTO_CONVERT", configs.get("java_gateway.auto_convert")
+    )
+)
+
+# User Settings
+USER_NAME = os.environ.get("PYDS_USER_NAME", configs.get("default.user.name"))
+USER_PASSWORD = os.environ.get(
+    "PYDS_USER_PASSWORD", configs.get("default.user.password")
+)
+USER_EMAIL = os.environ.get("PYDS_USER_EMAIL", configs.get("default.user.email"))
+USER_PHONE = str(os.environ.get("PYDS_USER_PHONE", configs.get("default.user.phone")))
+USER_STATE = get_int(
+    os.environ.get("PYDS_USER_STATE", configs.get("default.user.state"))
+)
+
+# Workflow Settings
+WORKFLOW_PROJECT = os.environ.get(
+    "PYDS_WORKFLOW_PROJECT", configs.get("default.workflow.project")
+)
+WORKFLOW_TENANT = os.environ.get(
+    "PYDS_WORKFLOW_TENANT", configs.get("default.workflow.tenant")
+)
+WORKFLOW_USER = os.environ.get(
+    "PYDS_WORKFLOW_USER", configs.get("default.workflow.user")
+)
+WORKFLOW_QUEUE = os.environ.get(
+    "PYDS_WORKFLOW_QUEUE", configs.get("default.workflow.queue")
+)
+WORKFLOW_RELEASE_STATE = os.environ.get(
+    "PYDS_WORKFLOW_RELEASE_STATE", configs.get("default.workflow.release_state")
+)
+WORKFLOW_WORKER_GROUP = os.environ.get(
+    "PYDS_WORKFLOW_WORKER_GROUP", configs.get("default.workflow.worker_group")
+)
+WORKFLOW_TIME_ZONE = os.environ.get(
+    "PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone")
+)
+WORKFLOW_WARNING_TYPE = os.environ.get(
+    "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
+)
+
+# End Common Configuration Setting
diff --git a/src/pydolphinscheduler/core/default_config.yaml b/src/pydolphinscheduler/core/default_config.yaml
new file mode 100644
index 0000000..5541af7
--- /dev/null
+++ b/src/pydolphinscheduler/core/default_config.yaml
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Setting about Java gateway server
+java_gateway:
+  # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API runs on a different
+  # host than the Python gateway server. It could also be a specific address like `127.0.0.1` or `localhost`
+  address: 127.0.0.1
+
+  # The port of Python gateway server start. Define which port you could connect to Python gateway server from
+  # Python API side.
+  port: 25333
+
+  # Whether to automatically convert Python objects to Java objects. Default value is ``True``. There is some
+  # performance loss when set to ``True`` but for now pydolphinscheduler does not handle the conversion issue between
+  # java and Python, mark it as TODO item in the future.
+  auto_convert: true
+
+# Setting about dolphinscheduler default value, will use the value set below if the property is not set, which
+# includes ``user`` and ``workflow``
+default:
+  # Default value for dolphinscheduler's user object
+  user:
+    name: userPythonGateway
+    password: userPythonGateway
+    email: userPythonGateway@dolphinscheduler.com
+    tenant: tenant_pydolphin
+    phone: 11111111111
+    state: 1
+  # Default value for dolphinscheduler's workflow object
+  workflow:
+    project: project-pydolphin
+    tenant: tenant_pydolphin
+    user: userPythonGateway
+    queue: queuePythonGateway
+    worker_group: default
+    # Release state of workflow, default value is ``online`` which means setting the workflow online when it is
+    # submitted to the Java gateway; if you want to set the workflow offline set its value to ``offline``
+    release_state: online
+    time_zone: Asia/Shanghai
+    # Warning type of the workflow, default value is ``NONE`` which means do not warn user in any cases of workflow state,
+    # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
+    # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL`` 
+    warning_type: NONE
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 1c123fc..63e0808 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -21,11 +21,8 @@ import json
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Set
 
-from pydolphinscheduler.constants import (
-    ProcessDefinitionDefault,
-    ProcessDefinitionReleaseState,
-    TaskType,
-)
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException
 from pydolphinscheduler.java_gateway import launch_gateway
@@ -58,6 +55,17 @@ class ProcessDefinition(Base):
     """process definition object, will define process definition attribute, task, relation.
 
     TODO: maybe we should rename this class, currently use DS object name.
+
+    :param user: The user for current process definition. Will create a new one if it does not exist. If your
+        parameter ``project`` already exists but the project's creator does not belong to ``user``, will grant
+        ``project`` to ``user`` automatically.
+    :param project: The project for current process definition. You could see the workflow in this project
+        through Web UI after it :func:`submit` or :func:`run`. It will create a new project belonging to
+        ``user`` if it does not exist. And when ``project`` exists but the project's creator does not belong
+        to ``user``, will grant ``project`` to ``user`` automatically.
+    :param resource_list: Resource files required by the current process definition. You can create and modify
+        resource files from this field. When the process definition is submitted, these resource files are
+        also submitted along with it.
     """
 
     # key attribute for identify ProcessDefinition object
@@ -75,12 +83,15 @@ class ProcessDefinition(Base):
         "_project",
         "_tenant",
         "worker_group",
+        "warning_type",
+        "warning_group_id",
         "timeout",
         "release_state",
         "param",
         "tasks",
         "task_definition_json",
         "task_relation_json",
+        "resource_list",
     }
 
     def __init__(
@@ -90,15 +101,17 @@ class ProcessDefinition(Base):
         schedule: Optional[str] = None,
         start_time: Optional[str] = None,
         end_time: Optional[str] = None,
-        timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE,
-        user: Optional[str] = ProcessDefinitionDefault.USER,
-        project: Optional[str] = ProcessDefinitionDefault.PROJECT,
-        tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
-        queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
-        worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+        timezone: Optional[str] = configuration.WORKFLOW_TIME_ZONE,
+        user: Optional[str] = configuration.WORKFLOW_USER,
+        project: Optional[str] = configuration.WORKFLOW_PROJECT,
+        tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+        warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
+        warning_group_id: Optional[int] = 0,
         timeout: Optional[int] = 0,
-        release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
+        release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
         param: Optional[Dict] = None,
+        resource_list: Optional[List] = None,
     ):
         super().__init__(name, description)
         self.schedule = schedule
@@ -108,15 +121,23 @@ class ProcessDefinition(Base):
         self._user = user
         self._project = project
         self._tenant = tenant
-        self._queue = queue
         self.worker_group = worker_group
+        self.warning_type = warning_type
+        if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
+            raise PyDSParamException(
+                "Parameter `warning_type` with unexpected value `%s`", warning_type
+            )
+        else:
+            self.warning_type = warning_type.strip().upper()
+        self.warning_group_id = warning_group_id
         self.timeout = timeout
-        self.release_state = release_state
+        self._release_state = release_state
         self.param = param
         self.tasks: dict = {}
         # TODO how to fix circle import
         self._task_relations: set["TaskRelation"] = set()  # noqa: F821
         self._process_definition_code = None
+        self.resource_list = resource_list or []
 
     def __enter__(self) -> "ProcessDefinition":
         ProcessDefinitionContext.set(self)
@@ -151,15 +172,7 @@ class ProcessDefinition(Base):
 
         For now we just get from python side but not from java gateway side, so it may not correct.
         """
-        return User(
-            self._user,
-            ProcessDefinitionDefault.USER_PWD,
-            ProcessDefinitionDefault.USER_EMAIL,
-            ProcessDefinitionDefault.USER_PHONE,
-            self._tenant,
-            self._queue,
-            ProcessDefinitionDefault.USER_STATE,
-        )
+        return User(name=self._user, tenant=self._tenant)
 
     @staticmethod
     def _parse_datetime(val: Any) -> Any:
@@ -190,6 +203,25 @@ class ProcessDefinition(Base):
         """Set attribute end_time."""
         self._end_time = val
 
+    @property
+    def release_state(self) -> int:
+        """Get attribute release_state."""
+        rs_ref = {
+            "online": 1,
+            "offline": 0,
+        }
+        if self._release_state not in rs_ref:
+            raise PyDSParamException(
+                "Parameter release_state only support `online` or `offline` but get %s",
+                self._release_state,
+            )
+        return rs_ref[self._release_state]
+
+    @release_state.setter
+    def release_state(self, val: str) -> None:
+        """Set attribute release_state."""
+        self._release_state = val.lower()
+
     @property
     def param_json(self) -> Optional[List[Dict]]:
         """Return param json base on self.param."""
@@ -334,8 +366,6 @@ class ProcessDefinition(Base):
         :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
         """
         # TODO used metaclass for more pythonic
-        self.tenant.create_if_not_exists(self._queue)
-        # model User have to create after Tenant created
         self.user.create_if_not_exists()
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
@@ -345,14 +375,16 @@ class ProcessDefinition(Base):
 
         This method should be called before process definition submit to java gateway
         For now, we have below checker:
-        * `self.param` should be set if task `switch` in this workflow.
+        * `self.param` or at least one local param of task should be set if task `switch` in this workflow.
         """
         if (
             any([task.task_type == TaskType.SWITCH for task in self.tasks.values()])
             and self.param is None
+            and all([len(task.local_params) == 0 for task in self.tasks.values()])
         ):
             raise PyDSParamException(
-                "Parameter param must be provider if task Switch in process definition."
+                "Parameter param or at least one local_param of task must "
+                "be provided if task Switch in process definition."
             )
 
     def submit(self) -> int:
@@ -368,15 +400,26 @@ class ProcessDefinition(Base):
             str(self.description) if self.description else "",
             json.dumps(self.param_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
+            self.warning_type,
+            self.warning_group_id,
             json.dumps(self.task_location),
             self.timeout,
             self.worker_group,
             self._tenant,
+            self.release_state,
             # TODO add serialization function
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
             None,
         )
+        if len(self.resource_list) > 0:
+            for res in self.resource_list:
+                gateway.entry_point.createOrUpdateResource(
+                    self._user,
+                    res.name,
+                    res.description,
+                    res.content,
+                )
         return self._process_definition_code
 
     def start(self) -> None:
@@ -391,5 +434,7 @@ class ProcessDefinition(Base):
             self.name,
             "",
             self.worker_group,
+            self.warning_type,
+            self.warning_group_id,
             24 * 3600,
         )
diff --git a/src/pydolphinscheduler/core/base_side.py b/src/pydolphinscheduler/core/resource.py
similarity index 62%
copy from src/pydolphinscheduler/core/base_side.py
copy to src/pydolphinscheduler/core/resource.py
index ed20d70..bd4ffd4 100644
--- a/src/pydolphinscheduler/core/base_side.py
+++ b/src/pydolphinscheduler/core/resource.py
@@ -15,26 +15,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Module for side object."""
+"""Module resource."""
 
 from typing import Optional
 
-from pydolphinscheduler.constants import ProcessDefinitionDefault
 from pydolphinscheduler.core.base import Base
 
 
-class BaseSide(Base):
-    """Base class for side object, it declare base behavior for them."""
+class Resource(Base):
+    """resource object, will define the resources that you want to create or update.
 
-    def __init__(self, name: str, description: Optional[str] = None):
-        super().__init__(name, description)
+    :param name: The fullname of the resource. Includes path and suffix.
+    :param content: The content of the resource.
+    :param description: The description of the resource.
+    """
+
+    _DEFINE_ATTR = {"name", "content", "description"}
 
-    @classmethod
-    def create_if_not_exists(
-        cls,
-        # TODO comment for avoiding cycle import
-        # user: Optional[User] = ProcessDefinitionDefault.USER
-        user=ProcessDefinitionDefault.USER,
+    def __init__(
+        self,
+        name: str,
+        content: str,
+        description: Optional[str] = None,
     ):
-        """Create Base if not exists."""
-        raise NotImplementedError
+        super().__init__(name, description)
+        self.content = content
+        self._resource_code = None
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 693f508..90c0e89 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -17,16 +17,17 @@
 
 """DolphinScheduler Task and TaskRelation object."""
 
-import logging
+from logging import getLogger
 from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
 from pydolphinscheduler.constants import (
     Delimiter,
-    ProcessDefinitionDefault,
+    ResourceKey,
     TaskFlag,
     TaskPriority,
     TaskTimeoutFlag,
 )
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base import Base
 from pydolphinscheduler.core.process_definition import (
     ProcessDefinition,
@@ -34,6 +35,8 @@ from pydolphinscheduler.core.process_definition import (
 )
 from pydolphinscheduler.java_gateway import launch_gateway
 
+logger = getLogger(__name__)
+
 
 class TaskRelation(Base):
     """TaskRelation object, describe the relation of exactly two tasks."""
@@ -104,7 +107,7 @@ class Task(Base):
         description: Optional[str] = None,
         flag: Optional[str] = TaskFlag.YES,
         task_priority: Optional[str] = TaskPriority.MEDIUM,
-        worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
+        worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
@@ -146,14 +149,14 @@ class Task(Base):
         ):
             self.process_definition.add_task(self)
         else:
-            logging.warning(
+            logger.warning(
                 "Task code %d already in process definition, prohibit re-add task.",
                 self.code,
             )
 
         # Attribute for task param
         self.local_params = local_params or []
-        self.resource_list = resource_list or []
+        self._resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
         self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@@ -168,6 +171,22 @@ class Task(Base):
         """Set attribute process_definition."""
         self._process_definition = process_definition
 
+    @property
+    def resource_list(self) -> List:
+        """Get task define attribute `resource_list`."""
+        resources = set()
+        for resource in self._resource_list:
+            if type(resource) == str:
+                resources.add(self.query_resource(resource).get(ResourceKey.ID))
+            elif type(resource) == dict and resource.get(ResourceKey.ID) is not None:
+                logger.warning(
+                    """`resource_list` should be defined using List[str] with resource paths,
+                       the use of ids to define resources will be removed in version 3.2.0.
+                    """
+                )
+                resources.add(resource.get(ResourceKey.ID))
+        return [{ResourceKey.ID: r} for r in resources]
+
     @property
     def condition_result(self) -> Dict:
         """Get attribute condition_result."""
@@ -271,8 +290,15 @@ class Task(Base):
         # TODO get code from specific project process definition and task name
         gateway = launch_gateway()
         result = gateway.entry_point.getCodeAndVersion(
-            self.process_definition._project, self.name
+            self.process_definition._project, self.process_definition.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
+
+    def query_resource(self, full_name):
+        """Get resource info from java gateway, contains resource id, name."""
+        gateway = launch_gateway()
+        return gateway.entry_point.queryResourcesFileInfo(
+            self.process_definition.user.name, full_name
+        )
diff --git a/src/pydolphinscheduler/examples/task_dependent_example.py b/src/pydolphinscheduler/examples/task_dependent_example.py
index ae19d95..88d6ea2 100644
--- a/src/pydolphinscheduler/examples/task_dependent_example.py
+++ b/src/pydolphinscheduler/examples/task_dependent_example.py
@@ -35,7 +35,7 @@ task_dependent:
 
 task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
 """
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
 from pydolphinscheduler.tasks.shell import Shell
@@ -58,12 +58,12 @@ with ProcessDefinition(
         dependence=And(
             Or(
                 DependentItem(
-                    project_name=ProcessDefinitionDefault.PROJECT,
+                    project_name=configuration.WORKFLOW_PROJECT,
                     process_definition_name="task_dependent_external",
                     dependent_task_name="task_1",
                 ),
                 DependentItem(
-                    project_name=ProcessDefinitionDefault.PROJECT,
+                    project_name=configuration.WORKFLOW_PROJECT,
                     process_definition_name="task_dependent_external",
                     dependent_task_name="task_2",
                 ),
diff --git a/src/pydolphinscheduler/examples/tutorial_decorator.py b/src/pydolphinscheduler/examples/tutorial_decorator.py
new file mode 100644
index 0000000..986c1bb
--- /dev/null
+++ b/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example take you to experience pydolphinscheduler.
+
+After the tutorial_decorator.py file is submitted to the Apache DolphinScheduler server a DAG will be created,
+and workflow DAG graph as below:
+
+                  --> task_child_one
+                /                    \
+task_parent -->                        -->  task_union
+                \                    /
+                  --> task_child_two
+
+it will instantiate and run all the tasks it has.
+"""
+
+# [start tutorial]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import the @task decorator because we will create tasks from Python functions later
+from pydolphinscheduler.tasks.func_wrap import task
+
+# [end package_import]
+
+
+# [start task_declare]
+@task
+def task_parent():
+    """First task in this workflow."""
+    print("echo hello pydolphinscheduler")
+
+
+@task
+def task_child_one():
+    """Child task will be run parallel after task ``task_parent`` finished."""
+    print("echo 'child one'")
+
+
+@task
+def task_child_two():
+    """Child task will be run parallel after task ``task_parent`` finished."""
+    print("echo 'child two'")
+
+
+@task
+def task_union():
+    """Last task in this workflow."""
+    print("echo union")
+
+
+# [end task_declare]
+
+
+# [start workflow_declare]
+with ProcessDefinition(
+    name="tutorial_decorator",
+    schedule="0 0 0 * * ? *",
+    start_time="2021-01-01",
+    tenant="tenant_exists",
+) as pd:
+    # [end workflow_declare]
+
+    # [start task_relation_declare]
+    task_group = [task_child_one(), task_child_two()]
+    task_parent().set_downstream(task_group)
+
+    task_union() << task_group
+    # [end task_relation_declare]
+
+    # [start submit_or_run]
+    pd.run()
+    # [end submit_or_run]
+# [end tutorial]
diff --git a/src/pydolphinscheduler/exceptions.py b/src/pydolphinscheduler/exceptions.py
index 745ef3e..4d70a58 100644
--- a/src/pydolphinscheduler/exceptions.py
+++ b/src/pydolphinscheduler/exceptions.py
@@ -21,26 +21,22 @@
 class PyDSBaseException(Exception):
     """Base exception for pydolphinscheduler."""
 
-    pass
-
 
 class PyDSParamException(PyDSBaseException):
     """Exception for pydolphinscheduler parameter verify error."""
 
-    pass
-
 
 class PyDSTaskNoFoundException(PyDSBaseException):
     """Exception for pydolphinscheduler workflow task no found error."""
 
-    pass
-
 
 class PyDSJavaGatewayException(PyDSBaseException):
     """Exception for pydolphinscheduler Java gateway error."""
 
-    pass
-
 
 class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
     """Exception for pydolphinscheduler process definition not assign error."""
+
+
+class PyDSConfException(PyDSBaseException):
+    """Exception for pydolphinscheduler configuration error."""
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 2876ed5..8560638 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -23,6 +23,7 @@ from py4j.java_collections import JavaMap
 from py4j.java_gateway import GatewayParameters, JavaGateway
 
 from pydolphinscheduler.constants import JavaGatewayDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.exceptions import PyDSJavaGatewayException
 
 
@@ -38,9 +39,9 @@ def launch_gateway(
     This is why automatic conversion is disabled by default.
     """
     gateway_parameters = GatewayParameters(
-        address=address or JavaGatewayDefault.SERVER_ADDRESS,
-        port=port or JavaGatewayDefault.SERVER_PORT,
-        auto_convert=auto_convert or JavaGatewayDefault.AUTO_CONVERT,
+        address=address or configuration.JAVA_GATEWAY_ADDRESS,
+        port=port or configuration.JAVA_GATEWAY_PORT,
+        auto_convert=auto_convert or configuration.JAVA_GATEWAY_AUTO_CONVERT,
     )
     gateway = JavaGateway(gateway_parameters=gateway_parameters)
     return gateway
diff --git a/src/pydolphinscheduler/side/project.py b/src/pydolphinscheduler/side/project.py
index 02382ca..750e3b8 100644
--- a/src/pydolphinscheduler/side/project.py
+++ b/src/pydolphinscheduler/side/project.py
@@ -19,7 +19,7 @@
 
 from typing import Optional
 
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base_side import BaseSide
 from pydolphinscheduler.java_gateway import launch_gateway
 
@@ -29,14 +29,14 @@ class Project(BaseSide):
 
     def __init__(
         self,
-        name: str = ProcessDefinitionDefault.PROJECT,
+        name: str = configuration.WORKFLOW_PROJECT,
         description: Optional[str] = None,
     ):
         super().__init__(name, description)
 
-    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+    def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
         gateway = launch_gateway()
-        gateway.entry_point.createProject(user, self.name, self.description)
+        gateway.entry_point.createOrGrantProject(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)
diff --git a/src/pydolphinscheduler/side/queue.py b/src/pydolphinscheduler/side/queue.py
index 9d6664e..e7c68e1 100644
--- a/src/pydolphinscheduler/side/queue.py
+++ b/src/pydolphinscheduler/side/queue.py
@@ -19,7 +19,7 @@
 
 from typing import Optional
 
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base_side import BaseSide
 from pydolphinscheduler.java_gateway import gateway_result_checker, launch_gateway
 
@@ -29,12 +29,12 @@ class Queue(BaseSide):
 
     def __init__(
         self,
-        name: str = ProcessDefinitionDefault.QUEUE,
+        name: str = configuration.WORKFLOW_QUEUE,
         description: Optional[str] = "",
     ):
         super().__init__(name, description)
 
-    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
+    def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Queue if not exists."""
         gateway = launch_gateway()
         # Here we set Queue.name and Queue.queueName same as self.name
diff --git a/src/pydolphinscheduler/side/tenant.py b/src/pydolphinscheduler/side/tenant.py
index 508c033..6aaabfe 100644
--- a/src/pydolphinscheduler/side/tenant.py
+++ b/src/pydolphinscheduler/side/tenant.py
@@ -19,7 +19,7 @@
 
 from typing import Optional
 
-from pydolphinscheduler.constants import ProcessDefinitionDefault
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base_side import BaseSide
 from pydolphinscheduler.java_gateway import launch_gateway
 
@@ -29,15 +29,15 @@ class Tenant(BaseSide):
 
     def __init__(
         self,
-        name: str = ProcessDefinitionDefault.TENANT,
-        queue: str = ProcessDefinitionDefault.QUEUE,
+        name: str = configuration.WORKFLOW_TENANT,
+        queue: str = configuration.WORKFLOW_QUEUE,
         description: Optional[str] = None,
     ):
         super().__init__(name, description)
         self.queue = queue
 
     def create_if_not_exists(
-        self, queue_name: str, user=ProcessDefinitionDefault.USER
+        self, queue_name: str, user=configuration.USER_NAME
     ) -> None:
         """Create Tenant if not exists."""
         gateway = launch_gateway()
diff --git a/src/pydolphinscheduler/side/user.py b/src/pydolphinscheduler/side/user.py
index cd0145a..510e3a8 100644
--- a/src/pydolphinscheduler/side/user.py
+++ b/src/pydolphinscheduler/side/user.py
@@ -19,8 +19,10 @@
 
 from typing import Optional
 
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base_side import BaseSide
 from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side.tenant import Tenant
 
 
 class User(BaseSide):
@@ -39,12 +41,12 @@ class User(BaseSide):
     def __init__(
         self,
         name: str,
-        password: str,
-        email: str,
-        phone: str,
-        tenant: str,
-        queue: Optional[str] = None,
-        status: Optional[int] = 1,
+        password: Optional[str] = configuration.USER_PASSWORD,
+        email: Optional[str] = configuration.USER_EMAIL,
+        phone: Optional[str] = configuration.USER_PHONE,
+        tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+        queue: Optional[str] = configuration.WORKFLOW_QUEUE,
+        status: Optional[int] = configuration.USER_STATE,
     ):
         super().__init__(name)
         self.password = password
@@ -54,8 +56,15 @@ class User(BaseSide):
         self.queue = queue
         self.status = status
 
+    def create_tenant_if_not_exists(self) -> None:
+        """Create tenant object."""
+        tenant = Tenant(name=self.tenant, queue=self.queue)
+        tenant.create_if_not_exists(self.queue)
+
     def create_if_not_exists(self, **kwargs):
         """Create User if not exists."""
+        # Should make sure queue already exists.
+        self.create_tenant_if_not_exists()
         gateway = launch_gateway()
         gateway.entry_point.createUser(
             self.name,
diff --git a/src/pydolphinscheduler/tasks/func_wrap.py b/src/pydolphinscheduler/tasks/func_wrap.py
new file mode 100644
index 0000000..c0b73a1
--- /dev/null
+++ b/src/pydolphinscheduler/tasks/func_wrap.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task function wrapper allows using decorator to create a task."""
+
+import functools
+import inspect
+import itertools
+import types
+
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.python import Python
+
+
+def _get_func_str(func: types.FunctionType) -> str:
+    """Get Python function string without indent from decorator.
+
+    :param func: The function which is wrapped by the decorator ``@task``.
+    """
+    lines = inspect.getsourcelines(func)[0]
+
+    src_strip = ""
+    lead_space_num = None
+    for line in lines:
+        if lead_space_num is None:
+            lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line))
+        if line.strip() == "@task":
+            continue
+        elif line.strip().startswith("@"):
+            raise PyDSParamException(
+                "Do not support other decorators for function ``task`` decorator."
+            )
+        src_strip += line[lead_space_num:]
+    return src_strip
+
+
+def task(func: types.FunctionType):
+    """Decorator which converts a Python function into a pydolphinscheduler task."""
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        func_str = _get_func_str(func)
+        return Python(
+            name=kwargs.get("name", func.__name__), definition=func_str, *args, **kwargs
+        )
+
+    return wrapper
diff --git a/src/pydolphinscheduler/tasks/python.py b/src/pydolphinscheduler/tasks/python.py
index 7950480..52903d4 100644
--- a/src/pydolphinscheduler/tasks/python.py
+++ b/src/pydolphinscheduler/tasks/python.py
@@ -18,34 +18,85 @@
 """Task Python."""
 
 import inspect
+import logging
+import re
 import types
-from typing import Any
+from typing import Union
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
+log = logging.getLogger(__file__)
+
 
 class Python(Task):
-    """Task Python object, declare behavior for Python task to dolphinscheduler."""
+    """Task Python object, declare behavior for Python task to dolphinscheduler.
+
+    Python task support two types of parameters for :param:``code``, and here is an example:
+
+    Using str type of :param:``code``
+
+    .. code-block:: python
+
+        python_task = Python(name="str_type", code="print('Hello Python task.')")
+
+    Or using Python callable type of :param:``code``
+
+    .. code-block:: python
+
+        def foo():
+            print("Hello Python task.")
+
+        python_task = Python(name="str_type", code=foo)
+
+    :param name: The name for Python task. It define the task name.
+    :param definition: String format of Python script you want to execute or Python callable you
+        want to execute.
+    """
 
     _task_custom_attr = {
         "raw_script",
     }
 
-    def __init__(self, name: str, code: Any, *args, **kwargs):
+    def __init__(
+        self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
+    ):
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
-        self._code = code
+        self.definition = definition
+
+    def _build_exe_str(self) -> str:
+        """Build executable string from given definition.
+
+        Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
+        to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
+        """
+        if isinstance(self.definition, types.FunctionType):
+            py_function = inspect.getsource(self.definition)
+            func_str = f"{py_function}{self.definition.__name__}()"
+        else:
+            pattern = re.compile("^def (\\w+)\\(")
+            find = pattern.findall(self.definition)
+            if not find:
+                log.warning(
+                    "Python definition is simple script instead of function, with value %s",
+                    self.definition,
+                )
+                return self.definition
+            # Keep function str and function callable always have one blank line
+            func_str = (
+                f"{self.definition}{find[0]}()"
+                if self.definition.endswith("\n")
+                else f"{self.definition}\n{find[0]}()"
+            )
+        return func_str
 
     @property
     def raw_script(self) -> str:
         """Get python task define attribute `raw_script`."""
-        if isinstance(self._code, str):
-            return self._code
-        elif isinstance(self._code, types.FunctionType):
-            py_function = inspect.getsource(self._code)
-            return py_function
+        if isinstance(self.definition, (str, types.FunctionType)):
+            return self._build_exe_str()
         else:
             raise PyDSParamException(
-                "Parameter code do not support % for now.", type(self._code)
+                "Parameter definition do not support % for now.", type(self.definition)
             )
diff --git a/src/pydolphinscheduler/tasks/sql.py b/src/pydolphinscheduler/tasks/sql.py
index b5be3e4..a125982 100644
--- a/src/pydolphinscheduler/tasks/sql.py
+++ b/src/pydolphinscheduler/tasks/sql.py
@@ -17,6 +17,7 @@
 
 """Task sql."""
 
+import logging
 import re
 from typing import Dict, Optional
 
@@ -24,12 +25,14 @@ from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.task import Task
 
+log = logging.getLogger(__file__)
+
 
 class SqlType:
     """SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
 
-    SELECT = 0
-    NOT_SELECT = 1
+    SELECT = "0"
+    NOT_SELECT = "1"
 
 
 class Sql(Task):
@@ -61,6 +64,7 @@ class Sql(Task):
         name: str,
         datasource_name: str,
         sql: str,
+        sql_type: Optional[str] = None,
         pre_statements: Optional[str] = None,
         post_statements: Optional[str] = None,
         display_rows: Optional[int] = 10,
@@ -69,16 +73,32 @@ class Sql(Task):
     ):
         super().__init__(name, TaskType.SQL, *args, **kwargs)
         self.sql = sql
+        self.param_sql_type = sql_type
         self.datasource_name = datasource_name
         self.pre_statements = pre_statements or []
         self.post_statements = post_statements or []
         self.display_rows = display_rows
 
     @property
-    def sql_type(self) -> int:
-        """Judgement sql type, use regexp to check which type of the sql is."""
+    def sql_type(self) -> str:
+        """Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
+
+        If `param_sql_type` dot not specific, will use regexp to check
+        which type of the SQL is. But if `param_sql_type` is specific
+        will use the parameter overwrites the regexp way
+        """
+        if (
+            self.param_sql_type == SqlType.SELECT
+            or self.param_sql_type == SqlType.NOT_SELECT
+        ):
+            log.info(
+                "The sql type is specified by a parameter, with value %s",
+                self.param_sql_type,
+            )
+            return self.param_sql_type
         pattern_select_str = (
-            "^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
+            "^(?!(.* |)insert |(.* |)delete |(.* |)drop "
+            "|(.* |)update |(.* |)alter |(.* |)create ).*"
         )
         pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
         if pattern_select.match(self.sql) is None:
diff --git a/src/pydolphinscheduler/tasks/switch.py b/src/pydolphinscheduler/tasks/switch.py
index 28032f8..0c9a2b8 100644
--- a/src/pydolphinscheduler/tasks/switch.py
+++ b/src/pydolphinscheduler/tasks/switch.py
@@ -129,7 +129,11 @@ class SwitchCondition(Base):
 
 
 class Switch(Task):
-    """Task switch object, declare behavior for switch task to dolphinscheduler."""
+    """Task switch object, declare behavior for switch task to dolphinscheduler.
+
+    Param of process definition or at least one local param of task must be set
+    if task `switch` in this workflow.
+    """
 
     def __init__(self, name: str, condition: SwitchCondition, *args, **kwargs):
         super().__init__(name, TaskType.SWITCH, *args, **kwargs)
diff --git a/src/pydolphinscheduler/utils/file.py b/src/pydolphinscheduler/utils/file.py
new file mode 100644
index 0000000..075b902
--- /dev/null
+++ b/src/pydolphinscheduler/utils/file.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""File util for pydolphinscheduler."""
+
+from pathlib import Path
+from typing import Optional
+
+
def write(
    content: str,
    to_path: str,
    create: Optional[bool] = True,
    overwrite: Optional[bool] = False,
) -> None:
    """Write string ``content`` to the file at ``to_path``.

    :param content: The source string want to write to :param:`to_path`.
    :param to_path: The path want to write content.
    :param create: Whether create the file parent directory or not if it does not exist.
      If set ``True`` will create file with :param:`to_path` if path not exists, otherwise
      ``False`` will not create. Default ``True``.
    :param overwrite: Whether overwrite the file or not if it exists. If set ``True``
      will overwrite the exists content, otherwise ``False`` will not overwrite it.
      Default ``False``.
    :raises ValueError: If the parent directory is missing and ``create`` is ``False``.
    :raises FileExistsError: If the file exists and ``overwrite`` is ``False``.
    """
    path = Path(to_path)
    if not path.parent.exists():
        if not create:
            raise ValueError(
                "Parent directory do not exists and set param `create` to `False`."
            )
        path.parent.mkdir(parents=True)
    # BUG FIX: the error message previously used printf-style args
    # (``"File %s ...", to_path``) without formatting, so the path never
    # appeared in the message. The two identical write branches are merged.
    if path.exists() and not overwrite:
        raise FileExistsError(
            f"File {to_path} already exists and you choose not overwrite mode."
        )
    path.write_text(content)
diff --git a/src/pydolphinscheduler/utils/yaml_parser.py b/src/pydolphinscheduler/utils/yaml_parser.py
new file mode 100644
index 0000000..46ee08c
--- /dev/null
+++ b/src/pydolphinscheduler/utils/yaml_parser.py
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""YAML parser utils, parser yaml string to ``ruamel.yaml`` object and nested key dict."""
+
+import copy
+import io
+from typing import Any, Dict, Optional
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap
+
+
class YamlParser:
    """A parser to parse Yaml file and provider easier way to access or change value.

    This parser provider delimiter string key to get or set :class:`ruamel.yaml.YAML` object

    For example, yaml config named ``test.yaml`` and its content as below:

    .. code-block:: yaml

        one:
          two1:
            three: value1
          two2: value2

    you could get ``value1`` and ``value2`` by nested path

    .. code-block:: python

        yaml_parser = YamlParser("test.yaml")

        # Use function ``get`` to get value
        value1 = yaml_parser.get("one.two1.three")
        # Or use build-in ``__getitem__`` to get value
        value2 = yaml_parser["one.two2"]

    or you could change ``value1`` to ``value3``, also change ``value2`` to ``value4`` by nested path assigned

    .. code-block:: python

        yaml_parser["one.two1.three"] = "value3"
        yaml_parser["one.two2"] = "value4"
    """

    def __init__(self, content: str, delimiter: Optional[str] = "."):
        # Keep the raw YAML string and build the round-trip parser from it.
        self._content = content
        self.src_parser = content
        self._delimiter = delimiter

    @property
    def src_parser(self) -> CommentedMap:
        """Get src_parser property."""
        return self._src_parser

    @src_parser.setter
    def src_parser(self, content: str) -> None:
        """Set src_parser property by parsing the YAML string ``content``."""
        self._yaml = YAML()
        self._src_parser = self._yaml.load(content)

    def parse_nested_dict(
        self, result: Dict, commented_map: CommentedMap, key: str
    ) -> None:
        """Parse :class:`ruamel.yaml.comments.CommentedMap` to nested dict using :param:`delimiter`."""
        if not isinstance(commented_map, CommentedMap):
            return
        for sub_key in set(commented_map.keys()):
            next_key = f"{key}{self._delimiter}{sub_key}"
            result[next_key] = commented_map[sub_key]
            self.parse_nested_dict(result, commented_map[sub_key], next_key)

    @property
    def dict_parser(self) -> Dict:
        """Get :class:`CommentedMap` to nested dict using :param:`delimiter` as key delimiter.

        Use Depth-First-Search get all nested key and value, and all key connect by :param:`delimiter`.
        It make users could easier access or change :class:`CommentedMap` object.

        For example, yaml config named ``test.yaml`` and its content as below:

        .. code-block:: yaml

            one:
              two1:
                three: value1
              two2: value2

        It could parser to nested dict as

        .. code-block:: python

            {
                "one": ordereddict([('two1', ordereddict([('three', 'value1')])), ('two2', 'value2')]),
                "one.two1": ordereddict([('three', 'value1')]),
                "one.two1.three": "value1",
                "one.two2": "value2",
            }
        """
        res = dict()
        # Deep-copy so walking the tree can never alias the live parser state.
        src_parser_copy = copy.deepcopy(self.src_parser)
        for key in set(src_parser_copy.keys()):
            res[key] = src_parser_copy[key]
            self.parse_nested_dict(res, src_parser_copy[key], key)
        return res

    def __contains__(self, key) -> bool:
        return key in self.dict_parser

    def __getitem__(self, key: str) -> Any:
        return self.dict_parser[key]

    def __setitem__(self, key: str, val: Any) -> None:
        """Assign ``val`` at the nested path ``key``.

        BUG FIX: the previous implementation tracked a ``mid`` cursor starting
        at ``None``; for a *top-level* key (no delimiter) it executed
        ``mid[k] = val`` while ``mid`` was still ``None`` and raised
        ``TypeError``, and a falsy intermediate map made it silently restart
        from the root. Walking down to the parent container fixes both.
        The ``KeyError`` message is also properly formatted now (it previously
        passed printf-style args without formatting).
        """
        if key not in self.dict_parser:
            raise KeyError(f"Key {key} do not exists.")

        *parents, last = key.split(self._delimiter)
        container = self.src_parser
        for parent in parents:
            container = container[parent]
        container[last] = val

    def get(self, key: str) -> Any:
        """Get value by key, is call ``__getitem__``."""
        return self[key]

    def __str__(self) -> str:
        """Transfer :class:`YamlParser` to string object.

        It is useful when users want to output the :class:`YamlParser` object they change just now.
        """
        buf = io.StringIO()
        self._yaml.dump(self.src_parser, buf)
        return buf.getvalue()

    def __repr__(self) -> str:
        return f"YamlParser({str(self)})"
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/cli/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/cli/__init__.py
index 31dc944..f1a4396 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/cli/__init__.py
@@ -15,14 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+"""Init command line interface tests."""
diff --git a/tests/cli/test_config.py b/tests/cli/test_config.py
new file mode 100644
index 0000000..d913277
--- /dev/null
+++ b/tests/cli/test_config.py
@@ -0,0 +1,198 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test command line interface subcommand `config`."""
+
+import os
+from pathlib import Path
+
+import pytest
+
+from pydolphinscheduler.cli.commands import cli
+from pydolphinscheduler.core.configuration import BUILD_IN_CONFIG_PATH, config_path
+from tests.testing.cli import CliTestWrapper
+from tests.testing.constants import DEV_MODE, ENV_PYDS_HOME
+from tests.testing.file import get_file_content
+
+config_file = "config.yaml"
+
+
@pytest.fixture
def teardown_file_env():
    """Remove the temp configuration file and the PYDS home env var after each test."""
    yield
    # Delete the config file the test may have written.
    leftover = config_path()
    if leftover.exists():
        leftover.unlink()
    # pop environment variable to keep test cases dependent
    os.environ.pop(ENV_PYDS_HOME, None)
    assert ENV_PYDS_HOME not in os.environ
+
+
@pytest.mark.parametrize(
    "home",
    [
        None,
        "/tmp/pydolphinscheduler",
        "/tmp/test_abc",
    ],
)
def test_config_init(teardown_file_env, home):
    """Test command line interface `config --init`."""
    if home:
        os.environ[ENV_PYDS_HOME] = home
    elif DEV_MODE:
        # ``home is None`` means the real ``~/pydolphinscheduler`` directory
        # would be touched, so skip when running on a developer machine.
        pytest.skip(
            "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
        )

    config_file_path = config_path()
    assert not config_file_path.exists()

    cli_test = CliTestWrapper(cli, ["config", "--init"])
    cli_test.assert_success()

    # ``--init`` must write a config identical to the build-in template.
    assert config_file_path.exists()
    assert get_file_content(config_file_path) == get_file_content(BUILD_IN_CONFIG_PATH)
+
+
@pytest.mark.parametrize(
    "key, expect",
    [
        # We test each key in one single section
        ("java_gateway.address", "127.0.0.1"),
        ("default.user.name", "userPythonGateway"),
        ("default.workflow.project", "project-pydolphin"),
    ],
)
def test_config_get(teardown_file_env, key: str, expect: str):
    """Test command line interface `config --get XXX`."""
    # Point PYDS home at a temp dir so ``--init`` writes a fresh default config.
    os.environ[ENV_PYDS_HOME] = "/tmp/pydolphinscheduler"
    cli_test = CliTestWrapper(cli, ["config", "--init"])
    cli_test.assert_success()

    # ``--get`` output format is ``<key> = <value>``; fuzzy match ignores surrounding text.
    cli_test = CliTestWrapper(cli, ["config", "--get", key])
    cli_test.assert_success(output=f"{key} = {expect}", fuzzy=True)
+
+
@pytest.mark.parametrize(
    "keys, expects",
    [
        # We test mix section keys
        (("java_gateway.address", "java_gateway.port"), ("127.0.0.1", "25333")),
        (
            ("java_gateway.auto_convert", "default.user.tenant"),
            ("True", "tenant_pydolphin"),
        ),
        (
            (
                "java_gateway.port",
                "default.user.state",
                "default.workflow.worker_group",
            ),
            ("25333", "1", "default"),
        ),
    ],
)
def test_config_get_multiple(teardown_file_env, keys: str, expects: str):
    """Test command line interface `config --get KEY1 --get KEY2 ...`."""
    os.environ[ENV_PYDS_HOME] = "/tmp/pydolphinscheduler"
    CliTestWrapper(cli, ["config", "--init"]).assert_success()

    # Interleave one ``--get`` flag per requested key.
    args = ["config"]
    for single_key in keys:
        args += ["--get", single_key]
    multi_get = CliTestWrapper(cli, args)

    # Each key must be reported with its expected default value.
    for key, expect in zip(keys, expects):
        multi_get.assert_success(output=f"{key} = {expect}", fuzzy=True)
+
+
@pytest.mark.parametrize(
    "key, value",
    [
        # We test each key in one single section
        ("java_gateway.address", "127.1.1.1"),
        ("default.user.name", "editUserPythonGateway"),
        ("default.workflow.project", "edit-project-pydolphin"),
    ],
)
def test_config_set(teardown_file_env, key: str, value: str):
    """Test command line interface `config --set KEY VALUE`."""
    path = "/tmp/pydolphinscheduler"
    # The teardown fixture guarantees no stale config file is lying around.
    assert not Path(path).joinpath(config_file).exists()
    os.environ[ENV_PYDS_HOME] = path
    cli_test = CliTestWrapper(cli, ["config", "--init"])
    cli_test.assert_success()

    # Make sure value do not exists first
    cli_test = CliTestWrapper(cli, ["config", "--get", key])
    assert f"{key} = {value}" not in cli_test.result.output

    cli_test = CliTestWrapper(cli, ["config", "--set", key, value])
    cli_test.assert_success()

    # The new value must be visible through ``--get`` afterwards.
    cli_test = CliTestWrapper(cli, ["config", "--get", key])
    assert f"{key} = {value}" in cli_test.result.output
+
+
@pytest.mark.parametrize(
    "keys, values",
    [
        # We test each key in mixture section
        (("java_gateway.address", "java_gateway.port"), ("127.1.1.1", "25444")),
        (
            ("java_gateway.auto_convert", "default.user.tenant"),
            ("False", "edit_tenant_pydolphin"),
        ),
        (
            (
                "java_gateway.port",
                "default.user.state",
                "default.workflow.worker_group",
            ),
            ("25555", "0", "not-default"),
        ),
    ],
)
def test_config_set_multiple(teardown_file_env, keys: str, values: str):
    """Test command line interface `config --set KEY1 VAL1 --set KEY2 VAL2`."""
    path = "/tmp/pydolphinscheduler"
    assert not Path(path).joinpath(config_file).exists()
    os.environ[ENV_PYDS_HOME] = path
    CliTestWrapper(cli, ["config", "--init"]).assert_success()

    set_args = ["config"]
    for key, value in zip(keys, values):
        # Every key must still hold its default before we run ``--set``.
        before = CliTestWrapper(cli, ["config", "--get", key])
        assert f"{key} = {value}" not in before.result.output
        set_args += ["--set", key, value]

    CliTestWrapper(cli, set_args).assert_success()

    for key, value in zip(keys, values):
        # And must hold the new value afterwards.
        after = CliTestWrapper(cli, ["config", "--get", key])
        assert f"{key} = {value}" in after.result.output
diff --git a/tests/cli/test_version.py b/tests/cli/test_version.py
new file mode 100644
index 0000000..f0dcb0e
--- /dev/null
+++ b/tests/cli/test_version.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test command line interface subcommand `version`."""
+
+import pytest
+
+from pydolphinscheduler import __version__
+from pydolphinscheduler.cli.commands import cli
+from tests.testing.cli import CliTestWrapper
+
+
def test_version():
    """Subcommand `version` must print the package version string."""
    CliTestWrapper(cli, ["version"]).assert_success(output=str(__version__))
+
+
@pytest.mark.parametrize(
    "part, idx",
    [
        # ``idx`` is the position of the semver component in ``__version__``.
        ("major", 0),
        ("minor", 1),
        ("micro", 2),
    ],
)
def test_version_part(part: str, idx: int):
    """Test subcommand `version` option `--part`."""
    cli_test = CliTestWrapper(cli, ["version", "--part", part])
    cli_test.assert_success(output=f"{__version__.split('.')[idx]}")
+
+
@pytest.mark.parametrize(
    "option, output",
    [
        # not support option
        (["version", "--not-support"], "No such option"),
        # not support option value
        (["version", "--part", "abc"], "Invalid value for '--part'"),
    ],
)
def test_version_not_support_option(option, output):
    """Test subcommand `version` not support option or option value."""
    cli_test = CliTestWrapper(cli, option)
    # click exits with code 2 for usage errors; match the error text fuzzily.
    cli_test.assert_fail(ret_code=2, output=output, fuzzy=True)
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
new file mode 100644
index 0000000..c7e217a
--- /dev/null
+++ b/tests/core/test_configuration.py
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test class :mod:`pydolphinscheduler.core.configuration`' method."""
+
+import importlib
+import os
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from pydolphinscheduler.core import configuration
+from pydolphinscheduler.core.configuration import (
+    BUILD_IN_CONFIG_PATH,
+    config_path,
+    get_single_config,
+    set_single_config,
+)
+from pydolphinscheduler.exceptions import PyDSConfException
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+from tests.testing.constants import DEV_MODE, ENV_PYDS_HOME
+from tests.testing.file import get_file_content
+
+
@pytest.fixture
def teardown_file_env():
    """Remove the temp configuration file and the PYDS home env var after each test."""
    yield
    # Delete the config file the test may have written.
    leftover = config_path()
    if leftover.exists():
        leftover.unlink()
    os.environ.pop(ENV_PYDS_HOME, None)
+
+
@pytest.mark.parametrize(
    "val, expect",
    [
        ("1", 1),
        ("123", 123),
        ("4567", 4567),
        # bytes containing ASCII digits are accepted by ``int()`` too.
        (b"1234", 1234),
    ],
)
def test_get_int(val: Any, expect: int):
    """Test function :func:`configuration.get_int`."""
    assert configuration.get_int(val) == expect
+
+
@pytest.mark.parametrize(
    "val",
    [
        # non-numeric or mixed strings must not be silently coerced.
        "a",
        "1a",
        "1d2",
        "1723-",
    ],
)
def test_get_int_error(val: Any):
    """Test function :func:`configuration.get_int` raises on non-integer input."""
    with pytest.raises(ValueError):
        configuration.get_int(val)
+
+
@pytest.mark.parametrize(
    "val, expect",
    [
        # Truthy spellings recognised by ``get_bool``.
        ("t", True),
        ("true", True),
        (1, True),
        (True, True),
        # Everything else — including non-empty strings — maps to ``False``.
        ("f", False),
        ("false", False),
        (0, False),
        (123, False),
        ("abc", False),
        ("abc1", False),
        (False, False),
    ],
)
def test_get_bool(val: Any, expect: bool):
    """Test function :func:`configuration.get_bool`."""
    assert configuration.get_bool(val) == expect
+
+
@pytest.mark.parametrize(
    "home, expect",
    [
        (None, "~/pydolphinscheduler/config.yaml"),
        ("/tmp/pydolphinscheduler", "/tmp/pydolphinscheduler/config.yaml"),
        ("/tmp/test_abc", "/tmp/test_abc/config.yaml"),
    ],
)
def test_config_path(teardown_file_env, home: Any, expect: str):
    """Test function :func:`config_path`.

    BUG FIX: this test sets ``ENV_PYDS_HOME`` but previously did not request
    the ``teardown_file_env`` fixture, so the env var leaked into subsequent
    tests and could change their config path. Requesting the fixture restores
    a clean environment after each case.
    """
    if home:
        os.environ[ENV_PYDS_HOME] = home
    # ``expanduser`` resolves ``~`` for the default-home case.
    assert Path(expect).expanduser() == configuration.config_path()
+
+
@pytest.mark.parametrize(
    "home",
    [
        None,
        "/tmp/pydolphinscheduler",
        "/tmp/test_abc",
    ],
)
def test_init_config_file(teardown_file_env, home: Any):
    """Test init config file."""
    if home:
        os.environ[ENV_PYDS_HOME] = home
    elif DEV_MODE:
        # ``home is None`` would touch the real ``~/pydolphinscheduler`` dir.
        pytest.skip(
            "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
        )
    assert not config_path().exists()
    configuration.init_config_file()
    assert config_path().exists()

    # The initialised file must be a verbatim copy of the build-in template.
    assert get_file_content(config_path()) == get_file_content(BUILD_IN_CONFIG_PATH)
+
+
@pytest.mark.parametrize(
    "home",
    [
        None,
        "/tmp/pydolphinscheduler",
        "/tmp/test_abc",
    ],
)
def test_init_config_file_duplicate(teardown_file_env, home: Any):
    """Test raise error with init config file which already exists."""
    if home:
        os.environ[ENV_PYDS_HOME] = home
    elif DEV_MODE:
        # ``home is None`` would touch the real ``~/pydolphinscheduler`` dir.
        pytest.skip(
            "Avoid delete ~/pydolphinscheduler/config.yaml by accident when test locally."
        )
    assert not config_path().exists()
    configuration.init_config_file()
    assert config_path().exists()

    # A second init must refuse to clobber the existing file.
    with pytest.raises(PyDSConfException, match=".*file already exists.*"):
        configuration.init_config_file()
+
+
def test_get_configs_build_in():
    """Test function :func:`get_configs` with build-in config file."""
    # Parsing the build-in file directly must agree with what get_configs loads.
    parsed = YamlParser(get_file_content(BUILD_IN_CONFIG_PATH))
    loaded = configuration.get_configs()
    assert parsed.src_parser == loaded.src_parser
    assert parsed.dict_parser == loaded.dict_parser
+
+
@pytest.mark.parametrize(
    "key, val, new_val",
    [
        # One case per config key: (nested key, build-in default, replacement).
        ("java_gateway.address", "127.0.0.1", "127.1.1.1"),
        ("java_gateway.port", 25333, 25555),
        ("java_gateway.auto_convert", True, False),
        ("default.user.name", "userPythonGateway", "editUserPythonGateway"),
        ("default.user.password", "userPythonGateway", "editUserPythonGateway"),
        (
            "default.user.email",
            "userPythonGateway@dolphinscheduler.com",
            "userPythonGateway@edit.com",
        ),
        ("default.user.phone", 11111111111, 22222222222),
        ("default.user.state", 1, 0),
        ("default.workflow.project", "project-pydolphin", "eidt-project-pydolphin"),
        ("default.workflow.tenant", "tenant_pydolphin", "edit_tenant_pydolphin"),
        ("default.workflow.user", "userPythonGateway", "editUserPythonGateway"),
        ("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"),
        ("default.workflow.worker_group", "default", "specific"),
        ("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"),
        ("default.workflow.warning_type", "NONE", "ALL"),
    ],
)
def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any):
    """Test function :func:`get_single_config` and :func:`set_single_config`."""
    assert val == get_single_config(key)
    set_single_config(key, new_val)
    # The write must be visible through a subsequent read.
    assert new_val == get_single_config(key)
+
+
def test_single_config_get_set_not_exists_key():
    """Getting or setting a missing key must raise :class:`PyDSConfException`."""
    missing = "i_am_not_exists_key"
    with pytest.raises(PyDSConfException, match=".*do not exists.*"):
        get_single_config(missing)
    with pytest.raises(PyDSConfException, match=".*do not exists.*"):
        set_single_config(missing, missing)
+
+
@pytest.mark.parametrize(
    "config_name, expect",
    [
        # (module-level attribute name, expected build-in default value)
        ("JAVA_GATEWAY_ADDRESS", "127.0.0.1"),
        ("JAVA_GATEWAY_PORT", 25333),
        ("JAVA_GATEWAY_AUTO_CONVERT", True),
        ("USER_NAME", "userPythonGateway"),
        ("USER_PASSWORD", "userPythonGateway"),
        ("USER_EMAIL", "userPythonGateway@dolphinscheduler.com"),
        ("USER_PHONE", "11111111111"),
        ("USER_STATE", 1),
        ("WORKFLOW_PROJECT", "project-pydolphin"),
        ("WORKFLOW_TENANT", "tenant_pydolphin"),
        ("WORKFLOW_USER", "userPythonGateway"),
        ("WORKFLOW_QUEUE", "queuePythonGateway"),
        ("WORKFLOW_WORKER_GROUP", "default"),
        ("WORKFLOW_TIME_ZONE", "Asia/Shanghai"),
        ("WORKFLOW_WARNING_TYPE", "NONE"),
    ],
)
def test_get_configuration(config_name: str, expect: Any):
    """Test get exists attribute in :mod:`configuration`."""
    assert expect == getattr(configuration, config_name)
+
+
@pytest.mark.parametrize(
    "config_name, src, dest",
    [
        # (attribute, build-in default, value injected via PYDS_* env var)
        ("JAVA_GATEWAY_ADDRESS", "127.0.0.1", "192.168.1.1"),
        ("JAVA_GATEWAY_PORT", 25333, 25334),
        ("JAVA_GATEWAY_AUTO_CONVERT", True, False),
        ("USER_NAME", "userPythonGateway", "envUserPythonGateway"),
        ("USER_PASSWORD", "userPythonGateway", "envUserPythonGateway"),
        (
            "USER_EMAIL",
            "userPythonGateway@dolphinscheduler.com",
            "userPythonGateway@dolphinscheduler.com",
        ),
        ("USER_PHONE", "11111111111", "22222222222"),
        ("USER_STATE", 1, 0),
        ("WORKFLOW_PROJECT", "project-pydolphin", "env-project-pydolphin"),
        ("WORKFLOW_TENANT", "tenant_pydolphin", "env-tenant_pydolphin"),
        ("WORKFLOW_USER", "userPythonGateway", "envUserPythonGateway"),
        ("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"),
        ("WORKFLOW_WORKER_GROUP", "default", "custom"),
        ("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"),
        ("WORKFLOW_WARNING_TYPE", "NONE", "ALL"),
    ],
)
def test_get_configuration_env(config_name: str, src: Any, dest: Any):
    """Test get exists attribute from environment variable in :mod:`configuration`."""
    assert getattr(configuration, config_name) == src

    env_name = f"PYDS_{config_name}"
    os.environ[env_name] = str(dest)
    # reload module configuration to re-get config from environment.
    importlib.reload(configuration)
    assert getattr(configuration, config_name) == dest

    # pop and reload configuration to test whether this config equal to `src` value
    os.environ.pop(env_name, None)
    importlib.reload(configuration)
    assert getattr(configuration, config_name) == src
    assert env_name not in os.environ
diff --git a/tests/testing/task.py b/tests/core/test_default_config_yaml.py
similarity index 50%
copy from tests/testing/task.py
copy to tests/core/test_default_config_yaml.py
index e0affc9..b4d5e07 100644
--- a/tests/testing/task.py
+++ b/tests/core/test_default_config_yaml.py
@@ -15,18 +15,25 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Mock class Task for other test."""
+"""Test default config file."""
 
-import uuid
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap
 
-from pydolphinscheduler.core.task import Task as SourceTask
+from tests.testing.path import path_default_config_yaml
 
 
-class Task(SourceTask):
-    """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
+def nested_key_check(comment_map: CommentedMap) -> None:
+    """Test whether the default configuration file contains forbidden special characters."""
+    for key, val in comment_map.items():
+        assert "." not in key, f"There is not allowed special character in key `{key}`."
+        if isinstance(val, CommentedMap):
+            nested_key_check(val)
 
-    DEFAULT_VERSION = 1
 
-    def gen_code_and_version(self):
-        """Mock java gateway code and version, convenience method for unittest."""
-        return uuid.uuid1().time, self.DEFAULT_VERSION
+def test_key_without_dot_delimiter():
+    """Test wrapper checking whether the default configuration file contains forbidden special characters."""
+    yaml = YAML()
+    with open(path_default_config_yaml, "r") as f:
+        comment_map = yaml.load(f.read())
+        nested_key_check(comment_map)
diff --git a/tests/core/test_process_definition.py b/tests/core/test_process_definition.py
index f51338d..5cb6dab 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_process_definition.py
@@ -18,17 +18,15 @@
 """Test process definition."""
 
 from datetime import datetime
-from typing import Any
+from typing import Any, List
 from unittest.mock import patch
 
 import pytest
 from freezegun import freeze_time
 
-from pydolphinscheduler.constants import (
-    ProcessDefinitionDefault,
-    ProcessDefinitionReleaseState,
-)
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.core.resource import Resource
 from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.side import Project, Tenant, User
 from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition
@@ -51,23 +49,25 @@ def test_process_definition_key_attr(func):
 @pytest.mark.parametrize(
     "name,value",
     [
-        ("timezone", ProcessDefinitionDefault.TIME_ZONE),
-        ("project", Project(ProcessDefinitionDefault.PROJECT)),
-        ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
+        ("timezone", configuration.WORKFLOW_TIME_ZONE),
+        ("project", Project(configuration.WORKFLOW_PROJECT)),
+        ("tenant", Tenant(configuration.WORKFLOW_TENANT)),
         (
             "user",
             User(
-                ProcessDefinitionDefault.USER,
-                ProcessDefinitionDefault.USER_PWD,
-                ProcessDefinitionDefault.USER_EMAIL,
-                ProcessDefinitionDefault.USER_PHONE,
-                ProcessDefinitionDefault.TENANT,
-                ProcessDefinitionDefault.QUEUE,
-                ProcessDefinitionDefault.USER_STATE,
+                configuration.USER_NAME,
+                configuration.USER_PASSWORD,
+                configuration.USER_EMAIL,
+                configuration.USER_PHONE,
+                configuration.WORKFLOW_TENANT,
+                configuration.WORKFLOW_QUEUE,
+                configuration.USER_STATE,
             ),
         ),
-        ("worker_group", ProcessDefinitionDefault.WORKER_GROUP),
-        ("release_state", ProcessDefinitionReleaseState.ONLINE),
+        ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
+        ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
+        ("warning_group_id", 0),
+        ("release_state", 1),
     ],
 )
 def test_process_definition_default_value(name, value):
@@ -87,9 +87,15 @@ def test_process_definition_default_value(name, value):
         ("schedule", str, "schedule"),
         ("timezone", str, "timezone"),
         ("worker_group", str, "worker_group"),
+        ("warning_type", str, "FAILURE"),
+        ("warning_group_id", int, 1),
         ("timeout", int, 1),
-        ("release_state", str, "OFFLINE"),
         ("param", dict, {"key": "value"}),
+        (
+            "resource_list",
+            List,
+            [Resource(name="/dev/test.py", content="hello world", description="desc")],
+        ),
     ],
 )
 def test_set_attr(name, cls, expect):
@@ -101,6 +107,41 @@ def test_set_attr(name, cls, expect):
         ), f"ProcessDefinition set attribute `{name}` do not work expect"
 
 
+@pytest.mark.parametrize(
+    "value,expect",
+    [
+        ("online", 1),
+        ("offline", 0),
+    ],
+)
+def test_set_release_state(value, expect):
+    """Test process definition set release_state attributes."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value) as pd:
+        assert (
+            getattr(pd, "release_state") == expect
+        ), "ProcessDefinition set attribute release_state do not return expect value."
+
+
+@pytest.mark.parametrize(
+    "value",
+    [
+        "oneline",
+        "offeline",
+        1,
+        0,
+        None,
+    ],
+)
+def test_set_release_state_error(value):
+    """Test process definition set release_state attributes with error."""
+    pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, release_state=value)
+    with pytest.raises(
+        PyDSParamException,
+        match="Parameter release_state only support `online` or `offline` but get.*",
+    ):
+        pd.release_state
+
+
 @pytest.mark.parametrize(
     "set_attr,set_val,get_attr,get_val",
     [
@@ -154,6 +195,21 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "val",
+    [
+        "ALLL",
+        "nonee",
+    ],
+)
+def test_warn_type_not_support_type(val: str):
+    """Test process definition param warning_type with an unsupported value raising an error."""
+    with pytest.raises(
+        PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
+    ):
+        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+
+
 @pytest.mark.parametrize(
     "param, expect",
     [
@@ -223,25 +279,55 @@ def test__pre_submit_check_switch_without_param(mock_code_version):
         parent >> switch
         with pytest.raises(
             PyDSParamException,
-            match="Parameter param must be provider if task Switch in process definition.",
+            match="Parameter param or at least one local_param of task must "
+            "be provider if task Switch in process definition.",
         ):
             pd._pre_submit_check()
 
 
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+def test__pre_submit_check_switch_with_local_params(mock_code_version):
+    """Test :func:`_pre_submit_check` if process definition with switch with local params of task."""
+    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
+        parent = Task(
+            name="parent",
+            task_type=TEST_TASK_TYPE,
+            local_params=[
+                {"prop": "var", "direct": "OUT", "type": "VARCHAR", "value": ""}
+            ],
+        )
+        switch_child_1 = Task(name="switch_child_1", task_type=TEST_TASK_TYPE)
+        switch_child_2 = Task(name="switch_child_2", task_type=TEST_TASK_TYPE)
+        switch_condition = SwitchCondition(
+            Branch(condition="${var} > 1", task=switch_child_1),
+            Default(task=switch_child_2),
+        )
+
+        switch = Switch(name="switch", condition=switch_condition)
+        parent >> switch
+        pd._pre_submit_check()
+
+
 def test_process_definition_get_define_without_task():
     """Test process definition function get_define without task."""
     expect = {
         "name": TEST_PROCESS_DEFINITION_NAME,
         "description": None,
-        "project": ProcessDefinitionDefault.PROJECT,
-        "tenant": ProcessDefinitionDefault.TENANT,
-        "workerGroup": ProcessDefinitionDefault.WORKER_GROUP,
+        "project": configuration.WORKFLOW_PROJECT,
+        "tenant": configuration.WORKFLOW_TENANT,
+        "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
+        "warningType": configuration.WORKFLOW_WARNING_TYPE,
+        "warningGroupId": 0,
         "timeout": 0,
-        "releaseState": ProcessDefinitionReleaseState.ONLINE,
+        "releaseState": 1,
         "param": None,
         "tasks": {},
         "taskDefinitionJson": [{}],
         "taskRelationJson": [{}],
+        "resourceList": [],
     }
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
         assert pd.get_define() == expect
@@ -311,15 +397,12 @@ def test_process_definition_simple_separate():
     "user_attrs",
     [
         {"tenant": "tenant_specific"},
-        {"queue": "queue_specific"},
-        {"tenant": "tenant_specific", "queue": "queue_specific"},
     ],
 )
 def test_set_process_definition_user_attr(user_attrs):
     """Test user with correct attributes if we specific assigned to process definition object."""
     default_value = {
-        "tenant": ProcessDefinitionDefault.TENANT,
-        "queue": ProcessDefinitionDefault.QUEUE,
+        "tenant": configuration.WORKFLOW_TENANT,
     }
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
         user = pd.user
@@ -407,13 +490,13 @@ def test_schedule_json_start_and_end_time(start_time, end_time, expect_date):
         "crontab": schedule,
         "startTime": expect_date["start_time"],
         "endTime": expect_date["end_time"],
-        "timezoneId": ProcessDefinitionDefault.TIME_ZONE,
+        "timezoneId": configuration.WORKFLOW_TIME_ZONE,
     }
     with ProcessDefinition(
         TEST_PROCESS_DEFINITION_NAME,
         schedule=schedule,
         start_time=start_time,
         end_time=end_time,
-        timezone=ProcessDefinitionDefault.TIME_ZONE,
+        timezone=configuration.WORKFLOW_TIME_ZONE,
     ) as pd:
         assert pd.schedule_json == expect
diff --git a/tests/testing/task.py b/tests/core/test_resource_definition.py
similarity index 59%
copy from tests/testing/task.py
copy to tests/core/test_resource_definition.py
index e0affc9..ebfb893 100644
--- a/tests/testing/task.py
+++ b/tests/core/test_resource_definition.py
@@ -15,18 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Mock class Task for other test."""
+"""Test resource definition."""
 
-import uuid
+from pydolphinscheduler.core.resource import Resource
 
-from pydolphinscheduler.core.task import Task as SourceTask
 
-
-class Task(SourceTask):
-    """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
-
-    DEFAULT_VERSION = 1
-
-    def gen_code_and_version(self):
-        """Mock java gateway code and version, convenience method for unittest."""
-        return uuid.uuid1().time, self.DEFAULT_VERSION
+def test_resource():
+    """Test that resource attributes are set and returned with the same values."""
+    name = "/dev/test.py"
+    content = """print("hello world")"""
+    description = "hello world"
+    expect = {
+        "name": name,
+        "content": content,
+        "description": description,
+    }
+    resourceDefinition = Resource(
+        name=name,
+        content=content,
+        description=description,
+    )
+    assert resourceDefinition.get_define() == expect
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 6af731b..65555c1 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -16,13 +16,16 @@
 # under the License.
 
 """Test Task class function."""
-
+import logging
+import re
 from unittest.mock import patch
 
 import pytest
 
+from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task, TaskRelation
 from tests.testing.task import Task as testTask
+from tests.testing.task import TaskWithCode
 
 TEST_TASK_RELATION_SET = set()
 TEST_TASK_RELATION_SIZE = 0
@@ -51,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0
             },
             {
                 "localParams": ["foo", "bar"],
-                "resourceList": ["foo", "bar"],
+                "resourceList": [{"id": 1}],
                 "dependence": {"foo", "bar"},
                 "waitStartTimeout": {"foo", "bar"},
                 "conditionResult": {"foo": ["bar"]},
@@ -59,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0
         ),
     ],
 )
-def test_property_task_params(attr, expect):
+@patch(
+    "pydolphinscheduler.core.task.Task.query_resource",
+    return_value=({"id": 1, "name": "foo"}),
+)
+def test_property_task_params(mock_resource, attr, expect):
     """Test class task property."""
     task = testTask(
         "test-property-task-params",
@@ -222,3 +229,50 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
 
     assert all([1 == len(getattr(t, reverse_direction_attr)) for t in tasks])
     assert all([task.code in getattr(t, reverse_direction_attr) for t in tasks])
+
+
+def test_add_duplicate(caplog):
+    """Test add task which code already in process definition."""
+    with ProcessDefinition("test_add_duplicate_workflow") as _:
+        TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
+        with caplog.at_level(logging.WARNING):
+            TaskWithCode(
+                name="test_task_duplicate_code", task_type="test", code=123, version=2
+            )
+        assert all(
+            [
+                caplog.text.startswith("WARNING  pydolphinscheduler"),
+                re.findall("already in process definition", caplog.text),
+            ]
+        )
+
+
+@pytest.mark.parametrize(
+    "resources, expect",
+    [
+        (
+            ["/dev/test.py"],
+            [{"id": 1}],
+        ),
+        (
+            ["/dev/test.py", {"id": 2}],
+            [{"id": 1}, {"id": 2}],
+        ),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.query_resource",
+    return_value=({"id": 1, "name": "/dev/test.py"}),
+)
+def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
+    """Test python task resource list."""
+    task = Task(
+        name="python_resource_list.",
+        task_type="PYTHON",
+        resource_list=resources,
+    )
+    assert task.resource_list == expect
diff --git a/tests/example/test_example.py b/tests/example/test_example.py
index 5bf897f..70f3677 100644
--- a/tests/example/test_example.py
+++ b/tests/example/test_example.py
@@ -44,7 +44,7 @@ def test_task_without_example():
     Avoiding add new type of tasks but without adding example describe how to use it.
     """
     # We use example/tutorial.py as shell task example
-    ignore_name = {"__init__.py", "shell.py"}
+    ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
     all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
 
     have_example_tasks = set()
@@ -97,7 +97,7 @@ def test_example_basic():
         ), f"We expect all examples is python script, but get {ex.name}."
 
         # All except tutorial and __init__ is end with keyword "_example"
-        if ex.stem != "tutorial" and ex.stem != "__init__":
+        if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__":
             assert ex.stem.endswith(
                 "_example"
             ), f"We expect all examples script end with keyword '_example', but get {ex.stem}."
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/integration/__init__.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/integration/__init__.py
index 31dc944..65625a9 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/integration/__init__.py
@@ -15,14 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
-
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
-
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+"""Test integration between Python API and PythonGatewayService."""
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..a9cd352
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""py.test conftest.py file for package integration test."""
+
+import pytest
+
+from tests.testing.docker_wrapper import DockerWrapper
+
+
+@pytest.fixture(scope="package", autouse=True)
+def docker_setup_teardown():
+    """Fixture for whole package tests, Set up and teardown docker env.
+
+    Fixture in file named ``conftest.py`` with ``scope=package`` could be auto import in the
+    whole package, and with attribute ``autouse=True`` will be auto-use for each test cases.
+
+    .. seealso::
+        For more information about conftest.py see:
+        https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
+    """
+    docker_wrapper = DockerWrapper(
+        image="apache/dolphinscheduler-standalone-server:ci",
+        container_name="ci-dolphinscheduler-standalone-server",
+    )
+    ports = {"25333/tcp": 25333}
+    container = docker_wrapper.run_until_log(
+        log="Started StandaloneServer in", tty=True, ports=ports
+    )
+    assert container is not None
+    yield
+    docker_wrapper.remove_container()
diff --git a/tests/integration/test_java_gateway.py b/tests/integration/test_java_gateway.py
new file mode 100644
index 0000000..8b7c5ff
--- /dev/null
+++ b/tests/integration/test_java_gateway.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test pydolphinscheduler java gateway."""
+
+
+from py4j.java_gateway import JavaGateway, java_import
+
+
+def test_gateway_connect():
+    """Test whether the client could connect to the java gateway or not."""
+    gateway = JavaGateway()
+    app = gateway.entry_point
+    assert app.ping() == "PONG"
+
+
+def test_jvm_simple():
+    """Test use JVM build-in object and operator from java gateway."""
+    gateway = JavaGateway()
+    smallest = gateway.jvm.java.lang.Integer.MIN_VALUE
+    biggest = gateway.jvm.java.lang.Integer.MAX_VALUE
+    assert smallest is not None and biggest is not None
+    assert biggest > smallest
+
+
+def test_python_client_java_import_single():
+    """Test import single class from java gateway."""
+    gateway = JavaGateway()
+    java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.FileUtils")
+    assert hasattr(gateway.jvm, "FileUtils")
+
+
+def test_python_client_java_import_package():
+    """Test import package contain multiple class from java gateway."""
+    gateway = JavaGateway()
+    java_import(gateway.jvm, "org.apache.dolphinscheduler.common.utils.*")
+    # test if jvm view have some common utils
+    for util in ("FileUtils", "OSUtils", "DateUtils"):
+        assert hasattr(gateway.jvm, util)
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/integration/test_process_definition.py
similarity index 50%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/integration/test_process_definition.py
index 31dc944..1672bde 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/integration/test_process_definition.py
@@ -15,14 +15,36 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
+"""Test process definition in integration."""
+
+from typing import Dict
+
+import pytest
 
-from pydolphinscheduler.core.database import Database
 from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.tasks.shell import Shell
+
+PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
+TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
+
 
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+@pytest.mark.parametrize(
+    "pre, post",
+    [
+        (
+            {
+                "user": "pre_user",
+            },
+            {
+                "user": "post_user",
+            },
+        )
+    ],
+)
+def test_change_process_definition_attr(pre: Dict, post: Dict):
+    """Test whether process definition submission succeeds when specific attributes change."""
+    assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
+    for attrs in [pre, post]:
+        with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
+            Shell(name=TASK_NAME, command="echo 1")
+            pd.submit()
diff --git a/tests/integration/test_submit_examples.py b/tests/integration/test_submit_examples.py
new file mode 100644
index 0000000..393b0cc
--- /dev/null
+++ b/tests/integration/test_submit_examples.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test whether success submit examples DAG to PythonGatewayService."""
+
+import subprocess
+from pathlib import Path
+
+import pytest
+
+from tests.testing.constants import ignore_exec_examples
+from tests.testing.path import path_example
+
+
+@pytest.mark.parametrize(
+    "example_path",
+    [
+        path
+        for path in path_example.iterdir()
+        if path.is_file() and path.stem not in ignore_exec_examples
+    ],
+)
+def test_exec_white_list_example(example_path: Path):
+    """Test execute examples and submit DAG to PythonGatewayService."""
+    try:
+        # Because our task decorator used module ``inspect`` to get the source, and it will
+        # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
+        subprocess.check_call(["python", str(example_path)])
+    except subprocess.CalledProcessError:
+        raise RuntimeError("Run example %s failed.", example_path.stem)
+
+
+def test_exec_multiple_times():
+    """Test whether process definition can be executed more than once."""
+    tutorial_path = path_example.joinpath("tutorial.py")
+    time = 0
+    while time < 3:
+        try:
+            subprocess.check_call(["python", str(tutorial_path)])
+        except subprocess.CalledProcessError:
+            raise RuntimeError("Run example %s failed.", tutorial_path.stem)
+        time += 1
diff --git a/tests/tasks/test_func_wrap.py b/tests/tasks/test_func_wrap.py
new file mode 100644
index 0000000..628b6e7
--- /dev/null
+++ b/tests/tasks/test_func_wrap.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test module about function wrap task decorator."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.func_wrap import task
+from tests.testing.decorator import foo as foo_decorator
+from tests.testing.task import Task
+
+PD_NAME = "test_process_definition"
+TASK_NAME = "test_task"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_outside(mock_code):
+    """Test single decorator task which outside process definition."""
+
+    @task
+    def foo():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:
+        foo()
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 1
+
+    pd_task = pd.tasks[12345]
+    assert pd_task.name == "foo"
+    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_inside(mock_code):
+    """Test single decorator task which inside process definition."""
+    with ProcessDefinition(PD_NAME) as pd:
+
+        @task
+        def foo():
+            print(TASK_NAME)
+
+        foo()
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 1
+
+    pd_task = pd.tasks[12345]
+    assert pd_task.name == "foo"
+    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_addition_decorator_error(mock_code):
+    """Test error when using task decorator to a function already have decorator."""
+
+    @task
+    @foo_decorator
+    def foo():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:  # noqa: F841
+        with pytest.raises(
+            PyDSParamException, match="Do no support other decorators for.*"
+        ):
+            foo()
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_outside(mock_code):
+    """Test multiple decorator tasks which outside process definition."""
+
+    @task
+    def foo():
+        print(TASK_NAME)
+
+    @task
+    def bar():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:
+        foo = foo()
+        bar = bar()
+
+        foo >> bar
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 2
+
+    task_foo = pd.get_one_task_by_name("foo")
+    task_bar = pd.get_one_task_by_name("bar")
+    assert set(pd.task_list) == {task_foo, task_bar}
+    assert (
+        task_foo is not None
+        and task_foo._upstream_task_codes == set()
+        and task_foo._downstream_task_codes.pop() == task_bar.code
+    )
+    assert (
+        task_bar is not None
+        and task_bar._upstream_task_codes.pop() == task_foo.code
+        and task_bar._downstream_task_codes == set()
+    )
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_inside(mock_code):
+    """Test multiple decorator tasks which inside process definition."""
+    with ProcessDefinition(PD_NAME) as pd:
+
+        @task
+        def foo():
+            print(TASK_NAME)
+
+        @task
+        def bar():
+            print(TASK_NAME)
+
+        foo = foo()
+        bar = bar()
+
+        foo >> bar
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 2
+
+    task_foo = pd.get_one_task_by_name("foo")
+    task_bar = pd.get_one_task_by_name("bar")
+    assert set(pd.task_list) == {task_foo, task_bar}
+    assert (
+        task_foo is not None
+        and task_foo._upstream_task_codes == set()
+        and task_foo._downstream_task_codes.pop() == task_bar.code
+    )
+    assert (
+        task_bar is not None
+        and task_bar._upstream_task_codes.pop() == task_foo.code
+        and task_bar._downstream_task_codes == set()
+    )
diff --git a/tests/tasks/test_python.py b/tests/tasks/test_python.py
index dbcd298..1cdd85d 100644
--- a/tests/tasks/test_python.py
+++ b/tests/tasks/test_python.py
@@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.tasks.python import Python
 
 
+def foo():  # noqa: D103
+    print("hello world.")
+
+
 @pytest.mark.parametrize(
     "attr, expect",
     [
         (
-            {"code": "print(1)"},
+            {"definition": "print(1)"},
             {
                 "rawScript": "print(1)",
                 "localParams": [],
@@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python
                 "waitStartTimeout": {},
                 "conditionResult": {"successNode": [""], "failedNode": [""]},
             },
-        )
+        ),
+        (
+            {"definition": "def foo():\n    print('I am foo')"},
+            {
+                "rawScript": "def foo():\n    print('I am foo')\nfoo()",
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        ),
+        (
+            {"definition": foo},
+            {
+                "rawScript": 'def foo():  # noqa: D103\n    print("hello world.")\nfoo()',
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        ),
     ],
 )
 @patch(
@@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect):
 def test_python_task_not_support_code(mock_code, script_code):
     """Test python task parameters."""
     name = "not_support_code_type"
-    with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+    with pytest.raises(
+        PyDSParamException, match="Parameter definition do not support .*?"
+    ):
         task = Python(name, script_code)
         task.raw_script
 
 
-def foo():  # noqa: D103
-    print("hello world.")
-
-
 @pytest.mark.parametrize(
     "name, script_code, raw",
     [
@@ -82,7 +106,7 @@ def foo():  # noqa: D103
         (
             "function_define",
             foo,
-            'def foo():  # noqa: D103\n    print("hello world.")\n',
+            'def foo():  # noqa: D103\n    print("hello world.")\nfoo()',
         ),
     ],
 )
diff --git a/tests/tasks/test_sql.py b/tests/tasks/test_sql.py
index 3f8209c..ee0acc4 100644
--- a/tests/tasks/test_sql.py
+++ b/tests/tasks/test_sql.py
@@ -26,24 +26,38 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
 
 
 @pytest.mark.parametrize(
-    "sql, sql_type",
+    "sql, param_sql_type, sql_type",
     [
-        ("select 1", SqlType.SELECT),
-        (" select 1", SqlType.SELECT),
-        (" select 1 ", SqlType.SELECT),
-        (" select 'insert' ", SqlType.SELECT),
-        (" select 'insert ' ", SqlType.SELECT),
-        ("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
-        ("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
+        ("select 1", None, SqlType.SELECT),
+        (" select 1", None, SqlType.SELECT),
+        (" select 1 ", None, SqlType.SELECT),
+        (" select 'insert' ", None, SqlType.SELECT),
+        (" select 'insert ' ", None, SqlType.SELECT),
+        ("with tmp as (select 1) select * from tmp ", None, SqlType.SELECT),
+        (
+            "insert into table_name(col1, col2) value (val1, val2)",
+            None,
+            SqlType.NOT_SELECT,
+        ),
         (
             "insert into table_name(select, col2) value ('select', val2)",
+            None,
+            SqlType.NOT_SELECT,
+        ),
+        ("update table_name SET col1=val1 where col1=val2", None, SqlType.NOT_SELECT),
+        (
+            "update table_name SET col1='select' where col1=val2",
+            None,
             SqlType.NOT_SELECT,
         ),
-        ("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
-        ("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
-        ("delete from table_name where id < 10", SqlType.NOT_SELECT),
-        ("delete from table_name where id < 10", SqlType.NOT_SELECT),
-        ("alter table table_name add column col1 int", SqlType.NOT_SELECT),
+        ("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
+        ("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
+        ("alter table table_name add column col1 int", None, SqlType.NOT_SELECT),
+        ("create table table_name2 (col1 int)", None, SqlType.NOT_SELECT),
+        ("create table table_name2 (col1 int)", SqlType.SELECT, SqlType.SELECT),
+        ("select 1", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
+        ("create table table_name2 (col1 int)", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
+        ("select 1", SqlType.SELECT, SqlType.SELECT),
     ],
 )
 @patch(
@@ -54,11 +68,13 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
     "pydolphinscheduler.core.database.Database.get_database_info",
     return_value=({"id": 1, "type": "mock_type"}),
 )
-def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
+def test_get_sql_type(
+    mock_datasource, mock_code_version, sql, param_sql_type, sql_type
+):
     """Test property sql_type could return correct type."""
     name = "test_get_sql_type"
     datasource_name = "test_datasource"
-    task = Sql(name, datasource_name, sql)
+    task = Sql(name, datasource_name, sql, sql_type=param_sql_type)
     assert (
         sql_type == task.sql_type
     ), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
@@ -73,7 +89,7 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
                 "sql": "select 1",
                 "type": "MYSQL",
                 "datasource": 1,
-                "sqlType": SqlType.SELECT,
+                "sqlType": "0",
                 "preStatements": [],
                 "postStatements": [],
                 "displayRows": 10,
@@ -122,7 +138,7 @@ def test_sql_get_define(mock_datasource):
             "type": "MYSQL",
             "datasource": 1,
             "sql": command,
-            "sqlType": SqlType.SELECT,
+            "sqlType": "0",
             "displayRows": 10,
             "preStatements": [],
             "postStatements": [],
diff --git a/tests/testing/cli.py b/tests/testing/cli.py
new file mode 100644
index 0000000..0d2c1d1
--- /dev/null
+++ b/tests/testing/cli.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Utils of command line test."""
+
+
+from click.testing import CliRunner
+
+from tests.testing.constants import DEV_MODE
+
+
+class CliTestWrapper:
+    """Wrap command click CliRunner.invoke."""
+
+    def __init__(self, *args, **kwargs):
+        runner = CliRunner()
+        self.result = runner.invoke(*args, **kwargs)
+        self.show_result_output()
+
+    def _assert_output(self, output: str = None, fuzzy: bool = False):
+        """Assert between `CliRunner.invoke.result.output` and parameter `output`.
+
+        :param output: The output will check compare to the ``CliRunner.invoke.output``.
+        :param fuzzy: A flag defining whether to assert :param:`output` fuzzily or not.
+            Check whether `CliRunner.invoke.output` contains :param:`output` when it is set to ``True``,
+            and whether CliRunner.invoke.output equals :param:`output` when it is set to ``False``.
+        """
+        if not output:
+            return
+        if fuzzy:
+            assert output in self.result.output
+        else:
+            assert self.result.output.rstrip("\n") == output
+
+    def show_result_output(self):
+        """Print `CliRunner.invoke.result` output content in debug mode.
+
+        It reads the variable named `PY_DOLPHINSCHEDULER_DEV_MODE` from the environment; when it is set to `true`, `t` or `1`,
+        the result output will be printed when class :class:`CliTestWrapper` is initialized.
+        """
+        if DEV_MODE:
+            print(f"\n{self.result.output}\n")
+
+    def assert_success(self, output: str = None, fuzzy: bool = False):
+        """Assert test is success.
+
+        It would check whether `CliRunner.invoke.exit_code` equals `0`, with no
+        exception at the same time. It can also check the content of `CliRunner.invoke.output`.
+
+        :param output: The output will check compare to the ``CliRunner.invoke.output``.
+        :param fuzzy: A flag defining whether to assert :param:`output` fuzzily or not.
+            Check whether `CliRunner.invoke.output` contains :param:`output` when it is set to ``True``,
+            and whether CliRunner.invoke.output equals :param:`output` when it is set to ``False``.
+        """
+        assert self.result.exit_code == 0
+        if self.result.exception:
+            raise self.result.exception
+        self._assert_output(output, fuzzy)
+
+    def assert_fail(self, ret_code: int, output: str = None, fuzzy: bool = False):
+        """Assert test is fail.
+
+        It would check whether `CliRunner.invoke.exit_code` equals :param:`ret_code`,
+        and it can also check the content of `CliRunner.invoke.output`.
+
+        :param ret_code: The returning code of this fail test.
+        :param output: The output will check compare to the ``CliRunner.invoke.output``.
+        :param fuzzy: A flag defining whether to assert :param:`output` fuzzily or not.
+            Check whether `CliRunner.invoke.output` contains :param:`output` when it is set to ``True``,
+            and whether CliRunner.invoke.output equals :param:`output` when it is set to ``False``.
+        """
+        assert ret_code == self.result.exit_code
+        self._assert_output(output, fuzzy)
diff --git a/tests/testing/constants.py b/tests/testing/constants.py
index 1552192..ed2ee37 100644
--- a/tests/testing/constants.py
+++ b/tests/testing/constants.py
@@ -17,6 +17,8 @@
 
 """Constants variables for test module."""
 
+import os
+
 # Record some task without example in directory `example`. Some of them maybe can not write example,
 # but most of them just without adding by mistake, and we should add it later.
 task_without_example = {
@@ -26,3 +28,21 @@ task_without_example = {
     "python",
     "procedure",
 }
+
+# The examples the tests skip when executing them. Those examples can not be run directly because they need other
+# supports like resource files, data sources, etc. But we should try to run them later for more coverage
+ignore_exec_examples = {
+    "task_datax_example",
+    "task_flink_example",
+    "task_map_reduce_example",
+    "task_spark_example",
+}
+
+# pydolphinscheduler environment home
+ENV_PYDS_HOME = "PYDS_HOME"
+
+# Whether in dev mode; if true we will add or remove some tests, or print more detailed info when
+# a test fails.
+DEV_MODE = str(
+    os.environ.get("PY_DOLPHINSCHEDULER_DEV_MODE", False)
+).strip().lower() in {"true", "t", "1"}
diff --git a/src/pydolphinscheduler/core/__init__.py b/tests/testing/decorator.py
similarity index 73%
copy from src/pydolphinscheduler/core/__init__.py
copy to tests/testing/decorator.py
index 31dc944..78078ee 100644
--- a/src/pydolphinscheduler/core/__init__.py
+++ b/tests/testing/decorator.py
@@ -15,14 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
+"""Decorator module for testing module."""
 
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+import types
+from functools import wraps
 
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+
+def foo(func: types.FunctionType):
+    """Decorate which do nothing for testing module."""
+
+    @wraps(func)
+    def wrapper():
+        print("foo decorator called.")
+        func()
+
+    return wrapper
diff --git a/tests/testing/docker_wrapper.py b/tests/testing/docker_wrapper.py
new file mode 100644
index 0000000..a3d0b6e
--- /dev/null
+++ b/tests/testing/docker_wrapper.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Wrap docker commands for easier create docker container."""
+
+import time
+from typing import Optional
+
+import docker
+from docker.errors import ImageNotFound
+from docker.models.containers import Container
+
+
+class DockerWrapper:
+    """Wrap docker commands for easier create docker container.
+
+    :param image: The image to create docker container.
+    """
+
+    def __init__(self, image: str, container_name: str):
+        self._client = docker.from_env()
+        self.image = image
+        self.container_name = container_name
+
+    def run(self, *args, **kwargs) -> Container:
+        """Create and run a new container.
+
+        This method would return immediately after the container starts. If you wish it to return the container
+        object only when a specific service has started, you could see :func:`run_until_log`, which returns the
+        container object when a specific log line appears in the docker output.
+        """
+        if not self.images_exists:
+            raise ValueError("Docker image named %s do not exists.", self.image)
+        return self._client.containers.run(
+            image=self.image, name=self.container_name, detach=True, *args, **kwargs
+        )
+
+    def run_until_log(
+        self, log: str, remove_exists: Optional[bool] = True, *args, **kwargs
+    ) -> Container:
+        """Create and run a new container, return when specific log appear.
+
+        It will call :func:`run` inside this method. After the container starts, it would not
+        return immediately but run command `docker logs` to see whether the specific log appears.
+        It will raise `RuntimeError` if the specific log does not appear within 10 minutes.
+        """
+        if remove_exists:
+            self.remove_container()
+
+        log_byte = str.encode(log)
+        container = self.run(*args, **kwargs)
+
+        timeout_threshold = 10 * 60
+        start_time = time.time()
+        while time.time() <= start_time + timeout_threshold:
+            if log_byte in container.logs(tail=1000):
+                break
+            time.sleep(2)
+        # Stop container and raise error when reach timeout threshold but do not appear specific log output
+        else:
+            container.remove(force=True)
+            raise RuntimeError(
+                "Can not capture specific log `%s` in %d seconds, remove container.",
+                (log, timeout_threshold),
+            )
+        return container
+
+    def remove_container(self):
+        """Remove container which already running."""
+        containers = self._client.containers.list(
+            all=True, filters={"name": self.container_name}
+        )
+        if containers:
+            for container in containers:
+                container.remove(force=True)
+
+    @property
+    def images_exists(self) -> bool:
+        """Check whether the image exists in local docker repository or not."""
+        try:
+            self._client.images.get(self.image)
+            return True
+        except ImageNotFound:
+            return False
diff --git a/tests/testing/task.py b/tests/testing/file.py
similarity index 63%
copy from tests/testing/task.py
copy to tests/testing/file.py
index e0affc9..82e0837 100644
--- a/tests/testing/task.py
+++ b/tests/testing/file.py
@@ -15,18 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Mock class Task for other test."""
+"""Testing util about file operating."""
 
-import uuid
+from pathlib import Path
+from typing import Union
 
-from pydolphinscheduler.core.task import Task as SourceTask
 
+def get_file_content(path: Union[str, Path]) -> str:
+    """Get file content in given path."""
+    with open(path, mode="r") as f:
+        return f.read()
 
-class Task(SourceTask):
-    """Mock class :class:`pydolphinscheduler.core.task.Task` for unittest."""
 
-    DEFAULT_VERSION = 1
-
-    def gen_code_and_version(self):
-        """Mock java gateway code and version, convenience method for unittest."""
-        return uuid.uuid1().time, self.DEFAULT_VERSION
+def delete_file(path: Union[str, Path]) -> None:
+    """Delete file in given path."""
+    path = Path(path).expanduser() if isinstance(path, str) else path.expanduser()
+    if path.exists():
+        path.unlink()
diff --git a/tests/testing/path.py b/tests/testing/path.py
index 2e75be2..d1e520b 100644
--- a/tests/testing/path.py
+++ b/tests/testing/path.py
@@ -20,13 +20,14 @@
 from pathlib import Path
 from typing import Any, Generator
 
-path_code_tasks = Path(__file__).parent.parent.parent.joinpath(
-    "src", "pydolphinscheduler", "tasks"
-)
-path_example = Path(__file__).parent.parent.parent.joinpath(
-    "src", "pydolphinscheduler", "examples"
+project_root = Path(__file__).parent.parent.parent
+
+path_code_tasks = project_root.joinpath("src", "pydolphinscheduler", "tasks")
+path_example = project_root.joinpath("src", "pydolphinscheduler", "examples")
+path_doc_tasks = project_root.joinpath("docs", "source", "tasks")
+path_default_config_yaml = project_root.joinpath(
+    "src", "pydolphinscheduler", "core", "default_config.yaml"
 )
-path_doc_tasks = Path(__file__).parent.parent.parent.joinpath("docs", "source", "tasks")
 
 
 def get_all_examples() -> Generator[Path, Any, None]:
diff --git a/tests/testing/task.py b/tests/testing/task.py
index e0affc9..11ffbf1 100644
--- a/tests/testing/task.py
+++ b/tests/testing/task.py
@@ -30,3 +30,18 @@ class Task(SourceTask):
     def gen_code_and_version(self):
         """Mock java gateway code and version, convenience method for unittest."""
         return uuid.uuid1().time, self.DEFAULT_VERSION
+
+
+class TaskWithCode(SourceTask):
+    """Mock class :class:`pydolphinscheduler.core.task.Task` and it return some code and version."""
+
+    def __init__(
+        self, name: str, task_type: str, code: int, version: int, *args, **kwargs
+    ):
+        self._constant_code = code
+        self._constant_version = version
+        super().__init__(name, task_type, *args, **kwargs)
+
+    def gen_code_and_version(self):
+        """Mock java gateway code and version, convenience method for unittest."""
+        return self._constant_code, self._constant_version
diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py
new file mode 100644
index 0000000..4cc6df4
--- /dev/null
+++ b/tests/utils/test_file.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test file utils."""
+
+import shutil
+from pathlib import Path
+
+import pytest
+
+from pydolphinscheduler.utils import file
+from tests.testing.file import delete_file, get_file_content
+
+content = "test_content"
+file_path = "/tmp/test/file/test_file_write.txt"
+
+
+@pytest.fixture
+def teardown_del_file():
+    """Teardown about delete file."""
+    yield
+    delete_file(file_path)
+
+
+@pytest.fixture
+def setup_crt_first():
+    """Set up and teardown about create file first and then delete it."""
+    file.write(content=content, to_path=file_path)
+    yield
+    delete_file(file_path)
+
+
+def test_write_content(teardown_del_file):
+    """Test function :func:`write` on write behavior with correct content."""
+    assert not Path(file_path).exists()
+    file.write(content=content, to_path=file_path)
+    assert Path(file_path).exists()
+    assert content == get_file_content(file_path)
+
+
+def test_write_not_create_parent(teardown_del_file):
+    """Test function :func:`write` with parent not exists and do not create path."""
+    file_test_dir = Path(file_path).parent
+    if file_test_dir.exists():
+        shutil.rmtree(str(file_test_dir))
+    assert not file_test_dir.exists()
+    with pytest.raises(
+        ValueError,
+        match="Parent directory do not exists and set param `create` to `False`",
+    ):
+        file.write(content=content, to_path=file_path, create=False)
+
+
+def test_write_overwrite(setup_crt_first):
+    """Test success with file exists but set ``True`` to overwrite."""
+    assert Path(file_path).exists()
+
+    new_content = f"new_{content}"
+    file.write(content=new_content, to_path=file_path, overwrite=True)
+    assert new_content == get_file_content(file_path)
+
+
+def test_write_overwrite_error(setup_crt_first):
+    """Test error with file exists but set ``False`` to overwrite."""
+    assert Path(file_path).exists()
+
+    new_content = f"new_{content}"
+    with pytest.raises(
+        FileExistsError, match=".*already exists and you choose not overwrite mode\\."
+    ):
+        file.write(content=new_content, to_path=file_path)
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
new file mode 100644
index 0000000..ad3aaf7
--- /dev/null
+++ b/tests/utils/test_yaml_parser.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test utils.path_dict module."""
+
+from typing import Dict
+
+import pytest
+from ruamel.yaml import YAML
+
+from pydolphinscheduler.utils.yaml_parser import YamlParser
+from tests.testing.path import path_default_config_yaml
+
+yaml = YAML()
+
+expects = [
+    {
+        # yaml.load("no need test") is a flag about skipping it because it to different to maintainer
+        "name": yaml.load("no need test"),
+        "name.family": ("Smith", "SmithEdit"),
+        "name.given": ("Alice", "AliceEdit"),
+        "name.mark": yaml.load("no need test"),
+        "name.mark.name_mark": yaml.load("no need test"),
+        "name.mark.name_mark.key": ("value", "valueEdit"),
+    },
+    {
+        # yaml.load("no need test") is a flag about skipping it because it to different to maintainer
+        "java_gateway": yaml.load("no need test"),
+        "java_gateway.address": ("127.0.0.1", "127.1.1.1"),
+        "java_gateway.port": (25333, 25555),
+        "java_gateway.auto_convert": (True, False),
+        "default": yaml.load("no need test"),
+        "default.user": yaml.load("no need test"),
+        "default.user.name": ("userPythonGateway", "userPythonGatewayEdit"),
+        "default.user.password": ("userPythonGateway", "userPythonGatewayEdit"),
+        "default.user.email": (
+            "userPythonGateway@dolphinscheduler.com",
+            "userEdit@dolphinscheduler.com",
+        ),
+        "default.user.tenant": ("tenant_pydolphin", "tenant_pydolphinEdit"),
+        "default.user.phone": (11111111111, 22222222222),
+        "default.user.state": (1, 0),
+        "default.workflow": yaml.load("no need test"),
+        "default.workflow.project": ("project-pydolphin", "project-pydolphinEdit"),
+        "default.workflow.tenant": ("tenant_pydolphin", "SmithEdit"),
+        "default.workflow.user": ("userPythonGateway", "SmithEdit"),
+        "default.workflow.queue": ("queuePythonGateway", "queueEdit"),
+        "default.workflow.worker_group": ("default", "wgEdit"),
+        "default.workflow.release_state": ("online", "offline"),
+        "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
+        "default.workflow.warning_type": ("NONE", "SUCCESS"),
+    },
+]
+
+param = [
+    """#example
+name:
+  # details
+  family: Smith   # very common
+  given: Alice    # one of the siblings
+  mark:
+    name_mark:
+      key: value
+"""
+]
+
+with open(path_default_config_yaml, "r") as f:
+    param.append(f.read())
+
+
+@pytest.mark.parametrize(
+    "src, delimiter, expect",
+    [
+        (
+            param[0],
+            "|",
+            expects[0],
+        ),
+        (
+            param[1],
+            "/",
+            expects[1],
+        ),
+    ],
+)
+def test_yaml_parser_specific_delimiter(src: str, delimiter: str, expect: Dict):
+    """Test specific delimiter for :class:`YamlParser`."""
+
+    def ch_dl(key):
+        return key.replace(".", delimiter)
+
+    yaml_parser = YamlParser(src, delimiter=delimiter)
+    assert all(
+        [
+            expect[key][0] == yaml_parser[ch_dl(key)]
+            for key in expect
+            if expect[key] != "no need test"
+        ]
+    )
+    assert all(
+        [
+            expect[key][0] == yaml_parser.get(ch_dl(key))
+            for key in expect
+            if expect[key] != "no need test"
+        ]
+    )
+
+
+@pytest.mark.parametrize(
+    "src, expect",
+    [
+        (
+            param[0],
+            expects[0],
+        ),
+        (
+            param[1],
+            expects[1],
+        ),
+    ],
+)
+def test_yaml_parser_contains(src: str, expect: Dict):
+    """Test magic function :func:`YamlParser.__contain__` also with `key in obj` syntax."""
+    yaml_parser = YamlParser(src)
+    assert len(expect.keys()) == len(
+        yaml_parser.dict_parser.keys()
+    ), "Parser keys length not equal to expect keys length"
+    assert all(
+        [key in yaml_parser for key in expect]
+    ), "Parser keys not equal to expect keys"
+
+
+@pytest.mark.parametrize(
+    "src, expect",
+    [
+        (
+            param[0],
+            expects[0],
+        ),
+        (
+            param[1],
+            expects[1],
+        ),
+    ],
+)
+def test_yaml_parser_get(src: str, expect: Dict):
+    """Test magic function :func:`YamlParser.__getitem__` also with `obj[key]` syntax."""
+    yaml_parser = YamlParser(src)
+    assert all(
+        [
+            expect[key][0] == yaml_parser[key]
+            for key in expect
+            if expect[key] != "no need test"
+        ]
+    )
+    assert all(
+        [
+            expect[key][0] == yaml_parser.get(key)
+            for key in expect
+            if expect[key] != "no need test"
+        ]
+    )
+
+
+@pytest.mark.parametrize(
+    "src, expect",
+    [
+        (
+            param[0],
+            expects[0],
+        ),
+        (
+            param[1],
+            expects[1],
+        ),
+    ],
+)
+def test_yaml_parser_set(src: str, expect: Dict):
+    """Test magic function :func:`YamlParser.__setitem__` also with `obj[key] = val` syntax."""
+    yaml_parser = YamlParser(src)
+    for key in expect:
+        assert key in yaml_parser.dict_parser.keys()
+        if expect[key] == "no need test":
+            continue
+        assert expect[key][0] == yaml_parser.dict_parser[key]
+        assert expect[key][1] != yaml_parser.dict_parser[key]
+
+        yaml_parser[key] = expect[key][1]
+        assert expect[key][0] != yaml_parser.dict_parser[key]
+        assert expect[key][1] == yaml_parser.dict_parser[key]
+
+
+@pytest.mark.parametrize(
+    "src, setter, expect",
+    [
+        (
+            param[0],
+            {"name.mark.name_mark.key": "edit"},
+            """#example
+name:
+  # details
+  family: Smith   # very common
+  given: Alice    # one of the siblings
+  mark:
+    name_mark:
+      key: edit
+""",
+        ),
+        (
+            param[0],
+            {
+                "name.family": "SmithEdit",
+                "name.given": "AliceEdit",
+                "name.mark.name_mark.key": "edit",
+            },
+            """#example
+name:
+  # details
+  family: SmithEdit # very common
+  given: AliceEdit # one of the siblings
+  mark:
+    name_mark:
+      key: edit
+""",
+        ),
+    ],
+)
+def test_yaml_parser_str_repr(src: str, setter: Dict, expect: str):
+    """Test function :func:`YamlParser.to_string`."""
+    yaml_parser = YamlParser(src)
+
+    # Equal before change
+    assert f"YamlParser({src})" == repr(yaml_parser)
+    assert src == str(yaml_parser)
+
+    for key, val in setter.items():
+        yaml_parser[key] = val
+
+    # Equal after changed
+    assert expect == str(yaml_parser)
+    assert f"YamlParser({expect})" == repr(yaml_parser)
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..8e9280f
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[tox]
+envlist = local-ci, auto-lint, lint, doc-build-test, code-test, integrate-test, py{36,37,38,39}
+
+[testenv]
+whitelist_externals = make
+
+[testenv:auto-lint]
+extras = style
+commands =
+    python -m isort .
+    python -m black .
+    python -m autoflake --in-place --remove-all-unused-imports --ignore-init-module-imports --recursive .
+
+[testenv:lint]
+extras = style
+commands =
+    python -m isort --check .
+    python -m black --check .
+    python -m flake8
+    python -m autoflake --remove-all-unused-imports --ignore-init-module-imports --check --recursive .
+    
+[testenv:code-test]
+extras = test
+# Run both tests and coverage
+commands =
+    python -m pytest --cov=pydolphinscheduler --cov-config={toxinidir}/.coveragerc tests/
+
+[testenv:doc-build-test]
+extras = doc
+commands =
+    make -C {toxinidir}/docs clean
+    make -C {toxinidir}/docs html
+
+[testenv:integrate-test]
+extras = test
+commands =
+    python -m pytest tests/integration/
+
+[testenv:local-ci]
+extras = dev
+commands =
+    {[testenv:lint]commands}
+    {[testenv:code-test]commands}
+    {[testenv:doc-build-test]commands}