You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2023/10/12 11:49:45 UTC
[dolphinscheduler-sdk-python] branch main updated: feat: Add task group to task class (#114)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new bb9e428 feat: Add task group to task class (#114)
bb9e428 is described below
commit bb9e42818ccfac4b9cbfb002b6114ddea17874f2
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Thu Oct 12 19:49:40 2023 +0800
feat: Add task group to task class (#114)
```py
extract = Shell(
name="extract",
command="echo 'Some extract command here'",
task_group_id=1,
task_group_priority=123
)
```
fix: #106
---
docs/source/concept.rst | 35 +++++++++++++++++++++++++++++++++++
src/pydolphinscheduler/core/task.py | 9 +++++++++
tests/core/test_engine.py | 2 ++
tests/core/test_task.py | 2 ++
4 files changed, 48 insertions(+)
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 0d25ea5..ab8df5e 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -193,6 +193,41 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
With both `Workflow`_, `Tasks`_ and `Tasks Dependence`_, we could build a workflow with multiple tasks.
+Task Group
+~~~~~~~~~~
+
+A task group can manage and control the maximum number of concurrently running tasks. This is particularly
+useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL
+(Extract, Transform, Load) job where data is extracted from a source database, it's crucial to control the
+parallelism of extract tasks to prevent an excessive number of connections to the source database. This is
+where a task group comes into play. There are two key parameters, ``task_group_id`` and ``task_group_priority``
+that determine the behavior of the task group.
+
+Task group can control the maximum number of tasks running at the same time. It is useful when you don't want
+to run too many types of tasks at the same time. For example, when you extract data from a source database in an ETL
+job, you want to control the parallelism of extract tasks to avoid too many connections to the source database.
+Then a task group can help you. There are two major parameters, ``task_group_id`` and ``task_group_priority``,
+to control the behavior of the task group.
+
+* ``task_group_id``: is an integer used to identify the task group. You can set a ``task_group_id`` to
 restrict the parallelism of tasks. The ``task_group_id`` can be found in the DolphinScheduler web UI. The
+ default value is ``0``, which means there are no restrictions for this task group.
+* ``task_group_priority``: is an integer used to define the priority of the task group. When different tasks
+ share the same ``task_group_id``, the task group's priority comes into play, controlling the order in which
+ they run. Higher values indicate higher priority. The default value is ``0``, which means there's no
+ specific priority for this task group, and tasks will run in the order they were created.
+
+Here's an example in Python:
+
+.. code-block:: python
+
+ extract = Shell(
+ name="extract",
+ command="echo 'Some extract command here'",
+ task_group_id=1,
+ task_group_priority=123
+ )
+
Resource Files
--------------
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 3341040..9ec53f0 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -92,6 +92,9 @@ class Task(Base):
:param task_priority: default TaskPriority.MEDIUM
:param worker_group: default configuration.WORKFLOW_WORKER_GROUP
:param environment_name: default None
+ :param task_group_id: Identifier of the task group used to restrict the parallelism of task instance runs, default 0.
+ :param task_group_priority: Priority within the same task group; the higher the value, the higher the
+ priority, default 0.
:param delay_time: deault 0
:param fail_retry_times: default 0
:param fail_retry_interval: default 1
@@ -120,6 +123,8 @@ class Task(Base):
"delay_time",
"fail_retry_times",
"fail_retry_interval",
+ "task_group_id",
+ "task_group_priority",
"timeout_flag",
"timeout_notify_strategy",
"timeout",
@@ -153,6 +158,8 @@ class Task(Base):
task_priority: Optional[str] = TaskPriority.MEDIUM,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
environment_name: Optional[str] = None,
+ task_group_id: Optional[int] = 0,
+ task_group_priority: Optional[int] = 0,
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
@@ -177,6 +184,8 @@ class Task(Base):
self.task_priority = task_priority
self.worker_group = worker_group
self._environment_name = environment_name
+ self.task_group_id = task_group_id
+ self.task_group_priority = task_group_priority
self.fail_retry_times = fail_retry_times
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py
index 90a14d0..9c44308 100644
--- a/tests/core/test_engine.py
+++ b/tests/core/test_engine.py
@@ -108,6 +108,8 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
"version": 1,
"description": None,
"delayTime": 0,
+ "taskGroupId": 0,
+ "taskGroupPriority": 0,
"taskType": "test-engine",
"taskParams": {
"mainClass": "org.apache.examples.mock.Mock",
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index d34b6d2..1a44e6b 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -243,6 +243,8 @@ def test_task_get_define():
"version": version,
"description": None,
"delayTime": 0,
+ "taskGroupId": 0,
+ "taskGroupPriority": 0,
"taskType": task_type,
"taskParams": {
"resourceList": [],