You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/12/08 14:17:18 UTC
[dolphinscheduler] branch dev updated: [python] Refactor get object define communicate to gateway (#7272)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2a7d6b4 [python] Refactor get object define communicate to gateway (#7272)
2a7d6b4 is described below
commit 2a7d6b468f2942d075430478ffe08375f98df007
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Wed Dec 8 22:17:13 2021 +0800
[python] Refactor get object define communicate to gateway (#7272)
* Rename class Base method `to_dict` to `get_define` to make its purpose clearer
* Remove class TaskParams and its subclasses to make the code easier to
understand and to avoid cyclic dependencies between task implementations
* Remove class ObjectJsonBase in Task to reduce complexity
fix: #7271
---
.../src/pydolphinscheduler/core/base.py | 43 +++----
.../pydolphinscheduler/core/process_definition.py | 6 +-
.../src/pydolphinscheduler/core/task.py | 132 +++++++++------------
.../src/pydolphinscheduler/tasks/http.py | 47 +++-----
.../src/pydolphinscheduler/tasks/python.py | 33 +++---
.../src/pydolphinscheduler/tasks/shell.py | 18 ++-
.../src/pydolphinscheduler/tasks/sql.py | 78 ++++++------
.../src/pydolphinscheduler/tasks/sub_process.py | 55 +++------
.../tests/core/test_process_definition.py | 14 +--
.../pydolphinscheduler/tests/core/test_task.py | 76 ++++++++----
.../pydolphinscheduler/tests/tasks/test_http.py | 55 +++++++--
.../pydolphinscheduler/tests/tasks/test_python.py | 60 +++++-----
.../pydolphinscheduler/tests/tasks/test_shell.py | 43 ++++---
.../pydolphinscheduler/tests/tasks/test_sql.py | 42 ++++++-
.../tests/tasks/test_sub_process.py | 53 ++++++---
15 files changed, 405 insertions(+), 350 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
index d636d51..690351a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py
@@ -26,11 +26,14 @@ from pydolphinscheduler.utils.string import attr2camel
class Base:
"""DolphinScheduler Base object."""
+ # Object key attribute, to test whether object equals and so on.
_KEY_ATTR: set = {"name", "description"}
- _TO_DICT_ATTR: set = set()
+ # Object defines attribute, use when needs to communicate with Java gateway server.
+ _DEFINE_ATTR: set = set()
- DEFAULT_ATTR: Dict = {}
+ # Object default attribute, will add those attribute to `_DEFINE_ATTR` when init assign missing.
+ _DEFAULT_ATTR: Dict = {}
def __init__(self, name: str, description: Optional[str] = None):
self.name = name
@@ -44,28 +47,28 @@ class Base:
getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
)
- # TODO check how Redash do
- # TODO DRY
- def to_dict(self, camel_attr=True) -> Dict:
- """Get object key attribute dict.
-
- use attribute `self._TO_DICT_ATTR` to determine which attributes should including to
- children `to_dict` function.
- """
- # content = {}
- # for attr, value in self.__dict__.items():
- # # Don't publish private variables
- # if attr.startswith("_"):
- # continue
- # else:
- # content[snake2camel(attr)] = value
- # content.update(self.DEFAULT_ATTR)
- # return content
+ def get_define_custom(
+ self, camel_attr: bool = True, custom_attr: set = None
+ ) -> Dict:
+ """Get object definition attribute by given attr set."""
content = {}
- for attr in self._TO_DICT_ATTR:
+ for attr in custom_attr:
val = getattr(self, attr, None)
if camel_attr:
content[attr2camel(attr)] = val
else:
content[attr] = val
return content
+
+ def get_define(self, camel_attr: bool = True) -> Dict:
+ """Get object definition attribute communicate to Java gateway server.
+
+ use attribute `self._DEFINE_ATTR` to determine which attributes should including when
+ object tries to communicate with Java gateway server.
+ """
+ content = self.get_define_custom(camel_attr, self._DEFINE_ATTR)
+ update_default = {
+ k: self._DEFAULT_ATTR.get(k) for k in self._DEFAULT_ATTR if k not in content
+ }
+ content.update(update_default)
+ return content
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 1586757..70d4e6b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -68,7 +68,7 @@ class ProcessDefinition(Base):
"param",
}
- _TO_DICT_ATTR = {
+ _DEFINE_ATTR = {
"name",
"description",
"_project",
@@ -195,7 +195,7 @@ class ProcessDefinition(Base):
if not self.tasks:
return [self.tasks]
else:
- return [task.to_dict() for task in self.tasks.values()]
+ return [task.get_define() for task in self.tasks.values()]
@property
def task_relation_json(self) -> List[Dict]:
@@ -204,7 +204,7 @@ class ProcessDefinition(Base):
return [self.tasks]
else:
self._handle_root_relation()
- return [tr.to_dict() for tr in self._task_relations]
+ return [tr.get_define() for tr in self._task_relations]
@property
def schedule_json(self) -> Optional[Dict]:
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index d22e5c8..39cca9c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-"""DolphinScheduler ObjectJsonBase, TaskParams and Task object."""
+"""DolphinScheduler Task and TaskRelation object."""
import logging
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -32,61 +32,17 @@ from pydolphinscheduler.core.process_definition import (
ProcessDefinitionContext,
)
from pydolphinscheduler.java_gateway import launch_gateway
-from pydolphinscheduler.utils.string import class_name2camel, snake2camel
-class ObjectJsonBase:
- """Task base class, define `__str__` and `to_dict` function would be use in other task related class."""
-
- DEFAULT_ATTR = {}
-
- def __int__(self, *args, **kwargs):
- pass
-
- def __str__(self) -> str:
- content = []
- for attribute, value in self.__dict__.items():
- content.append(f'"{snake2camel(attribute)}": {value}')
- content = ",".join(content)
- return f'"{class_name2camel(type(self).__name__)}":{{{content}}}'
-
- # TODO check how Redash do
- # TODO DRY
- def to_dict(self) -> Dict:
- """Get object key attribute dict which determine by attribute `DEFAULT_ATTR`."""
- content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
- content.update(self.DEFAULT_ATTR)
- return content
-
-
-class TaskParams(ObjectJsonBase):
- """TaskParams object, describe the key parameter of a single task."""
-
- DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
-
- def __init__(
- self,
- local_params: Optional[List] = None,
- resource_list: Optional[List] = None,
- dependence: Optional[Dict] = None,
- wait_start_timeout: Optional[Dict] = None,
- condition_result: Optional[Dict] = None,
- *args,
- **kwargs,
- ):
- super().__init__(*args, **kwargs)
- self.local_params = local_params or []
- self.resource_list = resource_list or []
- self.dependence = dependence or {}
- self.wait_start_timeout = wait_start_timeout or {}
- # TODO need better way to handle it, this code just for POC
- self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
-
-
-class TaskRelation(ObjectJsonBase):
+class TaskRelation(Base):
"""TaskRelation object, describe the relation of exactly two tasks."""
- DEFAULT_ATTR = {
+ _DEFINE_ATTR = {
+ "pre_task_code",
+ "post_task_code",
+ }
+
+ _DEFAULT_ATTR = {
"name": "",
"preTaskVersion": 1,
"postTaskVersion": 1,
@@ -98,8 +54,9 @@ class TaskRelation(ObjectJsonBase):
self,
pre_task_code: int,
post_task_code: int,
+ name: Optional[str] = None,
):
- super().__init__()
+ super().__init__(name)
self.pre_task_code = pre_task_code
self.post_task_code = post_task_code
@@ -110,19 +67,32 @@ class TaskRelation(ObjectJsonBase):
class Task(Base):
"""Task object, parent class for all exactly task type."""
- DEFAULT_DEPS_ATTR = {
- "name": "",
- "preTaskVersion": 1,
- "postTaskVersion": 1,
- "conditionType": 0,
- "conditionParams": {},
+ _DEFINE_ATTR = {
+ "name",
+ "code",
+ "version",
+ "task_type",
+ "task_params",
+ "description",
+ "flag",
+ "task_priority",
+ "worker_group",
+ "delay_time",
+ "fail_retry_times",
+ "fail_retry_interval",
+ "timeout_flag",
+ "timeout_notify_strategy",
+ "timeout",
}
+ _task_custom_attr: set = set()
+
+ DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
+
def __init__(
self,
name: str,
task_type: str,
- task_params: TaskParams,
description: Optional[str] = None,
flag: Optional[str] = TaskFlag.YES,
task_priority: Optional[str] = TaskPriority.MEDIUM,
@@ -134,11 +104,15 @@ class Task(Base):
timeout_notify_strategy: Optional = None,
timeout: Optional[int] = 0,
process_definition: Optional[ProcessDefinition] = None,
+ local_params: Optional[List] = None,
+ resource_list: Optional[List] = None,
+ dependence: Optional[Dict] = None,
+ wait_start_timeout: Optional[Dict] = None,
+ condition_result: Optional[Dict] = None,
):
super().__init__(name, description)
self.task_type = task_type
- self.task_params = task_params
self.flag = flag
self.task_priority = task_priority
self.worker_group = worker_group
@@ -169,6 +143,13 @@ class Task(Base):
self.code,
)
+ # Attribute for task param
+ self.local_params = local_params or []
+ self.resource_list = resource_list or []
+ self.dependence = dependence or {}
+ self.wait_start_timeout = wait_start_timeout or {}
+ self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
+
@property
def process_definition(self) -> Optional[ProcessDefinition]:
"""Get attribute process_definition."""
@@ -179,6 +160,22 @@ class Task(Base):
"""Set attribute process_definition."""
self._process_definition = process_definition
+ @property
+ def task_params(self) -> Optional[Dict]:
+ """Get task parameter object.
+
+ Will get result to combine _task_custom_attr and custom_attr.
+ """
+ custom_attr = {
+ "local_params",
+ "resource_list",
+ "dependence",
+ "wait_start_timeout",
+ "condition_result",
+ }
+ custom_attr |= self._task_custom_attr
+ return self.get_define_custom(custom_attr=custom_attr)
+
def __hash__(self):
return hash(self.code)
@@ -259,16 +256,3 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
-
- def to_dict(self, camel_attr=True) -> Dict:
- """Task `to_dict` function which will return key attribute for Task object."""
- content = {}
- for attr, value in self.__dict__.items():
- # Don't publish private variables
- if attr.startswith("_"):
- continue
- elif isinstance(value, TaskParams):
- content[snake2camel(attr)] = value.to_dict()
- else:
- content[snake2camel(attr)] = value
- return content
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
index 445142e..781333d 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py
@@ -20,7 +20,7 @@
from typing import Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
@@ -50,11 +50,22 @@ class HttpCheckCondition:
BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
-class HttpTaskParams(TaskParams):
- """Parameter only for Http task types."""
+class Http(Task):
+ """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
+
+ _task_custom_attr = {
+ "url",
+ "http_method",
+ "http_params",
+ "http_check_condition",
+ "condition",
+ "connect_timeout",
+ "socket_timeout",
+ }
def __init__(
self,
+ name: str,
url: str,
http_method: Optional[str] = HttpMethod.GET,
http_params: Optional[str] = None,
@@ -65,7 +76,7 @@ class HttpTaskParams(TaskParams):
*args,
**kwargs
):
- super().__init__(*args, **kwargs)
+ super().__init__(name, TaskType.HTTP, *args, **kwargs)
self.url = url
if not hasattr(HttpMethod, http_method):
raise PyDSParamException(
@@ -88,31 +99,3 @@ class HttpTaskParams(TaskParams):
self.condition = condition
self.connect_timeout = connect_timeout
self.socket_timeout = socket_timeout
-
-
-class Http(Task):
- """Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
-
- def __init__(
- self,
- name: str,
- url: str,
- http_method: Optional[str] = HttpMethod.GET,
- http_params: Optional[str] = None,
- http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
- condition: Optional[str] = None,
- connect_timeout: Optional[int] = 60000,
- socket_timeout: Optional[int] = 60000,
- *args,
- **kwargs
- ):
- task_params = HttpTaskParams(
- url=url,
- http_method=http_method,
- http_params=http_params,
- http_check_condition=http_check_condition,
- condition=condition,
- connect_timeout=connect_timeout,
- socket_timeout=socket_timeout,
- )
- super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
index 9a7149a..7950480 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py
@@ -22,29 +22,30 @@ import types
from typing import Any
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
-class PythonTaskParams(TaskParams):
- """Parameter only for Python task types."""
-
- def __init__(self, raw_script: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.raw_script = raw_script
-
-
class Python(Task):
"""Task Python object, declare behavior for Python task to dolphinscheduler."""
+ _task_custom_attr = {
+ "raw_script",
+ }
+
def __init__(self, name: str, code: Any, *args, **kwargs):
- if isinstance(code, str):
- task_params = PythonTaskParams(raw_script=code)
- elif isinstance(code, types.FunctionType):
- py_function = inspect.getsource(code)
- task_params = PythonTaskParams(raw_script=py_function)
+ super().__init__(name, TaskType.PYTHON, *args, **kwargs)
+ self._code = code
+
+ @property
+ def raw_script(self) -> str:
+ """Get python task define attribute `raw_script`."""
+ if isinstance(self._code, str):
+ return self._code
+ elif isinstance(self._code, types.FunctionType):
+ py_function = inspect.getsource(self._code)
+ return py_function
else:
raise PyDSParamException(
- "Parameter code do not support % for now.", type(code)
+ "Parameter code do not support % for now.", type(self._code)
)
- super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
index 25e82f5..94ad2f4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py
@@ -18,15 +18,7 @@
"""Task shell."""
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
-
-
-class ShellTaskParams(TaskParams):
- """Parameter only for shell task types."""
-
- def __init__(self, raw_script: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.raw_script = raw_script
+from pydolphinscheduler.core.task import Task
class Shell(Task):
@@ -37,6 +29,10 @@ class Shell(Task):
task.name assign to `task_shell`
"""
+ _task_custom_attr = {
+ "raw_script",
+ }
+
def __init__(self, name: str, command: str, *args, **kwargs):
- task_params = ShellTaskParams(raw_script=command)
- super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs)
+ super().__init__(name, TaskType.SHELL, *args, **kwargs)
+ self.raw_script = command
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
index 62da964..f16eb10 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py
@@ -21,7 +21,7 @@ import re
from typing import Dict, Optional
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
from pydolphinscheduler.java_gateway import launch_gateway
@@ -32,31 +32,6 @@ class SqlType:
NOT_SELECT = 1
-class SqlTaskParams(TaskParams):
- """Parameter only for Sql task type."""
-
- def __init__(
- self,
- type: str,
- datasource: str,
- sql: str,
- sql_type: Optional[int] = SqlType.NOT_SELECT,
- display_rows: Optional[int] = 10,
- pre_statements: Optional[str] = None,
- post_statements: Optional[str] = None,
- *args,
- **kwargs
- ):
- super().__init__(*args, **kwargs)
- self.type = type
- self.datasource = datasource
- self.sql = sql
- self.sql_type = sql_type
- self.display_rows = display_rows
- self.pre_statements = pre_statements or []
- self.post_statements = post_statements or []
-
-
class Sql(Task):
"""Task SQL object, declare behavior for SQL task to dolphinscheduler.
@@ -73,38 +48,40 @@ class Sql(Task):
database type and database instance would run this sql.
"""
+ _task_custom_attr = {
+ "sql",
+ "sql_type",
+ "pre_statements",
+ "post_statements",
+ "display_rows",
+ }
+
def __init__(
self,
name: str,
datasource_name: str,
sql: str,
- pre_sql: Optional[str] = None,
- post_sql: Optional[str] = None,
+ pre_statements: Optional[str] = None,
+ post_statements: Optional[str] = None,
display_rows: Optional[int] = 10,
*args,
**kwargs
):
- self._sql = sql
- self._datasource_name = datasource_name
+ super().__init__(name, TaskType.SQL, *args, **kwargs)
+ self.datasource_name = datasource_name
+ self.sql = sql
+ self.pre_statements = pre_statements or []
+ self.post_statements = post_statements or []
+ self.display_rows = display_rows
self._datasource = {}
- task_params = SqlTaskParams(
- type=self.get_datasource_type(),
- datasource=self.get_datasource_id(),
- sql=sql,
- sql_type=self.sql_type,
- display_rows=display_rows,
- pre_statements=pre_sql,
- post_statements=post_sql,
- )
- super().__init__(name, TaskType.SQL, task_params, *args, **kwargs)
def get_datasource_type(self) -> str:
"""Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
- return self.get_datasource_info(self._datasource_name).get("type")
+ return self.get_datasource_info(self.datasource_name).get("type")
def get_datasource_id(self) -> str:
"""Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
- return self.get_datasource_info(self._datasource_name).get("id")
+ return self.get_datasource_info(self.datasource_name).get("id")
def get_datasource_info(self, name) -> Dict:
"""Get datasource info from java gateway, contains datasource id, type, name."""
@@ -122,7 +99,22 @@ class Sql(Task):
"^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
)
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
- if pattern_select.match(self._sql) is None:
+ if pattern_select.match(self.sql) is None:
return SqlType.NOT_SELECT
else:
return SqlType.SELECT
+
+ @property
+ def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
+ """Override Task.task_params for sql task.
+
+ Sql task have some specials attribute for task_params, and is odd if we
+ directly set as python property, so we Override Task.task_params here.
+ """
+ params = super().task_params
+ custom_params = {
+ "type": self.get_datasource_type(),
+ "datasource": self.get_datasource_id(),
+ }
+ params.update(custom_params)
+ return params
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
index 1bf0bd1..8ba6b4c 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py
@@ -20,57 +20,36 @@
from typing import Dict
from pydolphinscheduler.constants import TaskType
-from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
-from pydolphinscheduler.core.task import Task, TaskParams
+from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
from pydolphinscheduler.java_gateway import launch_gateway
-class SubProcessTaskParams(TaskParams):
- """Parameter only for Sub Process task type."""
-
- def __init__(self, process_definition_code, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.process_definition_code = process_definition_code
-
-
class SubProcess(Task):
"""Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler."""
- def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
- self._process_definition_name = process_definition_name
- self._process_definition_info = {}
- # TODO: Optimize the way of obtaining process_definition
- self.process_definition = kwargs.get(
- "process_definition", ProcessDefinitionContext.get()
- )
- if not self.process_definition:
- raise PyDSProcessDefinitionNotAssignException(
- "ProcessDefinition must be provider when SubProcess initialization."
- )
+ _task_custom_attr = {"process_definition_code"}
- task_params = SubProcessTaskParams(
- process_definition_code=self.get_process_definition_code(),
- )
- super().__init__(name, TaskType.SUB_PROCESS, task_params, *args, **kwargs)
+ def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
+ super().__init__(name, TaskType.SUB_PROCESS, *args, **kwargs)
+ self.process_definition_name = process_definition_name
- def get_process_definition_code(self) -> str:
+ @property
+ def process_definition_code(self) -> str:
"""Get process definition code, a wrapper for :func:`get_process_definition_info`."""
- return self.get_process_definition_info(self._process_definition_name).get(
+ return self.get_process_definition_info(self.process_definition_name).get(
"code"
)
def get_process_definition_info(self, process_definition_name: str) -> Dict:
"""Get process definition info from java gateway, contains process definition id, name, code."""
- if self._process_definition_info:
- return self._process_definition_info
- else:
- gateway = launch_gateway()
- self._process_definition_info = (
- gateway.entry_point.getProcessDefinitionInfo(
- self.process_definition.user.name,
- self.process_definition.project.name,
- process_definition_name,
- )
+ if not self.process_definition:
+ raise PyDSProcessDefinitionNotAssignException(
+ "ProcessDefinition must be provider for task SubProcess."
)
- return self._process_definition_info
+ gateway = launch_gateway()
+ return gateway.entry_point.getProcessDefinitionInfo(
+ self.process_definition.user.name,
+ self.process_definition.project.name,
+ process_definition_name,
+ )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 4c15974..8491878 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -28,7 +28,6 @@ from pydolphinscheduler.constants import (
ProcessDefinitionReleaseState,
)
from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.core.task import TaskParams
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.side import Project, Tenant, User
from pydolphinscheduler.utils.date import conv_to_schedule
@@ -152,8 +151,8 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val)
-def test_process_definition_to_dict_without_task():
- """Test process definition function to_dict without task."""
+def test_process_definition_get_define_without_task():
+ """Test process definition function get_define without task."""
expect = {
"name": TEST_PROCESS_DEFINITION_NAME,
"description": None,
@@ -168,7 +167,7 @@ def test_process_definition_to_dict_without_task():
"taskRelationJson": [{}],
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
- assert pd.to_dict() == expect
+ assert pd.get_define() == expect
def test_process_definition_simple_context_manager():
@@ -176,10 +175,7 @@ def test_process_definition_simple_context_manager():
expect_tasks_num = 5
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
for i in range(expect_tasks_num):
- task_params = TaskParams()
- curr_task = Task(
- name=f"task-{i}", task_type=f"type-{i}", task_params=task_params
- )
+ curr_task = Task(name=f"task-{i}", task_type=f"type-{i}")
# Set deps task i as i-1 parent
if i > 0:
pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
@@ -221,11 +217,9 @@ def test_process_definition_simple_separate():
expect_tasks_num = 5
pd = ProcessDefinition(TEST_PROCESS_DEFINITION_NAME)
for i in range(expect_tasks_num):
- task_params = TaskParams()
curr_task = Task(
name=f"task-{i}",
task_type=f"type-{i}",
- task_params=task_params,
process_definition=pd,
)
# Set deps task i as i-1 parent
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 6d09820..b551f07 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -21,21 +21,49 @@ from unittest.mock import patch
import pytest
-from pydolphinscheduler.core.task import Task, TaskParams, TaskRelation
+from pydolphinscheduler.core.task import Task, TaskRelation
from tests.testing.task import Task as testTask
-def test_task_params_to_dict():
- """Test TaskParams object function to_dict."""
- expect = {
- "resourceList": [],
- "localParams": [],
- "dependence": {},
- "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
- "waitStartTimeout": {},
- }
- task_param = TaskParams()
- assert task_param.to_dict() == expect
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ dict(),
+ {
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ ),
+ (
+ {
+ "local_params": ["foo", "bar"],
+ "resource_list": ["foo", "bar"],
+ "dependence": {"foo", "bar"},
+ "wait_start_timeout": {"foo", "bar"},
+ "condition_result": {"foo": ["bar"]},
+ },
+ {
+ "localParams": ["foo", "bar"],
+ "resourceList": ["foo", "bar"],
+ "dependence": {"foo", "bar"},
+ "waitStartTimeout": {"foo", "bar"},
+ "conditionResult": {"foo": ["bar"]},
+ },
+ ),
+ ],
+)
+def test_property_task_params(attr, expect):
+ """Test class task property."""
+ task = testTask(
+ "test-property-task-params",
+ "test-task",
+ **attr,
+ )
+ assert expect == task.task_params
def test_task_relation_to_dict():
@@ -54,15 +82,15 @@ def test_task_relation_to_dict():
task_param = TaskRelation(
pre_task_code=pre_task_code, post_task_code=post_task_code
)
- assert task_param.to_dict() == expect
+ assert task_param.get_define() == expect
-def test_task_to_dict():
- """Test Task object function to_dict."""
+def test_task_get_define():
+ """Test Task object function get_define."""
code = 123
version = 1
- name = "test_task_to_dict"
- task_type = "test_task_to_dict_type"
+ name = "test_task_get_define"
+ task_type = "test_task_get_define_type"
expect = {
"code": code,
"name": name,
@@ -90,8 +118,8 @@ def test_task_to_dict():
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
- task = Task(name=name, task_type=task_type, task_params=TaskParams())
- assert task.to_dict() == expect
+ task = Task(name=name, task_type=task_type)
+ assert task.get_define() == expect
@pytest.mark.parametrize("shift", ["<<", ">>"])
@@ -100,8 +128,8 @@ def test_two_tasks_shift(shift: str):
Here we test both `>>` and `<<` bit operator.
"""
- upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams())
- downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams())
+ upstream = testTask(name="upstream", task_type=shift)
+ downstream = testTask(name="downstream", task_type=shift)
if shift == "<<":
downstream << upstream
elif shift == ">>":
@@ -137,10 +165,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str):
"downstream": "upstream",
}
task_type = "dep_task_and_tasks"
- task = testTask(name="upstream", task_type=task_type, task_params=TaskParams())
+ task = testTask(name="upstream", task_type=task_type)
tasks = [
- testTask(name="downstream1", task_type=task_type, task_params=TaskParams()),
- testTask(name="downstream2", task_type=task_type, task_params=TaskParams()),
+ testTask(name="downstream1", task_type=task_type),
+ testTask(name="downstream2", task_type=task_type),
]
# Use build-in function eval to simply test case and reduce duplicate code
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
index 7c01517..060cdec 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py
@@ -22,12 +22,7 @@ from unittest.mock import patch
import pytest
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.tasks.http import (
- Http,
- HttpCheckCondition,
- HttpMethod,
- HttpTaskParams,
-)
+from pydolphinscheduler.tasks.http import Http, HttpCheckCondition, HttpMethod
@pytest.mark.parametrize(
@@ -51,6 +46,38 @@ def test_attr_exists(class_name, attrs):
@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {"url": "https://www.apache.org"},
+ {
+ "url": "https://www.apache.org",
+ "httpMethod": "GET",
+ "httpParams": [],
+ "httpCheckCondition": "STATUS_CODE_DEFAULT",
+ "condition": None,
+ "connectTimeout": 60000,
+ "socketTimeout": 60000,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+ """Test task http property."""
+ task = Http("test-http-task-params", **attr)
+ assert expect == task.task_params
+
+
+@pytest.mark.parametrize(
"param",
[
{"http_method": "http_method"},
@@ -62,18 +89,22 @@ def test_attr_exists(class_name, attrs):
},
],
)
-def test_http_task_param_not_support_param(param):
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_http_task_param_not_support_param(mock_code, param):
"""Test HttpTaskParams not support parameter."""
url = "https://www.apache.org"
with pytest.raises(PyDSParamException, match="Parameter .*?"):
- HttpTaskParams(url, **param)
+ Http("test-no-supprot-param", url, **param)
-def test_http_to_dict():
- """Test task HTTP function to_dict."""
+def test_http_get_define():
+ """Test task HTTP function get_define."""
code = 123
version = 1
- name = "test_http_to_dict"
+ name = "test_http_get_define"
url = "https://www.apache.org"
expect = {
"code": code,
@@ -110,4 +141,4 @@ def test_http_to_dict():
return_value=(code, version),
):
http = Http(name, url)
- assert http.to_dict() == expect
+ assert http.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
index f9e7f04..dbcd298 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py
@@ -23,26 +23,33 @@ from unittest.mock import patch
import pytest
from pydolphinscheduler.exceptions import PyDSParamException
-from pydolphinscheduler.tasks.python import Python, PythonTaskParams
+from pydolphinscheduler.tasks.python import Python
@pytest.mark.parametrize(
- "name, value",
+ "attr, expect",
[
- ("local_params", "local_params"),
- ("resource_list", "resource_list"),
- ("dependence", "dependence"),
- ("wait_start_timeout", "wait_start_timeout"),
- ("condition_result", "condition_result"),
+ (
+ {"code": "print(1)"},
+ {
+ "rawScript": "print(1)",
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
],
)
-def test_python_task_params_attr_setter(name, value):
- """Test python task parameters."""
- command = 'print("hello world.")'
- python_task_params = PythonTaskParams(command)
- assert command == python_task_params.raw_script
- setattr(python_task_params, name, value)
- assert value == getattr(python_task_params, name)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+ """Test task python property."""
+ task = Python("test-python-task-params", **attr)
+ assert expect == task.task_params
@pytest.mark.parametrize(
@@ -52,19 +59,16 @@ def test_python_task_params_attr_setter(name, value):
("print", "hello world"),
],
)
-def test_python_task_not_support_code(script_code):
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_python_task_not_support_code(mock_code, script_code):
"""Test python task parameters."""
name = "not_support_code_type"
- code = 123
- version = 1
- with patch(
- "pydolphinscheduler.core.task.Task.gen_code_and_version",
- return_value=(code, version),
- ):
- with pytest.raises(
- PyDSParamException, match="Parameter code do not support .*?"
- ):
- Python(name, script_code)
+ with pytest.raises(PyDSParamException, match="Parameter code do not support .*?"):
+ task = Python(name, script_code)
+ task.raw_script
def foo(): # noqa: D103
@@ -82,8 +86,8 @@ def foo(): # noqa: D103
),
],
)
-def test_python_to_dict(name, script_code, raw):
- """Test task python function to_dict."""
+def test_python_get_define(name, script_code, raw):
+ """Test task python function get_define."""
code = 123
version = 1
expect = {
@@ -115,4 +119,4 @@ def test_python_to_dict(name, script_code, raw):
return_value=(code, version),
):
shell = Python(name, script_code)
- assert shell.to_dict() == expect
+ assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
index 56fae1c..e42f6dc 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py
@@ -22,33 +22,40 @@ from unittest.mock import patch
import pytest
-from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams
+from pydolphinscheduler.tasks.shell import Shell
@pytest.mark.parametrize(
- "name, value",
+ "attr, expect",
[
- ("local_params", "local_params"),
- ("resource_list", "resource_list"),
- ("dependence", "dependence"),
- ("wait_start_timeout", "wait_start_timeout"),
- ("condition_result", "condition_result"),
+ (
+ {"command": "test script"},
+ {
+ "rawScript": "test script",
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
],
)
-def test_shell_task_params_attr_setter(name, value):
- """Test shell task parameters."""
- raw_script = "echo shell task parameter"
- shell_task_params = ShellTaskParams(raw_script)
- assert raw_script == shell_task_params.raw_script
- setattr(shell_task_params, name, value)
- assert value == getattr(shell_task_params, name)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, attr, expect):
+ """Test task shell task property."""
+ task = Shell("test-shell-task-params", **attr)
+ assert expect == task.task_params
-def test_shell_to_dict():
- """Test task shell function to_dict."""
+def test_shell_get_define():
+ """Test task shell function get_define."""
code = 123
version = 1
- name = "test_shell_to_dict"
+ name = "test_shell_get_define"
command = "echo test shell"
expect = {
"code": code,
@@ -79,4 +86,4 @@ def test_shell_to_dict():
return_value=(code, version),
):
shell = Shell(name, command)
- assert shell.to_dict() == expect
+ assert shell.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
index 499b46b..2590100 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sql.py
@@ -82,12 +82,48 @@ def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
+@pytest.mark.parametrize(
+ "attr, expect",
+ [
+ (
+ {"datasource_name": "datasource_name", "sql": "select 1"},
+ {
+ "sql": "select 1",
+ "type": "MYSQL",
+ "datasource": 1,
+ "sqlType": SqlType.SELECT,
+ "preStatements": [],
+ "postStatements": [],
+ "displayRows": 10,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
+ return_value=({"id": 1, "type": "MYSQL"}),
+)
+def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
+ """Test task sql task property."""
+ task = Sql("test-sql-task-params", **attr)
+ assert expect == task.task_params
+
+
@patch(
"pydolphinscheduler.tasks.sql.Sql.get_datasource_info",
return_value=({"id": 1, "type": "MYSQL"}),
)
-def test_sql_to_dict(mock_datasource):
- """Test task sql function to_dict."""
+def test_sql_get_define(mock_datasource):
+ """Test task sql function get_define."""
code = 123
version = 1
name = "test_sql_dict"
@@ -128,4 +164,4 @@ def test_sql_to_dict(mock_datasource):
return_value=(code, version),
):
task = Sql(name, datasource_name, command)
- assert task.to_dict() == expect
+ assert task.get_define() == expect
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
index 4a5388a..7f471a1 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py
@@ -23,7 +23,7 @@ from unittest.mock import patch
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
-from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams
+from pydolphinscheduler.tasks.sub_process import SubProcess
TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition"
TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320"
@@ -31,22 +31,39 @@ TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
@pytest.mark.parametrize(
- "name, value",
+ "attr, expect",
[
- ("local_params", "local_params"),
- ("resource_list", "resource_list"),
- ("dependence", "dependence"),
- ("wait_start_timeout", "wait_start_timeout"),
- ("condition_result", "condition_result"),
+ (
+ {"process_definition_name": TEST_SUB_PROCESS_DEFINITION_NAME},
+ {
+ "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
+ "localParams": [],
+ "resourceList": [],
+ "dependence": {},
+ "waitStartTimeout": {},
+ "conditionResult": {"successNode": [""], "failedNode": [""]},
+ },
+ )
],
)
-def test_sub_process_task_params_attr_setter(name, value):
- """Test sub_process task parameters."""
- process_definition_code = "3643589832320"
- sub_process_task_params = SubProcessTaskParams(process_definition_code)
- assert process_definition_code == sub_process_task_params.process_definition_code
- setattr(sub_process_task_params, name, value)
- assert value == getattr(sub_process_task_params, name)
+@patch(
+ "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
+ return_value=(
+ {
+ "id": 1,
+ "name": TEST_SUB_PROCESS_DEFINITION_NAME,
+ "code": TEST_SUB_PROCESS_DEFINITION_CODE,
+ }
+ ),
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+def test_property_task_params(mock_code_version, mock_pd_info, attr, expect):
+ """Test task sub process property."""
+ task = SubProcess("test-sub-process-task-params", **attr)
+ assert expect == task.task_params
@patch(
@@ -59,11 +76,11 @@ def test_sub_process_task_params_attr_setter(name, value):
}
),
)
-def test_sub_process_to_dict(mock_process_definition):
- """Test task sub_process function to_dict."""
+def test_sub_process_get_define(mock_process_definition):
+ """Test task sub_process function get_define."""
code = 123
version = 1
- name = "test_sub_process_to_dict"
+ name = "test_sub_process_get_define"
expect = {
"code": code,
"name": name,
@@ -94,4 +111,4 @@ def test_sub_process_to_dict(mock_process_definition):
):
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME):
sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME)
- assert sub_process.to_dict() == expect
+ assert sub_process.get_define() == expect