You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2023/10/12 11:49:45 UTC

[dolphinscheduler-sdk-python] branch main updated: feat: Add task group to task class (#114)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new bb9e428  feat: Add task group to task class (#114)
bb9e428 is described below

commit bb9e42818ccfac4b9cbfb002b6114ddea17874f2
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Thu Oct 12 19:49:40 2023 +0800

    feat: Add task group to task class (#114)
    
    ```py
       extract = Shell(
          name="extract",
          command="echo 'Some extract command here'",
          task_group_id=1,
          task_group_priority=123
       )
    ```
    
    fix: #106
---
 docs/source/concept.rst             | 35 +++++++++++++++++++++++++++++++++++
 src/pydolphinscheduler/core/task.py |  9 +++++++++
 tests/core/test_engine.py           |  2 ++
 tests/core/test_task.py             |  2 ++
 4 files changed, 48 insertions(+)

diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 0d25ea5..ab8df5e 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -193,6 +193,41 @@ decide workflow of task. You could set `workflow` in both normal assign or in co
 
 With both `Workflow`_, `Tasks`_  and `Tasks Dependence`_, we could build a workflow with multiple tasks.
 
+Task Group
+~~~~~~~~~~
+
+A task group can manage and control the maximum number of concurrently running tasks. This is particularly
+useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL
+(Extract, Transform, Load) job where data is extracted from a source database, it's crucial to control the
+parallelism of extract tasks to prevent an excessive number of connections to the source database. This is
+where a task group comes into play. There are two key parameters, ``task_group_id`` and
+``task_group_priority``, that determine the behavior of the task group.
+
+A task group can control the maximum number of tasks running at the same time. It is useful when you don't
+want to run too many tasks of the same type at the same time. For example, when you extract data from a
+source database in an ETL job, you want to control the parallelism of the extract tasks to avoid too many
+connections to the source database. A task group can help you do that. There are two major parameters,
+``task_group_id`` and ``task_group_priority``, to control the behavior of the task group.
+
+* ``task_group_id``: is an integer used to identify the task group. You can set a ``task_group_id`` to
+  restrict the parallelism of tasks. The ``task_group_id`` can be found in the DolphinScheduler web UI. The
+  default value is ``0``, which means there are no restrictions for this task group.
+* ``task_group_priority``: is an integer used to define the priority of the task group. When different tasks
+  share the same ``task_group_id``, the task group's priority comes into play, controlling the order in which
+  they run. Higher values indicate higher priority. The default value is ``0``, which means there's no
+  specific priority for this task group, and tasks will run in the order they were created.
+
+Here's an example in Python:
+
+.. code-block:: python
+
+   extract = Shell(
+      name="extract",
+      command="echo 'Some extract command here'",
+      task_group_id=1,
+      task_group_priority=123
+   )
+
 Resource Files
 --------------
 
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 3341040..9ec53f0 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -92,6 +92,9 @@ class Task(Base):
     :param task_priority: default TaskPriority.MEDIUM
     :param worker_group: default configuration.WORKFLOW_WORKER_GROUP
     :param environment_name: default None
+    :param task_group_id: Identifier of the task group used to restrict the parallelism of task instances, default 0.
+    :param task_group_priority: Priority of the task within the same task group; the higher the value, the
+        higher the priority, default 0.
     :param delay_time: deault 0
     :param fail_retry_times: default 0
     :param fail_retry_interval: default 1
@@ -120,6 +123,8 @@ class Task(Base):
         "delay_time",
         "fail_retry_times",
         "fail_retry_interval",
+        "task_group_id",
+        "task_group_priority",
         "timeout_flag",
         "timeout_notify_strategy",
         "timeout",
@@ -153,6 +158,8 @@ class Task(Base):
         task_priority: Optional[str] = TaskPriority.MEDIUM,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
         environment_name: Optional[str] = None,
+        task_group_id: Optional[int] = 0,
+        task_group_priority: Optional[int] = 0,
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
@@ -177,6 +184,8 @@ class Task(Base):
         self.task_priority = task_priority
         self.worker_group = worker_group
         self._environment_name = environment_name
+        self.task_group_id = task_group_id
+        self.task_group_priority = task_group_priority
         self.fail_retry_times = fail_retry_times
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
diff --git a/tests/core/test_engine.py b/tests/core/test_engine.py
index 90a14d0..9c44308 100644
--- a/tests/core/test_engine.py
+++ b/tests/core/test_engine.py
@@ -108,6 +108,8 @@ def test_property_task_params(mock_resource, mock_code_version, attr, expect):
                 "version": 1,
                 "description": None,
                 "delayTime": 0,
+                "taskGroupId": 0,
+                "taskGroupPriority": 0,
                 "taskType": "test-engine",
                 "taskParams": {
                     "mainClass": "org.apache.examples.mock.Mock",
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index d34b6d2..1a44e6b 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -243,6 +243,8 @@ def test_task_get_define():
         "version": version,
         "description": None,
         "delayTime": 0,
+        "taskGroupId": 0,
+        "taskGroupPriority": 0,
         "taskType": task_type,
         "taskParams": {
             "resourceList": [],