You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/12/13 01:03:24 UTC
[dolphinscheduler-sdk-python] branch main updated: [feat] Mark workflow timeout as type timedelta (#41)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler-sdk-python.git
The following commit(s) were added to refs/heads/main by this push:
new 13a8f90 [feat] Mark workflow timeout as type timedelta (#41)
13a8f90 is described below
commit 13a8f90648254fb33cd7d5766b41e699d8bea8da
Author: Jay Chung <zh...@gmail.com>
AuthorDate: Tue Dec 13 09:03:19 2022 +0800
[feat] Mark workflow timeout as type timedelta (#41)
* Change task parameter ``timeout`` from int to timedelta
* Set attribute ``timeout_flag`` base on ``timeout`` value
* Add updating
---
README.md | 6 +++---
UPDATING.md | 1 +
examples/yaml_define/MoreConfiguration.yaml | 1 -
src/pydolphinscheduler/constants.py | 3 ++-
src/pydolphinscheduler/core/task.py | 23 +++++++++++++++++++----
src/pydolphinscheduler/utils/date.py | 14 +++++++++++++-
tests/core/test_task.py | 24 +++++++++++++++++++++++-
tests/utils/test_date.py | 29 +++++++++++++++++++++++++++--
8 files changed, 88 insertions(+), 13 deletions(-)
diff --git a/README.md b/README.md
index fcdc944..8f56339 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ under the License.
[![Twitter Follow](https://img.shields.io/twitter/follow/dolphinschedule.svg?style=social&label=Follow)](https://twitter.com/dolphinschedule)
[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://s.apache.org/dolphinscheduler-slack)
-**PyDolphinScheduler** is python API for [Apache DolphinScheduler](https://dolphinscheduler.apache.org/en-us/index.html),
+**PyDolphinScheduler** is python API for [Apache DolphinScheduler](https://dolphinscheduler.apache.org),
which allow you definition your workflow by python code, aka workflow-as-codes.
## Quick Start
@@ -69,7 +69,7 @@ docker run --name dolphinscheduler-standalone-server -p 12345:12345 -p 25333:253
After the container is started, you can access the DolphinScheduler UI via http://localhost:12345/dolphinscheduler.
For more way to start DolphinScheduler and the more detail about DolphinScheduler, please refer to
-[DolphinScheduler](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/about/introduction.html)
+[DolphinScheduler](https://dolphinscheduler.apache.org/#/en-us/docs/3.1.2/guide/start/quick-start)
### Run a simple example
@@ -92,7 +92,7 @@ python ./tutorial.py
After that, a new workflow will be created by PyDolphinScheduler, and you can see it in DolphinScheduler web
UI's Project Management page. It will trigger the workflow automatically, so you can see the workflow running
in DolphinScheduler web UI's Workflow Instance page too. For more detail about any function about DolphinScheduler
-Project Management, please refer to [DolphinScheduler Project Management](https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/guide/project/project-list.html)
+Project Management, please refer to [DolphinScheduler Workflow](https://dolphinscheduler.apache.org/#/en-us/docs/3.1.2/guide/project/workflow-definition)
## Documentation
diff --git a/UPDATING.md b/UPDATING.md
index a1d9c99..7ca97ea 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,7 @@ It started after version 2.0.5 released
## Main
+* Change Task attr ``timeout`` type from int to timedelta and use timeout determine attr ``timeout_flag`` value ([#41](https://github.com/apache/dolphinscheduler-sdk-python/pull/41))
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
* Change class name from process definition to workflow ([#26](https://github.com/apache/dolphinscheduler-sdk-python/pull/26))
* Deprecated class `ProcessDefinition` to `Workflow`
diff --git a/examples/yaml_define/MoreConfiguration.yaml b/examples/yaml_define/MoreConfiguration.yaml
index 6fb4357..5f4d9d3 100644
--- a/examples/yaml_define/MoreConfiguration.yaml
+++ b/examples/yaml_define/MoreConfiguration.yaml
@@ -34,7 +34,6 @@ tasks:
delay_time: 20
fail_retry_times: 30
fail_retry_interval: 5
- timeout_flag: "CLOSE"
timeout: 60
local_params:
- { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
diff --git a/src/pydolphinscheduler/constants.py b/src/pydolphinscheduler/constants.py
index 62b6885..cdfd441 100644
--- a/src/pydolphinscheduler/constants.py
+++ b/src/pydolphinscheduler/constants.py
@@ -38,7 +38,8 @@ class TaskFlag(str):
class TaskTimeoutFlag(str):
"""Constants for task timeout flag."""
- CLOSE = "CLOSE"
+ OFF = "CLOSE"
+ ON = "OPEN"
class TaskType(str):
diff --git a/src/pydolphinscheduler/core/task.py b/src/pydolphinscheduler/core/task.py
index 29f0ed5..816d1b5 100644
--- a/src/pydolphinscheduler/core/task.py
+++ b/src/pydolphinscheduler/core/task.py
@@ -19,6 +19,7 @@
import copy
import types
import warnings
+from datetime import timedelta
from logging import getLogger
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -37,6 +38,7 @@ from pydolphinscheduler.core.workflow import Workflow, WorkflowContext
from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models import Base
+from pydolphinscheduler.utils.date import timedelta2timeout
logger = getLogger(__name__)
@@ -130,9 +132,8 @@ class Task(Base):
delay_time: Optional[int] = 0,
fail_retry_times: Optional[int] = 0,
fail_retry_interval: Optional[int] = 1,
- timeout_flag: Optional[int] = TaskTimeoutFlag.CLOSE,
timeout_notify_strategy: Optional = None,
- timeout: Optional[int] = 0,
+ timeout: Optional[timedelta] = None,
workflow: Optional[Workflow] = None,
local_params: Optional[List] = None,
resource_list: Optional[List] = None,
@@ -151,9 +152,8 @@ class Task(Base):
self.fail_retry_times = fail_retry_times
self.fail_retry_interval = fail_retry_interval
self.delay_time = delay_time
- self.timeout_flag = timeout_flag
self.timeout_notify_strategy = timeout_notify_strategy
- self.timeout = timeout
+ self._timeout: timedelta = timeout
self._workflow = None
self.workflow: Workflow = workflow or WorkflowContext.get()
self._upstream_task_codes: Set[int] = set()
@@ -190,6 +190,21 @@ class Task(Base):
"""Set attribute workflow."""
self._workflow = workflow
+ @property
+ def timeout(self) -> int:
+ """Get attribute timeout."""
+ return timedelta2timeout(self._timeout) if self._timeout else 0
+
+ @timeout.setter
+ def timeout(self, val: timedelta) -> None:
+ """Set attribute timeout."""
+ self._timeout = val
+
+ @property
+ def timeout_flag(self) -> str:
+ """Whether the timeout attribute is being set or not."""
+ return TaskTimeoutFlag.ON if self._timeout else TaskTimeoutFlag.OFF
+
@property
def resource_list(self) -> List[Dict[str, Resource]]:
"""Get task define attribute `resource_list`."""
diff --git a/src/pydolphinscheduler/utils/date.py b/src/pydolphinscheduler/utils/date.py
index 18cf93e..a1300f6 100644
--- a/src/pydolphinscheduler/utils/date.py
+++ b/src/pydolphinscheduler/utils/date.py
@@ -17,7 +17,8 @@
"""Date util function collections."""
-from datetime import datetime
+from datetime import datetime, timedelta
+from math import ceil
from pydolphinscheduler.constants import Delimiter, Time
@@ -80,3 +81,14 @@ def conv_from_str(src: str) -> datetime:
)
else:
raise NotImplementedError("%s could not be convert to datetime for now.", src)
+
+
+def timedelta2timeout(td: timedelta) -> int:
+ """Convert timedelta to workflow timeout, only keep ``math.ceil`` integer in minutes.
+
+ Because dolphinscheduler timeout attribute only supported in minutes, so we need to convert timedelta
+ into minutes. And will use ``math.ceil`` to keep it integer and not less than 1 if it configured.
+
+ :param td: timedelta object want to convert
+ """
+ return ceil(td.total_seconds() / 60)
diff --git a/tests/core/test_task.py b/tests/core/test_task.py
index 82bfec9..1742cd2 100644
--- a/tests/core/test_task.py
+++ b/tests/core/test_task.py
@@ -18,7 +18,8 @@
"""Test Task class function."""
import logging
import re
-from typing import Set
+from datetime import timedelta
+from typing import Set, Tuple
from unittest.mock import PropertyMock, patch
import pytest
@@ -100,6 +101,27 @@ def test__get_attr(addition: Set, ignore: Set, expect: Set):
assert task._get_attr() == expect
+@pytest.mark.parametrize(
+ "value, expect",
+ [
+ (None, (0, "CLOSE")),
+ (timedelta(seconds=0.1), (1, "OPEN")),
+ (timedelta(seconds=61), (2, "OPEN")),
+ (timedelta(seconds=0), (0, "CLOSE")),
+ (timedelta(minutes=1.3), (2, "OPEN")),
+ ],
+)
+def test_task_timeout(value: timedelta, expect: Tuple[int, str]):
+ """Test task timout attribute."""
+ task = TestTask(
+ name="test-get-attr",
+ task_type="test",
+ timeout=value,
+ )
+ assert task.timeout == expect[0]
+ assert task.timeout_flag == expect[1]
+
+
@pytest.mark.parametrize(
"attr, expect",
[
diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py
index b9f8ce5..6546c4a 100644
--- a/tests/utils/test_date.py
+++ b/tests/utils/test_date.py
@@ -17,11 +17,17 @@
"""Test utils.date module."""
-from datetime import datetime
+from datetime import datetime, timedelta
+from typing import Dict
import pytest
-from pydolphinscheduler.utils.date import FMT_STD, conv_from_str, conv_to_schedule
+from pydolphinscheduler.utils.date import (
+ FMT_STD,
+ conv_from_str,
+ conv_to_schedule,
+ timedelta2timeout,
+)
curr_date = datetime.now()
@@ -76,3 +82,22 @@ def test_conv_from_str_not_impl(src: str) -> None:
NotImplementedError, match=".*? could not be convert to datetime for now."
):
conv_from_str(src)
+
+
+@pytest.mark.parametrize(
+ "src, expect",
+ [
+ ({"seconds": 1}, 1),
+ ({"seconds": 60}, 1),
+ ({"seconds": 62}, 2),
+ ({"minutes": 1}, 1),
+ ({"minutes": 1.1}, 2),
+ ({"minutes": 2}, 2),
+ ({"hours": 1}, 60),
+ ({"hours": 1.3}, 78),
+ ],
+)
+def test_timedelta2timeout(src: Dict, expect: int) -> None:
+ """Test function timedelta2timeout."""
+ td = timedelta(**src)
+ assert timedelta2timeout(td) == expect, f"Test case {td} not expect to {expect}."