You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/13 01:03:24 UTC

[dolphinscheduler-sdk-python] branch main updated: [feat] Mark workflow timeout as type timedelta (#41)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 13a8f90  [feat] Mark workflow timeout as type timedelta (#41)
13a8f90 is described below

commit 13a8f90648254fb33cd7d5766b41e699d8bea8da
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Tue Dec 13 09:03:19 2022 +0800

    [feat] Mark workflow timeout as type timedelta (#41)
    
    * Change task parameter ``timeout`` from int to timedelta
    * Set attribute ``timeout_flag`` based on ``timeout`` value
    * Add UPDATING.md entry
---
 README.md                                   |  6 +++---
 UPDATING.md                                 |  1 +
 examples/yaml_define/MoreConfiguration.yaml |  1 -
 src/pydolphinscheduler/constants.py         |  3 ++-
 src/pydolphinscheduler/core/task.py         | 23 +++++++++++++++++++----
 src/pydolphinscheduler/utils/date.py        | 14 +++++++++++++-
 tests/core/test_task.py                     | 24 +++++++++++++++++++++++-
 tests/utils/test_date.py                    | 29 +++++++++++++++++++++++++++--
 8 files changed, 88 insertions(+), 13 deletions(-)

diff --git a/README.md b/README.md
index fcdc944..8f56339 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ under the License.
 [![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule)
 [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/dolphinscheduler-slack)
 
-**PyDolphinScheduler** is python API for [Apache DolphinScheduler](https://dolphinscheduler.apache.org/en-us/index.html),
+**PyDolphinScheduler** is python API for [Apache DolphinScheduler](https://dolphinscheduler.apache.org),
 which allow you definition your workflow by python code, aka workflow-as-codes.
 
 ## Quick Start
@@ -69,7 +69,7 @@ docker run --name dolphinscheduler-standalone-server -p 12345:12345 -p 25333:253
 
 After the container is started, you can access the DolphinScheduler UI via http://localhost:12345/dolphinscheduler.
 For more way to start DolphinScheduler and the more detail about DolphinScheduler, please refer to
-[DolphinScheduler](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/about/introduction.html)
+[DolphinScheduler](https://dolphinscheduler.apache.org/#/en-us/docs/3.1.2/guide/start/quick-start)
 
 ### Run a simple example
 
@@ -92,7 +92,7 @@ python ./tutorial.py
 After that, a new workflow will be created by PyDolphinScheduler, and you can see it in DolphinScheduler web
 UI's Project Management page. It will trigger the workflow automatically, so you can see the workflow running
 in DolphinScheduler web UI's Workflow Instance page too. For more detail about any function about DolphinScheduler
-Project Management, please refer to [DolphinScheduler Project Management](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html)
+Project Management, please refer to [DolphinScheduler Workflow](https://dolphinscheduler.apache.org/#/en-us/docs/3.1.2/guide/project/workflow-definition)
 
 ## Documentation
 
diff --git a/UPDATING.md b/UPDATING.md
index a1d9c99..7ca97ea 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,7 @@ It started after version 2.0.5 released
 
 ## Main
 
+* Change Task attr ``timeout`` type from int to timedelta and use timeout determine attr ``timeout_flag`` value ([#41](https://github.com/apache/dolphinscheduler-sdk-python/pull/41)) 
 * Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
 * Change class name from process definition to workflow ([#26](https://github.com/apache/dolphinscheduler-sdk-python/pull/26))
   * Deprecated class `ProcessDefinition` to `Workflow`
diff --git a/examples/yaml_define/MoreConfiguration.yaml b/examples/yaml_define/MoreConfiguration.yaml
index 6fb4357..5f4d9d3 100644
--- a/examples/yaml_define/MoreConfiguration.yaml
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -34,7 +34,6 @@ tasks:
     delay_time: 20
     fail_retry_times: 30
     fail_retry_interval: 5
-    timeout_flag: "CLOSE"
     timeout: 60
     local_params:
       - { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index 62b6885..cdfd441 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -38,7 +38,8 @@ class TaskFlag(str):
 class TaskTimeoutFlag(str):
     """Constants for task timeout flag."""
 
-    CLOSE = "CLOSE"
+    OFF = "CLOSE"
+    ON = "OPEN"
 
 
 class TaskType(str):
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 29f0ed5..816d1b5 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -19,6 +19,7 @@
 import copy
 import types
 import warnings
+from datetime import timedelta
 from logging import getLogger
 from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
@@ -37,6 +38,7 @@ from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
 from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
 from pydolphinscheduler.java_gateway import gateway
 from pydolphinscheduler.models import Base
+from pydolphinscheduler.utils.date import timedelta2timeout
 
 logger = getLogger(__name__)
 
@@ -130,9 +132,8 @@ class Task(Base):
         delay_time: Optional[int] = 0,
         fail_retry_times: Optional[int] = 0,
         fail_retry_interval: Optional[int] = 1,
-        timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
         timeout_notify_strategy: Optional = None,
-        timeout: Optional[int] = 0,
+        timeout: Optional[timedelta] = None,
         workflow: Optional[Workflow] = None,
         local_params: Optional[List] = None,
         resource_list: Optional[List] = None,
@@ -151,9 +152,8 @@ class Task(Base):
         self.fail_retry_times = fail_retry_times
         self.fail_retry_interval = fail_retry_interval
         self.delay_time = delay_time
-        self.timeout_flag = timeout_flag
         self.timeout_notify_strategy = timeout_notify_strategy
-        self.timeout = timeout
+        self._timeout: timedelta = timeout
         self._workflow = None
         self.workflow: Workflow = workflow or WorkflowContext.get()
         self._upstream_task_codes: Set[int] = set()
@@ -190,6 +190,21 @@ class Task(Base):
         """Set attribute workflow."""
         self._workflow = workflow
 
+    @property
+    def timeout(self) -> int:
+        """Get attribute timeout."""
+        return timedelta2timeout(self._timeout) if self._timeout else 0
+
+    @timeout.setter
+    def timeout(self, val: timedelta) -> None:
+        """Set attribute timeout."""
+        self._timeout = val
+
+    @property
+    def timeout_flag(self) -> str:
+        """Whether the timeout attribute is being set or not."""
+        return TaskTimeoutFlag.ON if self._timeout else TaskTimeoutFlag.OFF
+
     @property
     def resource_list(self) -> List[Dict[str, Resource]]:
         """Get task define attribute `resource_list`."""
diff --git a/src/pydolphinscheduler/utils/date.py b/src/pydolphinscheduler/utils/date.py
index 18cf93e..a1300f6 100644
--- a/src/pydolphinscheduler/utils/date.py
+++ b/src/pydolphinscheduler/utils/date.py
@@ -17,7 +17,8 @@
 
 """Date util function collections."""
 
-from datetime import datetime
+from datetime import datetime, timedelta
+from math import ceil
 
 from pydolphinscheduler.constants import Delimiter, Time
 
@@ -80,3 +81,14 @@ def conv_from_str(src: str) -> datetime:
             )
     else:
         raise NotImplementedError("%s could not be convert to datetime for now.", src)
+
+
+def timedelta2timeout(td: timedelta) -> int:
+    """Convert timedelta to workflow timeout, only keep ``math.ceil`` integer in minutes.
+
+    Because dolphinscheduler timeout attribute only supported in minutes, so we need to convert timedelta
+    into minutes. And will use ``math.ceil`` to keep it integer and not less than 1 if it configured.
+
+    :param td: timedelta object want to convert
+    """
+    return ceil(td.total_seconds() / 60)
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 82bfec9..1742cd2 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -18,7 +18,8 @@
 """Test Task class function."""
 import logging
 import re
-from typing import Set
+from datetime import timedelta
+from typing import Set, Tuple
 from unittest.mock import PropertyMock, patch
 
 import pytest
@@ -100,6 +101,27 @@ def test__get_attr(addition: Set, ignore: Set, expect: Set):
     assert task._get_attr() == expect
 
 
+@pytest.mark.parametrize(
+    "value, expect",
+    [
+        (None, (0, "CLOSE")),
+        (timedelta(seconds=0.1), (1, "OPEN")),
+        (timedelta(seconds=61), (2, "OPEN")),
+        (timedelta(seconds=0), (0, "CLOSE")),
+        (timedelta(minutes=1.3), (2, "OPEN")),
+    ],
+)
+def test_task_timeout(value: timedelta, expect: Tuple[int, str]):
+    """Test task timout attribute."""
+    task = TestTask(
+        name="test-get-attr",
+        task_type="test",
+        timeout=value,
+    )
+    assert task.timeout == expect[0]
+    assert task.timeout_flag == expect[1]
+
+
 @pytest.mark.parametrize(
     "attr, expect",
     [
diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py
index b9f8ce5..6546c4a 100644
--- a/tests/utils/test_date.py
+++ b/tests/utils/test_date.py
@@ -17,11 +17,17 @@
 
 """Test utils.date module."""
 
-from datetime import datetime
+from datetime import datetime, timedelta
+from typing import Dict
 
 import pytest
 
-from pydolphinscheduler.utils.date import FMT_STD, conv_from_str, conv_to_schedule
+from pydolphinscheduler.utils.date import (
+    FMT_STD,
+    conv_from_str,
+    conv_to_schedule,
+    timedelta2timeout,
+)
 
 curr_date = datetime.now()
 
@@ -76,3 +82,22 @@ def test_conv_from_str_not_impl(src: str) -> None:
         NotImplementedError, match=".*? could not be convert to datetime for now."
     ):
         conv_from_str(src)
+
+
+@pytest.mark.parametrize(
+    "src, expect",
+    [
+        ({"seconds": 1}, 1),
+        ({"seconds": 60}, 1),
+        ({"seconds": 62}, 2),
+        ({"minutes": 1}, 1),
+        ({"minutes": 1.1}, 2),
+        ({"minutes": 2}, 2),
+        ({"hours": 1}, 60),
+        ({"hours": 1.3}, 78),
+    ],
+)
+def test_timedelta2timeout(src: Dict, expect: int) -> None:
+    """Test function timedelta2timeout."""
+    td = timedelta(**src)
+    assert timedelta2timeout(td) == expect, f"Test case {td} not expect to {expect}."