You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/04/27 14:02:21 UTC
[dolphinscheduler] branch dev updated: [improve][python] Specify warning group when deploy process-definition (#9773)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 828034f8b9 [improve][python] Specify warning group when deploy process-definition (#9773)
828034f8b9 is described below
commit 828034f8b9760ed51cd5657e31b4e4f462c643c0
Author: 陈家名 <13...@163.com>
AuthorDate: Wed Apr 27 22:02:14 2022 +0800
[improve][python] Specify warning group when deploy process-definition (#9773)
---
.../dolphinscheduler/api/python/PythonGateway.java | 28 ++++++++++++++--------
.../pydolphinscheduler/docs/source/config.rst | 2 ++
.../src/pydolphinscheduler/core/configuration.py | 3 +++
.../pydolphinscheduler/core/default_config.yaml | 1 +
.../pydolphinscheduler/core/process_definition.py | 16 +++++++++++++
.../tests/core/test_configuration.py | 6 +++++
.../tests/core/test_process_definition.py | 21 ++++++++++++++++
.../tests/utils/test_yaml_parser.py | 1 +
8 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 8aea47471b..4fb74f1e66 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -83,8 +83,6 @@ import py4j.GatewayServer;
public class PythonGateway {
private static final Logger logger = LoggerFactory.getLogger(PythonGateway.class);
- private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
- private static final int DEFAULT_WARNING_GROUP_ID = 0;
private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
@@ -200,6 +198,8 @@ public class PythonGateway {
* @param globalParams global params
* @param schedule schedule for process definition; no schedule is set if null,
* and any existing schedule is always refreshed if not null
+ * @param warningType warning type
+ * @param warningGroupId warning group id
* @param locations locations json object about all tasks
* @param timeout timeout for the running process definition; if the running time exceeds the timeout,
* the task will be marked as failed
@@ -215,6 +215,8 @@ public class PythonGateway {
String description,
String globalParams,
String schedule,
+ String warningType,
+ int warningGroupId,
String locations,
int timeout,
String workerGroup,
@@ -244,7 +246,7 @@ public class PythonGateway {
// Fresh process definition schedule
if (schedule != null) {
- createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
+ createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
return processDefinitionCode;
@@ -283,25 +285,29 @@ public class PythonGateway {
* @param processDefinitionCode process definition code
* @param schedule schedule expression
* @param workerGroup work group
+ * @param warningType warning type
+ * @param warningGroupId warning group id
*/
private void createOrUpdateSchedule(User user,
long projectCode,
long processDefinitionCode,
String schedule,
- String workerGroup) {
+ String workerGroup,
+ String warningType,
+ int warningGroupId) {
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
// create or update schedule
int scheduleId;
if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
- Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
- DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+ Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
+ warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
- schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
- DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+ schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
+ warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
@@ -311,6 +317,8 @@ public class PythonGateway {
String processDefinitionName,
String cronTime,
String workerGroup,
+ String warningType,
+ int warningGroupId,
Integer timeout
) {
User user = usersService.queryUser(userName);
@@ -328,8 +336,8 @@ public class PythonGateway {
DEFAULT_FAILURE_STRATEGY,
null,
DEFAULT_TASK_DEPEND_TYPE,
- DEFAULT_WARNING_TYPE,
- DEFAULT_WARNING_GROUP_ID,
+ WarningType.valueOf(warningType),
+ warningGroupId,
DEFAULT_RUN_MODE,
DEFAULT_PRIORITY,
workerGroup,
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
index 66c7f08525..b7879dd94e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
@@ -108,6 +108,8 @@ All environment variables as below, and you could modify their value via `Bash <
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow time zone, will use its value when workflow does not specify the attribute ``timezone``. |
++ +------------------------------------+------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
.. note::
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
index 14f2fe9fce..03ac0977e4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
@@ -183,5 +183,8 @@ WORKFLOW_WORKER_GROUP = os.environ.get(
WORKFLOW_TIME_ZONE = os.environ.get(
"PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone")
)
+WORKFLOW_WARNING_TYPE = os.environ.get(
+ "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
+)
# End Common Configuration Setting
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
index 410f64d6d3..e437e55c31 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
@@ -49,3 +49,4 @@ default:
queue: queuePythonGateway
worker_group: default
time_zone: Asia/Shanghai
+ warning_type: NONE
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 7615226de6..649e2ce51a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -80,6 +80,8 @@ class ProcessDefinition(Base):
"_project",
"_tenant",
"worker_group",
+ "warning_type",
+ "warning_group_id",
"timeout",
"release_state",
"param",
@@ -100,6 +102,8 @@ class ProcessDefinition(Base):
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+ warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
+ warning_group_id: Optional[int] = 0,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
param: Optional[Dict] = None,
@@ -113,6 +117,14 @@ class ProcessDefinition(Base):
self._project = project
self._tenant = tenant
self.worker_group = worker_group
+ self.warning_type = warning_type
+ if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
+ raise PyDSParamException(
+ "Parameter `warning_type` with unexpect value `%s`", warning_type
+ )
+ else:
+ self.warning_type = warning_type.strip().upper()
+ self.warning_group_id = warning_group_id
self.timeout = timeout
self.release_state = release_state
self.param = param
@@ -361,6 +373,8 @@ class ProcessDefinition(Base):
str(self.description) if self.description else "",
json.dumps(self.param_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
+ self.warning_type,
+ self.warning_group_id,
json.dumps(self.task_location),
self.timeout,
self.worker_group,
@@ -384,5 +398,7 @@ class ProcessDefinition(Base):
self.name,
"",
self.worker_group,
+ self.warning_type,
+ self.warning_group_id,
24 * 3600,
)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
index 394fd33d92..c7e217a46f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
@@ -187,6 +187,7 @@ def test_get_configs_build_in():
("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"),
("default.workflow.worker_group", "default", "specific"),
("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"),
+ ("default.workflow.warning_type", "NONE", "ALL"),
],
)
def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any):
@@ -222,6 +223,7 @@ def test_single_config_get_set_not_exists_key():
("WORKFLOW_QUEUE", "queuePythonGateway"),
("WORKFLOW_WORKER_GROUP", "default"),
("WORKFLOW_TIME_ZONE", "Asia/Shanghai"),
+ ("WORKFLOW_WARNING_TYPE", "NONE"),
],
)
def test_get_configuration(config_name: str, expect: Any):
@@ -250,6 +252,7 @@ def test_get_configuration(config_name: str, expect: Any):
("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"),
("WORKFLOW_WORKER_GROUP", "default", "custom"),
("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"),
+ ("WORKFLOW_WARNING_TYPE", "NONE", "ALL"),
],
)
def test_get_configuration_env(config_name: str, src: Any, dest: Any):
@@ -262,5 +265,8 @@ def test_get_configuration_env(config_name: str, src: Any, dest: Any):
importlib.reload(configuration)
assert getattr(configuration, config_name) == dest
+ # pop the env var and reload configuration to check that this config falls back to the `src` value
os.environ.pop(env_name, None)
+ importlib.reload(configuration)
+ assert getattr(configuration, config_name) == src
assert env_name not in os.environ
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index e311be2ec1..88028f72c7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -65,6 +65,8 @@ def test_process_definition_key_attr(func):
),
),
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
+ ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
+ ("warning_group_id", 0),
("release_state", ProcessDefinitionReleaseState.ONLINE),
],
)
@@ -85,6 +87,8 @@ def test_process_definition_default_value(name, value):
("schedule", str, "schedule"),
("timezone", str, "timezone"),
("worker_group", str, "worker_group"),
+ ("warning_type", str, "FAILURE"),
+ ("warning_group_id", int, 1),
("timeout", int, 1),
("release_state", str, "OFFLINE"),
("param", dict, {"key": "value"}),
@@ -152,6 +156,21 @@ def test__parse_datetime_not_support_type(val: Any):
pd._parse_datetime(val)
+@pytest.mark.parametrize(
+ "val",
+ [
+ "ALLL",
+ "nonee",
+ ],
+)
+def test_warn_type_not_support_type(val: str):
+ """Test process definition param warning_type not support type error."""
+ with pytest.raises(
+ PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
+ ):
+ ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+
+
@pytest.mark.parametrize(
"param, expect",
[
@@ -234,6 +253,8 @@ def test_process_definition_get_define_without_task():
"project": configuration.WORKFLOW_PROJECT,
"tenant": configuration.WORKFLOW_TENANT,
"workerGroup": configuration.WORKFLOW_WORKER_GROUP,
+ "warningType": configuration.WORKFLOW_WARNING_TYPE,
+ "warningGroupId": 0,
"timeout": 0,
"releaseState": ProcessDefinitionReleaseState.ONLINE,
"param": None,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
index aa6d9eee6c..4b1b05ed33 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
@@ -61,6 +61,7 @@ expects = [
"default.workflow.queue": ("queuePythonGateway", "SmithEdit"),
"default.workflow.worker_group": ("default", "SmithEdit"),
"default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"),
+ "default.workflow.warning_type": ("NONE", "SmithEdit"),
},
]