You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/04/15 07:50:57 UTC

[dolphinscheduler] branch dev updated: [python] Add task decorator for python function (#9496)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1f48601c75 [python] Add task decorator for python function (#9496)
1f48601c75 is described below

commit 1f48601c759df8dae82e966bed1a346416c7449a
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Fri Apr 15 15:50:52 2022 +0800

    [python] Add task decorator for python function (#9496)
    
    * [python] Add task decorator for python function
    
    * Add decorator `@task`
    * Add a tutorial about it
    * Change tutorial doc and combine into traditional docs
      * Add sphinx-inline-tab for better view
    
    * revert not need change
    
    * Correct python function indent
    
    * Correct integration test
---
 .../pydolphinscheduler/docs/source/conf.py         |   2 +
 .../pydolphinscheduler/docs/source/howto/index.rst |   8 +-
 .../pydolphinscheduler/docs/source/start.rst       |   9 +-
 .../docs/source/tasks/{index.rst => func_wrap.rst} |  40 ++--
 .../pydolphinscheduler/docs/source/tasks/index.rst |   1 +
 .../pydolphinscheduler/docs/source/tutorial.rst    | 233 ++++++++++++++-------
 .../pydolphinscheduler/setup.py                    |   1 +
 .../src/pydolphinscheduler/core/__init__.py        |   2 +
 .../examples/tutorial_decorator.py                 |  91 ++++++++
 .../src/pydolphinscheduler/tasks/func_wrap.py      |  61 ++++++
 .../src/pydolphinscheduler/tasks/python.py         |  71 ++++++-
 .../tests/example/test_example.py                  |   4 +-
 .../tests/integration/test_submit_examples.py      |   5 +-
 .../tests/tasks/test_func_wrap.py                  | 169 +++++++++++++++
 .../pydolphinscheduler/tests/tasks/test_python.py  |  40 +++-
 .../__init__.py => tests/testing/decorator.py}     |  22 +-
 16 files changed, 620 insertions(+), 139 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
index e22b3bb1b8..efede5c299 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
@@ -57,6 +57,8 @@ extensions = [
     "sphinx_rtd_theme",
     # Documenting command line interface
     "sphinx_click.ext",
+    # Add inline tabbed content
+    "sphinx_inline_tabs",
 ]
 
 # Add any paths that contain templates here, relative to this directory.
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
index e83b1631cf..a0b3c29c0c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
@@ -15,10 +15,14 @@
    specific language governing permissions and limitations
    under the License.
 
-How To
+HOWTOs
 ======
 
-In this section 
+pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly
+completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept`
+and :doc:`../tutorial`.
+
+Currently, the HOWTOs are:
 
 .. toctree::
    :maxdepth: 2
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
index e411d7bf72..6663c085e9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
@@ -137,13 +137,16 @@ from the API server, you should first change pydolphinscheduler configuration an
 
    You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported.
 
+After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
+and the path of web UI is `Project -> Workflow -> Workflow Definition`.
+
 
 What's More
 -----------
 
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial`
-and see how it work. But if you already know the inside of *PyDolphinScheduler*,
-maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports.
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
+:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
 
 .. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
 .. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
similarity index 69%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
index 42dcdf9c8c..5f41b80cfd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
@@ -15,27 +15,19 @@
    specific language governing permissions and limitations
    under the License.
 
-Tasks
-=====
-
-In this section 
-
-.. toctree::
-   :maxdepth: 1
-   
-   shell
-   sql
-   python
-   http
-
-   switch
-   condition
-   dependent
-
-   spark
-   flink
-   map_reduce
-   procedure
-
-   datax
-   sub_process
+Python Function Wrapper
+=======================
+
+A decorator that converts a Python function into a pydolphinscheduler task.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py
+   :start-after: [start tutorial]
+   :end-before: [end tutorial]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 42dcdf9c8c..d6bbb960c1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -23,6 +23,7 @@ In this section
 .. toctree::
    :maxdepth: 1
    
+   func_wrap
    shell
    sql
    python
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
index e0f22fb816..6366c803bb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
@@ -18,129 +18,202 @@
 Tutorial
 ========
 
-This tutorial show you the basic concept of *PyDolphinScheduler* and tell all
+This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
 things you should know before you submit or run your first workflow. If you
-still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>`
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>` first.
 
 Overview of Tutorial
 --------------------
 
-Here have an overview of our tutorial, and it look a little complex but do not
-worry about that because we explain this example below as detailed as possible.
+Here is an overview of our tutorial. It looks a little complex, but do not
+worry about that because we explain this example below in as much detail as possible.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start tutorial]
-   :end-before: [end tutorial]
+There are two types of tutorials: traditional and task decorator.
+
+- **Traditional Way**: More general, supports many :doc:`built-in task types <tasks/index>`, and it is convenient
+  when you build your workflow at the beginning.
+- **Task Decorator**: A Python decorator that allows you to wrap your function into pydolphinscheduler's task. It is
+  less versatile than the traditional way because it only supports Python functions and not the built-in tasks.
+  But it is helpful if your workflow is all built with Python or if you already have some Python
+  workflow code and want to migrate it to pydolphinscheduler.
+
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start tutorial]
+      :end-before: [end tutorial]
+
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start tutorial]
+      :end-before: [end tutorial]
 
 Import Necessary Module
 -----------------------
 
-First of all, we should importing necessary module which we would use later just
-like other Python package. We just create a minimum demo here, so we just import
-:class:`pydolphinscheduler.core.process_definition` and
-:class:`pydolphinscheduler.tasks.shell`.
+First of all, we should import the necessary module which we would use later just like other Python packages.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start package_import]
-   :end-before: [end package_import]
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start package_import]
+      :end-before: [end package_import]
 
-If you want to use other task type you could click and
-:doc:`see all tasks we support <tasks/index>`
+   In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+   :class:`pydolphinscheduler.tasks.shell.Shell`.
+
+   If you want to use other task type you could click and :doc:`see all tasks we support <tasks/index>`
+
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start package_import]
+      :end-before: [end package_import]
+
+   In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+   :func:`pydolphinscheduler.tasks.func_wrap.task`.
 
 Process Definition Declaration
 ------------------------------
 
-We should instantiate object after we import them from `import necessary module`_.
-Here we declare basic arguments for process definition(aka, workflow). We define
-the name of process definition, using `Python context manager`_ and it
-**the only required argument** for object process definition. Beside that we also
-declare three arguments named `schedule`, `start_time` which setting workflow schedule
-interval and schedule start_time, and argument `tenant` which changing workflow's
-task running user in the worker, :ref:`section tenant <concept:tenant>` in *PyDolphinScheduler*
-:doc:`concept` page have more detail information.
+We should instantiate a :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
+import it in `import necessary module`_. Here we declare basic arguments for process definition(aka, workflow).
+We define the name of :code:`ProcessDefinition`, using `Python context manager`_, and it is **the only required argument**
+for :code:`ProcessDefinition`. Besides, we also declare three arguments: :code:`schedule` and :code:`start_time`,
+which set the workflow schedule interval and schedule start time, and :code:`tenant`, which defines which tenant
+will be running this task in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
+*PyDolphinScheduler* :doc:`concept` for more information.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :start-after: [start workflow_declare]
-   :end-before: [end workflow_declare]
+.. tab:: Tradition
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start workflow_declare]
+      :end-before: [end workflow_declare]
+
+.. tab:: Task Decorator
 
-We could find more detail about process definition in
-:ref:`concept about process definition <concept:process definition>` if you interested in it.
-For all arguments of object process definition, you could find in the
-:class:`pydolphinscheduler.core.process_definition` api documentation.
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start workflow_declare]
+      :end-before: [end workflow_declare]
+
+We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
+if you are interested in it. For all arguments of object process definition, you could find in the
+:class:`pydolphinscheduler.core.process_definition` API documentation.
 
 Task Declaration
 ----------------
 
-Here we declare four tasks, and bot of them are simple task of
-:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal.
-Beside the argument `command`, we also need setting argument `name` for each task *(not
-only shell task, `name` is required for each type of task)*.
+.. tab:: Tradition
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start task_declare]
-   :end-before: [end task_declare]
+   We declare four tasks to show how to create tasks, and all of them are simple tasks of
+   :class:`pydolphinscheduler.tasks.shell` which run the `echo` command in the terminal. Besides the argument
+   `command` with :code:`echo` command, we also need to set the argument `name` for each task
+   *(not only shell task, `name` is required for each type of task)*.
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start task_declare]
+      :end-before: [end task_declare]
+
+   Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`.
+
+.. tab:: Task Decorator
 
-Beside shell task, *PyDolphinScheduler* support multiple tasks and you could
-find in :doc:`tasks/index`.
+   We declare four tasks to show how to create tasks, and all of them are created by the task decorator
+   :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
+   :code:`@task` to an existing Python function, and then use it inside :class:`pydolphinscheduler.core.process_definition`.
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start task_declare]
+      :end-before: [end task_declare]
+
+   It makes our workflow more Pythonic, but be careful: using task decorator mode means we only use
+   Python functions as tasks and could not use the :doc:`built-in tasks <tasks/index>` in most cases.
 
 Setting Task Dependence
 -----------------------
 
-After we declare both process definition and task, we have one workflow with
-four tasks, both all tasks is independent so that they would run in parallel.
-We should reorder the sort and the dependence of tasks. It useful when we need
-run prepare task before we run actual task or we need tasks running is specific
-rule. We both support attribute `set_downstream` and `set_upstream`, or bitwise
-operators `>>` and `<<`.
+After we declare both process definition and task, we have four tasks that are independent and will be running
+in parallel. If you want to start one task only after some other task is finished, you have to set dependence on
+those tasks.
 
-In this example, we set task `task_parent` is the upstream task of task
-`task_child_one` and `task_child_two`, and task `task_union` is the downstream
-task of both these two task.
+Setting task dependence is quite easy with the task's attributes :code:`set_downstream` and :code:`set_upstream`, or with
+the bitwise operators :code:`>>` and :code:`<<`.
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start task_relation_declare]
-   :end-before: [end task_relation_declare]
+In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and
+task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one`
+and task `task_child_two` are done, because both tasks are upstream of `task_union`.
+
+.. tab:: Tradition
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start task_relation_declare]
+      :end-before: [end task_relation_declare]
 
-Please notice that we could grouping some tasks and set dependence if they have
-same downstream or upstream. We declare task `task_child_one` and `task_child_two`
-as a group here, named as `task_group` and set task `task_parent` as upstream of
-both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept
-documentation.
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start task_relation_declare]
+      :end-before: [end task_relation_declare]
+
+.. note::
+
+   We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
+   tasks as task groups. In the tutorial, we declare task `task_child_one` and `task_child_two` as a task group named
+   `task_group`, then set `task_group` as downstream of task `task_parent`. You could see
+   :ref:`concept:Tasks Dependence` for more detail about how to set task dependence.
 
 Submit Or Run Workflow
 ----------------------
 
-Now we finish our workflow definition, with task and task dependence, but all
-these things are in local, we should let Apache DolphinScheduler daemon know what we
-define our workflow. So the last thing we have to do here is submit our workflow to
-Apache DolphinScheduler daemon.
+After that, we finish our workflow definition, with four tasks and task dependence, but all these things are
+local, so we should let the DolphinScheduler daemon know the definition of the workflow. So the last thing we
+have to do is submit the workflow to the DolphinScheduler daemon.
 
-We here in the example using `ProcessDefinition` attribute `run` to submit workflow
-to the daemon, and set the schedule time we just declare in `process definition declaration`_.
+Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
+will create workflow definition as well as workflow schedule.
 
-Now, we could run the Python code like other Python script, for the basic usage run
-:code:`python tutorial.py` to trigger and run it.
+.. tab:: Tradition
+   
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+      :dedent: 0
+      :start-after: [start submit_or_run]
+      :end-before: [end submit_or_run]
 
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
-   :dedent: 0
-   :start-after: [start submit_or_run]
-   :end-before: [end submit_or_run]
+.. tab:: Task Decorator
+
+   .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+      :dedent: 0
+      :start-after: [start submit_or_run]
+      :end-before: [end submit_or_run]
+
+At last, we could execute this workflow code in your terminal like other Python scripts, running
+:code:`python tutorial.py` to trigger and execute it.
+
+.. note::
 
-If you not start your Apache DolphinScheduler server, you could find the way in
-:ref:`start:start Python gateway service` and it would have more detail about related server
-start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition`
-and it just submit workflow to the daemon but not setting the schedule information. For
-more detail you could see :ref:`concept:process definition`.
+   If you have not started your DolphinScheduler API server, you could find how to start it in
+   :ref:`start:start Python gateway service`. Besides attribute :code:`run`, we have attribute
+   :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
+   the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
 
 DAG Graph After Tutorial Run
 ----------------------------
 
-After we run the tutorial code, you could login Apache DolphinScheduler web UI,
-go and see the `DolphinScheduler project page`_. they is a new process definition be
-created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below
+After we run the tutorial code, you could log in to the DolphinScheduler web UI, go and see the
+`DolphinScheduler project page`_. There is a new process definition created by *PyDolphinScheduler*, and it is
+named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like below:
 
 .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
    :language: text
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py
index 9632bb1fd9..62f20a54ce 100644
--- a/dolphinscheduler-python/pydolphinscheduler/setup.py
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -51,6 +51,7 @@ doc = [
     "sphinx>=4.3",
     "sphinx_rtd_theme>=1.0",
     "sphinx-click>=3.0",
+    "sphinx-inline-tabs",
 ]
 
 test = [
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
index 31dc9446d8..7497d1f289 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -18,10 +18,12 @@
 """Init pydolphinscheduler.core package."""
 
 from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.engine import Engine
 from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.core.task import Task
 
 __all__ = [
+    "Engine",
     "ProcessDefinition",
     "Task",
     "Database",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py
new file mode 100644
index 0000000000..986c1bbb6e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example takes you to experience pydolphinscheduler with the task decorator.
+
+After the tutorial_decorator.py file is submitted to Apache DolphinScheduler server a DAG would be created,
+and the workflow DAG graph is as below:
+
+                  --> task_child_one
+                /                    \
+task_parent -->                        -->  task_union
+                \                    /
+                  --> task_child_two
+
+it will instantiate and run all the task it have.
+"""
+
+# [start tutorial]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import task decorator because we would create tasks from Python functions later
+from pydolphinscheduler.tasks.func_wrap import task
+
+# [end package_import]
+
+
+# [start task_declare]
+@task
+def task_parent():
+    """First task in this workflow; both child tasks are set as its downstream."""
+    print("echo hello pydolphinscheduler")
+
+
+@task
+def task_child_one():
+    """Child task that runs in parallel with ``task_child_two`` after task ``task_parent`` finished."""
+    print("echo 'child one'")
+
+
+@task
+def task_child_two():
+    """Child task that runs in parallel with ``task_child_one`` after task ``task_parent`` finished."""
+    print("echo 'child two'")
+
+
+@task
+def task_union():
+    """Last task in this workflow; both child tasks are set as its upstream."""
+    print("echo union")
+
+
+# [end task_declare]
+
+
+# [start workflow_declare]
+with ProcessDefinition(
+    name="tutorial_decorator",
+    schedule="0 0 0 * * ? *",
+    start_time="2021-01-01",
+    tenant="tenant_exists",
+) as pd:
+    # [end workflow_declare]
+
+    # [start task_relation_declare]
+    task_group = [task_child_one(), task_child_two()]  # calling a decorated function creates its task
+    task_parent().set_downstream(task_group)  # both children become downstream of task_parent
+
+    task_union() << task_group  # task_union becomes downstream of both children
+    # [end task_relation_declare]
+
+    # [start submit_or_run]
+    pd.run()
+    # [end submit_or_run]
+# [end tutorial]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py
new file mode 100644
index 0000000000..c0b73a1fc2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task function wrapper allows using decorator to create a task."""
+
+import functools
+import inspect
+import itertools
+import types
+
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.python import Python
+
+
+def _get_func_str(func: types.FunctionType) -> str:
+    """Return the source of ``func`` without the ``@task`` line and without the leading indent.
+
+    :param func: The function wrapped by ``@task``; any other source line starting with ``@`` raises.
+    """
+    lines = inspect.getsourcelines(func)[0]  # [0] is the list of source lines, [1] the line number
+
+    src_strip = ""
+    lead_space_num = None  # indent width, measured once from the first source line
+    for line in lines:
+        if lead_space_num is None:
+            lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line))
+        if line.strip() == "@task":
+            continue  # drop the decorator line itself from the returned source
+        elif line.strip().startswith("@"):
+            raise PyDSParamException(
+                "Do no support other decorators for function ``task`` decorator."
+            )
+        src_strip += line[lead_space_num:]  # cut the first line's indent width from every kept line
+    return src_strip
+
+
+def task(func: types.FunctionType):
+    """Decorator which converts a Python function into a pydolphinscheduler ``Python`` task."""
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        # Pop ``name`` so an explicit task name is not passed twice (again through ``**kwargs``).
+        name = kwargs.pop("name", func.__name__)
+        definition = _get_func_str(func)
+        return Python(name=name, definition=definition, *args, **kwargs)
+
+    return wrapper
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
index 79504808c8..52903d48d9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
@@ -18,34 +18,85 @@
 """Task Python."""
 
 import inspect
+import logging
+import re
 import types
-from typing import Any
+from typing import Union
 
 from pydolphinscheduler.constants import TaskType
 from pydolphinscheduler.core.task import Task
 from pydolphinscheduler.exceptions import PyDSParamException
 
+log = logging.getLogger(__file__)
+
 
 class Python(Task):
-    """Task Python object, declare behavior for Python task to dolphinscheduler."""
+    """Task Python object, declare behavior for Python task to dolphinscheduler.
+
+    Python task supports two types of parameter ``definition``, and here is an example:
+
+    Using str type of ``definition``
+
+    .. code-block:: python
+
+        python_task = Python(name="str_type", definition="print('Hello Python task.')")
+
+    Or using Python callable type of ``definition``
+
+    .. code-block:: python
+
+        def foo():
+            print("Hello Python task.")
+
+        python_task = Python(name="func_type", definition=foo)
+
+    :param name: The name for Python task. It defines the task name.
+    :param definition: String format of Python script you want to execute or Python callable you
+        want to execute.
+    """
 
     _task_custom_attr = {
         "raw_script",
     }
 
-    def __init__(self, name: str, code: Any, *args, **kwargs):
+    def __init__(
+        self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
+    ):
         super().__init__(name, TaskType.PYTHON, *args, **kwargs)
-        self._code = code
+        self.definition = definition
+
+    def _build_exe_str(self) -> str:
+        """Build executable string from given definition.
+
+        When ``self.definition`` is a function (or a string starting with ``def name(``), the function
+        source alone would only define it, so a trailing ``name()`` call is appended to execute it.
+        """
+        if isinstance(self.definition, types.FunctionType):
+            py_function = inspect.getsource(self.definition)
+            func_str = f"{py_function}{self.definition.__name__}()"
+        else:
+            pattern = re.compile("^def (\\w+)\\(")
+            find = pattern.findall(self.definition)
+            if not find:
+                log.warning(
+                    "Python definition is simple script instead of function, with value %s",
+                    self.definition,
+                )
+                return self.definition
+            # Make sure the appended function call always starts on its own line
+            func_str = (
+                f"{self.definition}{find[0]}()"
+                if self.definition.endswith("\n")
+                else f"{self.definition}\n{find[0]}()"
+            )
+        return func_str
 
     @property
     def raw_script(self) -> str:
         """Get python task define attribute `raw_script`."""
-        if isinstance(self._code, str):
-            return self._code
-        elif isinstance(self._code, types.FunctionType):
-            py_function = inspect.getsource(self._code)
-            return py_function
+        if isinstance(self.definition, (str, types.FunctionType)):
+            return self._build_exe_str()
         else:
             raise PyDSParamException(
-                "Parameter code do not support % for now.", type(self._code)
+                f"Parameter definition do not support {type(self.definition)} for now."
             )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
index 5bf897f560..70f367767c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
@@ -44,7 +44,7 @@ def test_task_without_example():
     Avoiding add new type of tasks but without adding example describe how to use it.
     """
     # We use example/tutorial.py as shell task example
-    ignore_name = {"__init__.py", "shell.py"}
+    ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
     all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
 
     have_example_tasks = set()
@@ -97,7 +97,7 @@ def test_example_basic():
         ), f"We expect all examples is python script, but get {ex.name}."
 
         # All except tutorial and __init__ is end with keyword "_example"
-        if ex.stem != "tutorial" and ex.stem != "__init__":
+        if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__":
             assert ex.stem.endswith(
                 "_example"
             ), f"We expect all examples script end with keyword '_example', but get {ex.stem}."
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 85e5e23e31..218fa4a55c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -18,6 +18,7 @@
 """Test whether success submit examples DAG to PythonGatewayService."""
 
 from pathlib import Path
+from subprocess import Popen
 
 import pytest
 
@@ -36,6 +37,8 @@ from tests.testing.path import path_example
 def test_exec_white_list_example(example_path: Path):
     """Test execute examples and submit DAG to PythonGatewayService."""
     try:
-        exec(example_path.read_text())
+        # The task decorator reads source through module ``inspect``, which raises IOError under
+        # built-in ``exec``; run the example in a subprocess and require exit code 0 instead.
+        assert Popen(["python", str(example_path)]).wait() == 0
     except Exception:
         raise Exception("Run example %s failed.", example_path.stem)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py
new file mode 100644
index 0000000000..628b6e7f86
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test module about function wrap task decorator."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.func_wrap import task
+from tests.testing.decorator import foo as foo_decorator
+from tests.testing.task import Task
+
+PD_NAME = "test_process_definition"
+TASK_NAME = "test_task"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_outside(mock_code):
+    """Test a single ``@task``-decorated function defined outside the process definition."""
+
+    @task
+    def foo():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:
+        foo()  # this call leaves exactly one task on ``pd`` (asserted below)
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 1
+
+    pd_task = pd.tasks[12345]  # 12345: first value returned by the patched method
+    assert pd_task.name == "foo"
+    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_inside(mock_code):
+    """Test a single ``@task``-decorated function defined inside the process definition."""
+    with ProcessDefinition(PD_NAME) as pd:
+
+        @task
+        def foo():
+            print(TASK_NAME)
+
+        foo()  # this call leaves exactly one task on ``pd`` (asserted below)
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 1
+
+    pd_task = pd.tasks[12345]  # 12345: first value returned by the patched method
+    assert pd_task.name == "foo"
+    assert pd_task.raw_script == "def foo():\n    print(TASK_NAME)\nfoo()"
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_addition_decorator_error(mock_code):
+    """Test that calling a ``@task`` function which has another decorator raises an error."""
+
+    @task
+    @foo_decorator
+    def foo():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:  # noqa: F841
+        with pytest.raises(
+            PyDSParamException, match="Do no support other decorators for.*"
+        ):
+            foo()  # the exception is expected at call time, not at decoration time
+
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_outside(mock_code):
+    """Test multiple ``@task``-decorated functions defined outside the process definition."""
+
+    @task
+    def foo():
+        print(TASK_NAME)
+
+    @task
+    def bar():
+        print(TASK_NAME)
+
+    with ProcessDefinition(PD_NAME) as pd:
+        foo = foo()  # rebinds ``foo`` to the value returned by the call
+        bar = bar()  # rebinds ``bar`` to the value returned by the call
+
+        foo >> bar  # makes bar downstream of foo (asserted below)
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 2
+
+    task_foo = pd.get_one_task_by_name("foo")
+    task_bar = pd.get_one_task_by_name("bar")
+    assert set(pd.task_list) == {task_foo, task_bar}
+    assert (
+        task_foo is not None
+        and task_foo._upstream_task_codes == set()
+        and task_foo._downstream_task_codes.pop() == task_bar.code  # pop() mutates
+    )
+    assert (
+        task_bar is not None
+        and task_bar._upstream_task_codes.pop() == task_foo.code  # pop() mutates
+        and task_bar._downstream_task_codes == set()
+    )
+
+
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,
+)
+def test_multiple_tasks_inside(mock_code):
+    """Test multiple ``@task``-decorated functions defined inside the process definition."""
+    with ProcessDefinition(PD_NAME) as pd:
+
+        @task
+        def foo():
+            print(TASK_NAME)
+
+        @task
+        def bar():
+            print(TASK_NAME)
+
+        foo = foo()  # rebinds ``foo`` to the value returned by the call
+        bar = bar()  # rebinds ``bar`` to the value returned by the call
+
+        foo >> bar  # makes bar downstream of foo (asserted below)
+
+    assert pd is not None and pd.name == PD_NAME
+    assert len(pd.tasks) == 2
+
+    task_foo = pd.get_one_task_by_name("foo")
+    task_bar = pd.get_one_task_by_name("bar")
+    assert set(pd.task_list) == {task_foo, task_bar}
+    assert (
+        task_foo is not None
+        and task_foo._upstream_task_codes == set()
+        and task_foo._downstream_task_codes.pop() == task_bar.code  # pop() mutates
+    )
+    assert (
+        task_bar is not None
+        and task_bar._upstream_task_codes.pop() == task_foo.code  # pop() mutates
+        and task_bar._downstream_task_codes == set()
+    )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index dbcd2986fb..1cdd85d2cb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException
 from pydolphinscheduler.tasks.python import Python
 
 
+def foo():  # noqa: D103
+    print("hello world.")
+
+
 @pytest.mark.parametrize(
     "attr, expect",
     [
         (
-            {"code": "print(1)"},
+            {"definition": "print(1)"},
             {
                 "rawScript": "print(1)",
                 "localParams": [],
@@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python
                 "waitStartTimeout": {},
                 "conditionResult": {"successNode": [""], "failedNode": [""]},
             },
-        )
+        ),
+        (
+            {"definition": "def foo():\n    print('I am foo')"},
+            {
+                "rawScript": "def foo():\n    print('I am foo')\nfoo()",
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        ),
+        (
+            {"definition": foo},
+            {
+                "rawScript": 'def foo():  # noqa: D103\n    print("hello world.")\nfoo()',
+                "localParams": [],
+                "resourceList": [],
+                "dependence": {},
+                "waitStartTimeout": {},
+                "conditionResult": {"successNode": [""], "failedNode": [""]},
+            },
+        ),
     ],
 )
 @patch(
@@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect):
 def test_python_task_not_support_code(mock_code, script_code):
     """Test python task parameters."""
     name = "not_support_code_type"
-    with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+    with pytest.raises(
+        PyDSParamException, match="Parameter definition do not support .*?"
+    ):
         task = Python(name, script_code)
         task.raw_script
 
 
-def foo():  # noqa: D103
-    print("hello world.")
-
-
 @pytest.mark.parametrize(
     "name, script_code, raw",
     [
@@ -82,7 +106,7 @@ def foo():  # noqa: D103
         (
             "function_define",
             foo,
-            'def foo():  # noqa: D103\n    print("hello world.")\n',
+            'def foo():  # noqa: D103\n    print("hello world.")\nfoo()',
         ),
     ],
 )
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
similarity index 73%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
index 31dc9446d8..78078ee863 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
@@ -15,14 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Init pydolphinscheduler.core package."""
+"""Decorator module for testing module."""
 
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+import types
+from functools import wraps
 
-__all__ = [
-    "ProcessDefinition",
-    "Task",
-    "Database",
-]
+
+def foo(func: types.FunctionType):
+    """Decorate ``func`` with a wrapper that prints, then forwards the call and its result."""
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        print("foo decorator called.")
+        return func(*args, **kwargs)
+
+    return wrapper