You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/04/15 07:50:57 UTC
[dolphinscheduler] branch dev updated: [python] Add task decorator for python function (#9496)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1f48601c75 [python] Add task decorator for python function (#9496)
1f48601c75 is described below
commit 1f48601c759df8dae82e966bed1a346416c7449a
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Fri Apr 15 15:50:52 2022 +0800
[python] Add task decorator for python function (#9496)
* [python] Add task decorator for python function
* Add decorator `@task`
* Add a tutorial about it
* Change tutorial doc and combine into traditional docs
* Add sphinx-inline-tab for better view
* revert not need change
* Correct python function indent
* Correct integration test
---
.../pydolphinscheduler/docs/source/conf.py | 2 +
.../pydolphinscheduler/docs/source/howto/index.rst | 8 +-
.../pydolphinscheduler/docs/source/start.rst | 9 +-
.../docs/source/tasks/{index.rst => func_wrap.rst} | 40 ++--
.../pydolphinscheduler/docs/source/tasks/index.rst | 1 +
.../pydolphinscheduler/docs/source/tutorial.rst | 233 ++++++++++++++-------
.../pydolphinscheduler/setup.py | 1 +
.../src/pydolphinscheduler/core/__init__.py | 2 +
.../examples/tutorial_decorator.py | 91 ++++++++
.../src/pydolphinscheduler/tasks/func_wrap.py | 61 ++++++
.../src/pydolphinscheduler/tasks/python.py | 71 ++++++-
.../tests/example/test_example.py | 4 +-
.../tests/integration/test_submit_examples.py | 5 +-
.../tests/tasks/test_func_wrap.py | 169 +++++++++++++++
.../pydolphinscheduler/tests/tasks/test_python.py | 40 +++-
.../__init__.py => tests/testing/decorator.py} | 22 +-
16 files changed, 620 insertions(+), 139 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
index e22b3bb1b8..efede5c299 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/conf.py
@@ -57,6 +57,8 @@ extensions = [
"sphinx_rtd_theme",
# Documenting command line interface
"sphinx_click.ext",
+ # Add inline tabbed content
+ "sphinx_inline_tabs",
]
# Add any paths that contain templates here, relative to this directory.
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
index e83b1631cf..a0b3c29c0c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/howto/index.rst
@@ -15,10 +15,14 @@
specific language governing permissions and limitations
under the License.
-How To
+HOWTOs
======
-In this section
+pydolphinscheduler HOWTOs are documents that cover a single, specific topic, and attempt to cover it fairly
+completely. This collection is an effort to foster documentation that is more detailed than the :doc:`../concept`
+and :doc:`../tutorial`.
+
+Currently, the HOWTOs are:
.. toctree::
:maxdepth: 2
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
index e411d7bf72..6663c085e9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/start.rst
@@ -137,13 +137,16 @@ from the API server, you should first change pydolphinscheduler configuration an
You could see more information in :doc:`config` about all the configurations pydolphinscheduler supported.
+After that, you could go and see your DolphinScheduler web UI to find out a new workflow created by pydolphinscheduler,
+and the path of web UI is `Project -> Workflow -> Workflow Definition`.
+
What's More
-----------
-If you do not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial`
-and see how it work. But if you already know the inside of *PyDolphinScheduler*,
-maybe you could go and play with all :doc:`tasks/index` *PyDolphinScheduler* supports.
+If you are not familiar with *PyDolphinScheduler*, you could go to :doc:`tutorial` and see how it works. But
+if you already know the basic usage or concept of *PyDolphinScheduler*, you could go and play with all
+:doc:`tasks/index` *PyDolphinScheduler* supports, or see our :doc:`howto/index` about useful cases.
.. _`instructions for all platforms here`: https://wiki.python.org/moin/BeginnersGuide/Download
.. _`Apache DolphinScheduler`: https://dolphinscheduler.apache.org
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
similarity index 69%
copy from dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
copy to dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
index 42dcdf9c8c..5f41b80cfd 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/func_wrap.rst
@@ -15,27 +15,19 @@
specific language governing permissions and limitations
under the License.
-Tasks
-=====
-
-In this section
-
-.. toctree::
- :maxdepth: 1
-
- shell
- sql
- python
- http
-
- switch
- condition
- dependent
-
- spark
- flink
- map_reduce
- procedure
-
- datax
- sub_process
+Python Function Wrapper
+=======================
+
+A decorator that converts a Python function into a pydolphinscheduler task.
+
+Example
+-------
+
+.. literalinclude:: ../../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
+
+Dive Into
+---------
+
+.. automodule:: pydolphinscheduler.tasks.func_wrap
\ No newline at end of file
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
index 42dcdf9c8c..d6bbb960c1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tasks/index.rst
@@ -23,6 +23,7 @@ In this section
.. toctree::
:maxdepth: 1
+ func_wrap
shell
sql
python
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
index e0f22fb816..6366c803bb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/tutorial.rst
@@ -18,129 +18,202 @@
Tutorial
========
-This tutorial show you the basic concept of *PyDolphinScheduler* and tell all
+This tutorial shows you the basic concept of *PyDolphinScheduler* and tells all
things you should know before you submit or run your first workflow. If you
-still not install *PyDolphinScheduler* and start Apache DolphinScheduler, you
-could go and see :ref:`how to getting start PyDolphinScheduler <start:getting started>`
+still have not installed *PyDolphinScheduler* and started DolphinScheduler, you
+could first go and see :ref:`how to get started with PyDolphinScheduler <start:getting started>`.
Overview of Tutorial
--------------------
-Here have an overview of our tutorial, and it look a little complex but do not
-worry about that because we explain this example below as detailed as possible.
+Here is an overview of our tutorial. It looks a little complex, but do not
+worry about that because we explain this example below in as much detail as possible.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start tutorial]
- :end-before: [end tutorial]
+There are two types of tutorials: traditional and task decorator.
+
+- **Traditional Way**: More general, supports many :doc:`built-in task types <tasks/index>`, and is convenient
+ when you build your workflow at the beginning.
+- **Task Decorator**: A Python decorator that allows you to wrap your function into a pydolphinscheduler task. It is
+ less versatile than the traditional way because it only supports Python functions and does not support built-in
+ tasks. But it is helpful if your workflow is all built with Python or if you already have some Python
+ workflow code and want to migrate it to pydolphinscheduler.
+
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
+
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start tutorial]
+ :end-before: [end tutorial]
Import Necessary Module
-----------------------
-First of all, we should importing necessary module which we would use later just
-like other Python package. We just create a minimum demo here, so we just import
-:class:`pydolphinscheduler.core.process_definition` and
-:class:`pydolphinscheduler.tasks.shell`.
+First of all, we should import the necessary module which we would use later just like other Python packages.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start package_import]
- :end-before: [end package_import]
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start package_import]
+ :end-before: [end package_import]
-If you want to use other task type you could click and
-:doc:`see all tasks we support <tasks/index>`
+ In tradition tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+ :class:`pydolphinscheduler.tasks.shell.Shell`.
+
+ If you want to use other task type you could click and :doc:`see all tasks we support <tasks/index>`
+
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start package_import]
+ :end-before: [end package_import]
+
+ In task decorator tutorial we import :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` and
+ :func:`pydolphinscheduler.tasks.func_wrap.task`.
Process Definition Declaration
------------------------------
-We should instantiate object after we import them from `import necessary module`_.
-Here we declare basic arguments for process definition(aka, workflow). We define
-the name of process definition, using `Python context manager`_ and it
-**the only required argument** for object process definition. Beside that we also
-declare three arguments named `schedule`, `start_time` which setting workflow schedule
-interval and schedule start_time, and argument `tenant` which changing workflow's
-task running user in the worker, :ref:`section tenant <concept:tenant>` in *PyDolphinScheduler*
-:doc:`concept` page have more detail information.
+We should instantiate a :class:`pydolphinscheduler.core.process_definition.ProcessDefinition` object after we
+import it in `import necessary module`_. Here we declare the basic arguments for the process definition (aka workflow).
+We define the name of the :code:`ProcessDefinition` using a `Python context manager`_, and it is **the only required argument**
+for `ProcessDefinition`. Besides, we also declare three more arguments: :code:`schedule` and :code:`start_time`,
+which set the workflow schedule interval and the schedule start time, and :code:`tenant`, which defines which tenant
+will run the tasks in the DolphinScheduler worker. See :ref:`section tenant <concept:tenant>` in
+*PyDolphinScheduler* :doc:`concept` for more information.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :start-after: [start workflow_declare]
- :end-before: [end workflow_declare]
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+.. tab:: Task Decorator
-We could find more detail about process definition in
-:ref:`concept about process definition <concept:process definition>` if you interested in it.
-For all arguments of object process definition, you could find in the
-:class:`pydolphinscheduler.core.process_definition` api documentation.
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start workflow_declare]
+ :end-before: [end workflow_declare]
+
+We could find more detail about :code:`ProcessDefinition` in :ref:`concept about process definition <concept:process definition>`
+if you are interested in it. For all arguments of object process definition, you could find in the
+:class:`pydolphinscheduler.core.process_definition` API documentation.
Task Declaration
----------------
-Here we declare four tasks, and bot of them are simple task of
-:class:`pydolphinscheduler.tasks.shell` which running `echo` command in terminal.
-Beside the argument `command`, we also need setting argument `name` for each task *(not
-only shell task, `name` is required for each type of task)*.
+.. tab:: Tradition
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start task_declare]
- :end-before: [end task_declare]
+ We declare four tasks to show how to create tasks, and all of them are simple tasks of
+ :class:`pydolphinscheduler.tasks.shell` which run the `echo` command in the terminal. Besides the argument
+ `command` with the :code:`echo` command, we also need to set the argument `name` for each task
+ *(not only shell task, `name` is required for each type of task)*.
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start task_declare]
+ :end-before: [end task_declare]
+
+ Besides shell task, *PyDolphinScheduler* supports multiple tasks and you could find in :doc:`tasks/index`.
+
+.. tab:: Task Decorator
-Beside shell task, *PyDolphinScheduler* support multiple tasks and you could
-find in :doc:`tasks/index`.
+ We declare four tasks to show how to create tasks, and all of them are created by the task decorator
+ :func:`pydolphinscheduler.tasks.func_wrap.task`. All we have to do is add a decorator named
+ :code:`@task` to an existing Python function, and then use it inside :class:`pydolphinscheduler.core.process_definition`.
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start task_declare]
+ :end-before: [end task_declare]
+
+ It makes our workflow more Pythonic, but be aware that using the task decorator mode means we only use
+ Python functions as tasks and in most cases cannot use the :doc:`built-in tasks <tasks/index>`.
Setting Task Dependence
-----------------------
-After we declare both process definition and task, we have one workflow with
-four tasks, both all tasks is independent so that they would run in parallel.
-We should reorder the sort and the dependence of tasks. It useful when we need
-run prepare task before we run actual task or we need tasks running is specific
-rule. We both support attribute `set_downstream` and `set_upstream`, or bitwise
-operators `>>` and `<<`.
+After we declare both process definition and task, we have four tasks that are independent and will be running
+in parallel. If you want one task to start only after some other task has finished, you have to set dependence on those
+tasks.
-In this example, we set task `task_parent` is the upstream task of task
-`task_child_one` and `task_child_two`, and task `task_union` is the downstream
-task of both these two task.
+Setting task dependence is quite easy using the task's attributes :code:`set_downstream` and :code:`set_upstream`, or the
+bitwise operators :code:`>>` and :code:`<<`.
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start task_relation_declare]
- :end-before: [end task_relation_declare]
+In this tutorial, task `task_parent` is the leading task of the whole workflow, then task `task_child_one` and
+task `task_child_two` are its downstream tasks. Task `task_union` will not run unless both task `task_child_one`
+and task `task_child_two` are done, because both tasks are upstream of `task_union`.
+
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start task_relation_declare]
+ :end-before: [end task_relation_declare]
-Please notice that we could grouping some tasks and set dependence if they have
-same downstream or upstream. We declare task `task_child_one` and `task_child_two`
-as a group here, named as `task_group` and set task `task_parent` as upstream of
-both of them. You could see more detail in :ref:`concept:Tasks Dependence` section in concept
-documentation.
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start task_relation_declare]
+ :end-before: [end task_relation_declare]
+
+.. note::
+
+ We could set task dependence in batch mode if they have the same downstream or upstream by declaring those
+ tasks as a task group. In the tutorial, we declare tasks `task_child_one` and `task_child_two` as a task group named
+ `task_group`, then set `task_group` as downstream of task `task_parent`. See
+ :ref:`concept:Tasks Dependence` for more detail about how to set task dependence.
Submit Or Run Workflow
----------------------
-Now we finish our workflow definition, with task and task dependence, but all
-these things are in local, we should let Apache DolphinScheduler daemon know what we
-define our workflow. So the last thing we have to do here is submit our workflow to
-Apache DolphinScheduler daemon.
+After that, we finish our workflow definition, with four tasks and task dependence, but all these things are
+local, so we should let the DolphinScheduler daemon know the definition of the workflow. So the last thing we
+have to do is submit the workflow to the DolphinScheduler daemon.
-We here in the example using `ProcessDefinition` attribute `run` to submit workflow
-to the daemon, and set the schedule time we just declare in `process definition declaration`_.
+Fortunately, we have a convenient method to submit workflow via `ProcessDefinition` attribute :code:`run` which
+will create workflow definition as well as workflow schedule.
-Now, we could run the Python code like other Python script, for the basic usage run
-:code:`python tutorial.py` to trigger and run it.
+.. tab:: Tradition
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
+ :dedent: 0
+ :start-after: [start submit_or_run]
+ :end-before: [end submit_or_run]
-.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
- :dedent: 0
- :start-after: [start submit_or_run]
- :end-before: [end submit_or_run]
+.. tab:: Task Decorator
+
+ .. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial_decorator.py
+ :dedent: 0
+ :start-after: [start submit_or_run]
+ :end-before: [end submit_or_run]
+
+At last, we could execute this workflow code in your terminal like other Python scripts, running
+:code:`python tutorial.py` to trigger and execute it.
+
+.. note::
-If you not start your Apache DolphinScheduler server, you could find the way in
-:ref:`start:start Python gateway service` and it would have more detail about related server
-start. Beside attribute `run`, we have attribute `submit` for object `ProcessDefinition`
-and it just submit workflow to the daemon but not setting the schedule information. For
-more detail you could see :ref:`concept:process definition`.
+ If you do not start your DolphinScheduler API server, you could find how to start it in
+ :ref:`start:start Python gateway service` for more detail. Besides attribute :code:`run`, we have attribute
+ :code:`submit` for object `ProcessDefinition` which just submits workflow to the daemon but does not set
+ the workflow schedule information. For more detail, you could see :ref:`concept:process definition`.
DAG Graph After Tutorial Run
----------------------------
-After we run the tutorial code, you could login Apache DolphinScheduler web UI,
-go and see the `DolphinScheduler project page`_. they is a new process definition be
-created and named "Tutorial". It create by *PyDolphinScheduler* and the DAG graph as below
+After we run the tutorial code, you could log in to the DolphinScheduler web UI, go and see the
+`DolphinScheduler project page`_. There is a new process definition created by *PyDolphinScheduler*, and it is
+named "tutorial" or "tutorial_decorator". The task graph of the workflow looks like below:
.. literalinclude:: ../../src/pydolphinscheduler/examples/tutorial.py
:language: text
diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py
index 9632bb1fd9..62f20a54ce 100644
--- a/dolphinscheduler-python/pydolphinscheduler/setup.py
+++ b/dolphinscheduler-python/pydolphinscheduler/setup.py
@@ -51,6 +51,7 @@ doc = [
"sphinx>=4.3",
"sphinx_rtd_theme>=1.0",
"sphinx-click>=3.0",
+ "sphinx-inline-tabs",
]
test = [
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
index 31dc9446d8..7497d1f289 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
@@ -18,10 +18,12 @@
"""Init pydolphinscheduler.core package."""
from pydolphinscheduler.core.database import Database
+from pydolphinscheduler.core.engine import Engine
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task
__all__ = [
+ "Engine",
"ProcessDefinition",
"Task",
"Database",
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py
new file mode 100644
index 0000000000..986c1bbb6e
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/examples/tutorial_decorator.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+r"""
+A tutorial example take you to experience pydolphinscheduler.
+
+After the tutorial_decorator.py file is submitted to the Apache DolphinScheduler server, a DAG will be created,
+and workflow DAG graph as below:
+
+ --> task_child_one
+ / \
+task_parent --> --> task_union
+ \ /
+ --> task_child_two
+
+it will instantiate and run all the task it have.
+"""
+
+# [start tutorial]
+# [start package_import]
+# Import ProcessDefinition object to define your workflow attributes
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+
+# Import the task decorator because we will wrap Python functions into tasks later
+from pydolphinscheduler.tasks.func_wrap import task
+
+# [end package_import]
+
+
+# [start task_declare]
+@task
+def task_parent():
+ """First task in this workflow."""
+ print("echo hello pydolphinscheduler")
+
+
+@task
+def task_child_one():
+ """Child task will be run parallel after task ``task_parent`` finished."""
+ print("echo 'child one'")
+
+
+@task
+def task_child_two():
+ """Child task will be run parallel after task ``task_parent`` finished."""
+ print("echo 'child two'")
+
+
+@task
+def task_union():
+ """Last task in this workflow."""
+ print("echo union")
+
+
+# [end task_declare]
+
+
+# [start workflow_declare]
+with ProcessDefinition(
+ name="tutorial_decorator",
+ schedule="0 0 0 * * ? *",
+ start_time="2021-01-01",
+ tenant="tenant_exists",
+) as pd:
+ # [end workflow_declare]
+
+ # [start task_relation_declare]
+ task_group = [task_child_one(), task_child_two()]
+ task_parent().set_downstream(task_group)
+
+ task_union() << task_group
+ # [end task_relation_declare]
+
+ # [start submit_or_run]
+ pd.run()
+ # [end submit_or_run]
+# [end tutorial]
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py
new file mode 100644
index 0000000000..c0b73a1fc2
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/func_wrap.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task function wrapper allows using decorator to create a task."""
+
+import functools
+import inspect
+import itertools
+import types
+
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.python import Python
+
+
+def _get_func_str(func: types.FunctionType) -> str:
+ """Get Python function string without indent from decorator.
+
+ :param func: The function which wraps by decorator ``@task``.
+ """
+ lines = inspect.getsourcelines(func)[0]
+
+ src_strip = ""
+ lead_space_num = None
+ for line in lines:
+ if lead_space_num is None:
+ lead_space_num = sum(1 for _ in itertools.takewhile(str.isspace, line))
+ if line.strip() == "@task":
+ continue
+ elif line.strip().startswith("@"):
+ raise PyDSParamException(
+ "Do no support other decorators for function ``task`` decorator."
+ )
+ src_strip += line[lead_space_num:]
+ return src_strip
+
+
+def task(func: types.FunctionType):
+ """Decorator which converts a Python function into a pydolphinscheduler task."""
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ func_str = _get_func_str(func)
+ return Python(
+ name=kwargs.get("name", func.__name__), definition=func_str, *args, **kwargs
+ )
+
+ return wrapper
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
index 79504808c8..52903d48d9 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
@@ -18,34 +18,85 @@
"""Task Python."""
import inspect
+import logging
+import re
import types
-from typing import Any
+from typing import Union
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
+log = logging.getLogger(__file__)
+
class Python(Task):
- """Task Python object, declare behavior for Python task to dolphinscheduler."""
+ """Task Python object, declare behavior for Python task to dolphinscheduler.
+
+ Python task supports two types of parameters for :param:``definition``, and here is an example:
+
+ Using str type of :param:``definition``
+
+ .. code-block:: python
+
+ python_task = Python(name="str_type", definition="print('Hello Python task.')")
+
+ Or using Python callable type of :param:``definition``
+
+ .. code-block:: python
+
+ def foo():
+ print("Hello Python task.")
+
+ python_task = Python(name="str_type", definition=foo)
+
+ :param name: The name for Python task. It define the task name.
+ :param definition: String format of Python script you want to execute or Python callable you
+ want to execute.
+ """
_task_custom_attr = {
"raw_script",
}
- def __init__(self, name: str, code: Any, *args, **kwargs):
+ def __init__(
+ self, name: str, definition: Union[str, types.FunctionType], *args, **kwargs
+ ):
super().__init__(name, TaskType.PYTHON, *args, **kwargs)
- self._code = code
+ self.definition = definition
+
+ def _build_exe_str(self) -> str:
+ """Build executable string from given definition.
+
+ Attribute ``self.definition`` almost is a function, we need to call this function after parsing it
+ to string. The easier way to call a function is using syntax ``func()`` and we use it to call it too.
+ """
+ if isinstance(self.definition, types.FunctionType):
+ py_function = inspect.getsource(self.definition)
+ func_str = f"{py_function}{self.definition.__name__}()"
+ else:
+ pattern = re.compile("^def (\\w+)\\(")
+ find = pattern.findall(self.definition)
+ if not find:
+ log.warning(
+ "Python definition is simple script instead of function, with value %s",
+ self.definition,
+ )
+ return self.definition
+ # Keep function str and function callable always have one blank line
+ func_str = (
+ f"{self.definition}{find[0]}()"
+ if self.definition.endswith("\n")
+ else f"{self.definition}\n{find[0]}()"
+ )
+ return func_str
@property
def raw_script(self) -> str:
"""Get python task define attribute `raw_script`."""
- if isinstance(self._code, str):
- return self._code
- elif isinstance(self._code, types.FunctionType):
- py_function = inspect.getsource(self._code)
- return py_function
+ if isinstance(self.definition, (str, types.FunctionType)):
+ return self._build_exe_str()
else:
raise PyDSParamException(
- "Parameter code do not support % for now.", type(self._code)
+ "Parameter definition do not support % for now.", type(self.definition)
)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
index 5bf897f560..70f367767c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/example/test_example.py
@@ -44,7 +44,7 @@ def test_task_without_example():
Avoiding add new type of tasks but without adding example describe how to use it.
"""
# We use example/tutorial.py as shell task example
- ignore_name = {"__init__.py", "shell.py"}
+ ignore_name = {"__init__.py", "shell.py", "func_wrap.py"}
all_tasks = {task.stem for task in get_tasks(ignore_name=ignore_name)}
have_example_tasks = set()
@@ -97,7 +97,7 @@ def test_example_basic():
), f"We expect all examples is python script, but get {ex.name}."
# All except tutorial and __init__ is end with keyword "_example"
- if ex.stem != "tutorial" and ex.stem != "__init__":
+ if ex.stem not in ("tutorial", "tutorial_decorator") and ex.stem != "__init__":
assert ex.stem.endswith(
"_example"
), f"We expect all examples script end with keyword '_example', but get {ex.stem}."
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 85e5e23e31..218fa4a55c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -18,6 +18,7 @@
"""Test whether success submit examples DAG to PythonGatewayService."""
from pathlib import Path
+from subprocess import check_call
import pytest
@@ -36,6 +37,8 @@ from tests.testing.path import path_example
def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayService."""
try:
- exec(example_path.read_text())
+ # Our task decorator uses module ``inspect`` to get the source, which raises IOError under built-in
+ # ``exec``, so run a subprocess; ``check_call`` waits for it and raises when the exit code is non-zero
+ check_call(["python", str(example_path)])
except Exception:
raise Exception("Run example %s failed.", example_path.stem)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py
new file mode 100644
index 0000000000..628b6e7f86
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_func_wrap.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test module about function wrap task decorator."""
+
+from unittest.mock import patch
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.tasks.func_wrap import task
+from tests.testing.decorator import foo as foo_decorator
+from tests.testing.task import Task
+
+PD_NAME = "test_process_definition"
+TASK_NAME = "test_task"
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_outside(mock_code):
+ """Test a single decorated task whose function is declared outside the process definition context."""
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd:
+ foo()  # the call inside the context is what is expected to register the task on ``pd``
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 1
+
+ pd_task = pd.tasks[12345]  # 12345 is the code returned by the patched ``gen_code_and_version``
+ assert pd_task.name == "foo"
+ assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"  # no decorator line, call appended
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_single_task_inside(mock_code):
+ """Test a single decorated task whose function is declared inside the process definition context."""
+ with ProcessDefinition(PD_NAME) as pd:
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ foo()  # the call inside the context is what is expected to register the task on ``pd``
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 1
+
+ pd_task = pd.tasks[12345]  # 12345 is the code returned by the patched ``gen_code_and_version``
+ assert pd_task.name == "foo"
+ assert pd_task.raw_script == "def foo():\n print(TASK_NAME)\nfoo()"  # no decorator line, call appended
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(12345, 1)
+)
+def test_addition_decorator_error(mock_code):
+ """Test that an error is raised when the task decorator wraps a function that already has a decorator."""
+
+ @task
+ @foo_decorator
+ def foo():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd: # noqa: F841
+ with pytest.raises(
+ PyDSParamException, match="Do no support other decorators for.*"
+ ):
+ foo()  # the error is expected from this call, not from applying ``@task`` above
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,  # presumably gives each task its own code
+)
+def test_multiple_tasks_outside(mock_code):
+ """Test two chained decorated tasks whose functions are declared outside the process definition context."""
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ @task
+ def bar():
+ print(TASK_NAME)
+
+ with ProcessDefinition(PD_NAME) as pd:
+ foo = foo()  # rebind the name to the object returned by the call so it can be chained below
+ bar = bar()
+
+ foo >> bar  # the assertions below expect this to make foo upstream of bar
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 2
+
+ task_foo = pd.get_one_task_by_name("foo")
+ task_bar = pd.get_one_task_by_name("bar")
+ assert set(pd.task_list) == {task_foo, task_bar}
+ assert (
+ task_foo is not None
+ and task_foo._upstream_task_codes == set()
+ and task_foo._downstream_task_codes.pop() == task_bar.code  # NOTE: ``pop()`` also removes the element
+ )
+ assert (
+ task_bar is not None
+ and task_bar._upstream_task_codes.pop() == task_foo.code  # NOTE: ``pop()`` also removes the element
+ and task_bar._downstream_task_codes == set()
+ )
+
+
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ side_effect=Task("test_func_wrap", "func_wrap").gen_code_and_version,  # presumably gives each task its own code
+)
+def test_multiple_tasks_inside(mock_code):
+ """Test two chained decorated tasks whose functions are declared inside the process definition context."""
+ with ProcessDefinition(PD_NAME) as pd:
+
+ @task
+ def foo():
+ print(TASK_NAME)
+
+ @task
+ def bar():
+ print(TASK_NAME)
+
+ foo = foo()  # rebind the name to the object returned by the call so it can be chained below
+ bar = bar()
+
+ foo >> bar  # the assertions below expect this to make foo upstream of bar
+
+ assert pd is not None and pd.name == PD_NAME
+ assert len(pd.tasks) == 2
+
+ task_foo = pd.get_one_task_by_name("foo")
+ task_bar = pd.get_one_task_by_name("bar")
+ assert set(pd.task_list) == {task_foo, task_bar}
+ assert (
+ task_foo is not None
+ and task_foo._upstream_task_codes == set()
+ and task_foo._downstream_task_codes.pop() == task_bar.code  # NOTE: ``pop()`` also removes the element
+ )
+ assert (
+ task_bar is not None
+ and task_bar._upstream_task_codes.pop() == task_foo.code  # NOTE: ``pop()`` also removes the element
+ and task_bar._downstream_task_codes == set()
+ )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index dbcd2986fb..1cdd85d2cb 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -26,11 +26,15 @@ from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.python import Python
+def foo(): # noqa: D103
+ print("hello world.")
+
+
@pytest.mark.parametrize(
"attr, expect",
[
(
- {"code": "print(1)"},
+ {"definition": "print(1)"},
{
"rawScript": "print(1)",
"localParams": [],
@@ -39,7 +43,29 @@ from pydolphinscheduler.tasks.python import Python
"waitStartTimeout": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
},
- )
+ ),
+ (
+ {"definition": "def foo():\n print('I am foo')"},
+ {
+ "rawScript": "def foo():\n print('I am foo')\nfoo()",
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ ),
+ (
+ {"definition": foo},
+ {
+ "rawScript": 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ ),
],
)
@patch(
@@ -66,15 +92,13 @@ def test_property_task_params(mock_code_version, attr, expect):
def test_python_task_not_support_code(mock_code, script_code):
"""Test python task parameters."""
name = "not_support_code_type"
- with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+ with pytest.raises(
+ PyDSParamException, match="Parameter definition do not support .*?"
+ ):
task = Python(name, script_code)
task.raw_script
-def foo(): # noqa: D103
- print("hello world.")
-
-
@pytest.mark.parametrize(
"name, script_code, raw",
[
@@ -82,7 +106,7 @@ def foo(): # noqa: D103
(
"function_define",
foo,
- 'def foo(): # noqa: D103\n print("hello world.")\n',
+ 'def foo(): # noqa: D103\n print("hello world.")\nfoo()',
),
],
)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
similarity index 73%
copy from dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
index 31dc9446d8..78078ee863 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/testing/decorator.py
@@ -15,14 +15,18 @@
# specific language governing permissions and limitations
# under the License.
-"""Init pydolphinscheduler.core package."""
+"""Decorator module for testing module."""
-from pydolphinscheduler.core.database import Database
-from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import Task
+import types
+from functools import wraps
-__all__ = [
- "ProcessDefinition",
- "Task",
- "Database",
-]
+
+def foo(func: types.FunctionType):
+ """Decorate ``func`` with a wrapper that prints a message and then calls it; used only by the tests."""
+
+ @wraps(func)
+ def wrapper():
+ print("foo decorator called.")
+ func()  # called without arguments; its return value is discarded
+
+ return wrapper