You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/04/27 14:02:21 UTC

[dolphinscheduler] branch dev updated: [improve][python] Specify warning group when deploy process-definition (#9773)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 828034f8b9 [improve][python] Specify warning group when deploy process-definition (#9773)
828034f8b9 is described below

commit 828034f8b9760ed51cd5657e31b4e4f462c643c0
Author: 陈家名 <13...@163.com>
AuthorDate: Wed Apr 27 22:02:14 2022 +0800

    [improve][python] Specify warning group when deploy process-definition (#9773)
---
 .../dolphinscheduler/api/python/PythonGateway.java | 28 ++++++++++++++--------
 .../pydolphinscheduler/docs/source/config.rst      |  2 ++
 .../src/pydolphinscheduler/core/configuration.py   |  3 +++
 .../pydolphinscheduler/core/default_config.yaml    |  1 +
 .../pydolphinscheduler/core/process_definition.py  | 16 +++++++++++++
 .../tests/core/test_configuration.py               |  6 +++++
 .../tests/core/test_process_definition.py          | 21 ++++++++++++++++
 .../tests/utils/test_yaml_parser.py                |  1 +
 8 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 8aea47471b..4fb74f1e66 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -83,8 +83,6 @@ import py4j.GatewayServer;
 public class PythonGateway {
     private static final Logger logger = LoggerFactory.getLogger(PythonGateway.class);
 
-    private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
-    private static final int DEFAULT_WARNING_GROUP_ID = 0;
     private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE;
     private static final Priority DEFAULT_PRIORITY = Priority.MEDIUM;
     private static final Long DEFAULT_ENVIRONMENT_CODE = -1L;
@@ -200,6 +198,8 @@ public class PythonGateway {
      * @param globalParams global params
      * @param schedule schedule for process definition, will not set schedule if null,
      * and if would always fresh exists schedule if not null
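+     * @param warningType warning type name, parsed with WarningType.valueOf when the schedule is created or updated
+     * @param warningGroupId warning group id forwarded to the schedule when schedule is not null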
      * @param locations locations json object about all tasks
      * @param timeout timeout for process definition working, if running time longer than timeout,
      * task will mark as fail
@@ -215,6 +215,8 @@ public class PythonGateway {
                                                 String description,
                                                 String globalParams,
                                                 String schedule,
+                                                String warningType,
+                                                int warningGroupId,
                                                 String locations,
                                                 int timeout,
                                                 String workerGroup,
@@ -244,7 +246,7 @@ public class PythonGateway {
 
         // Fresh process definition schedule 
         if (schedule != null) {
-            createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup);
+            createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
         }
         processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
         return processDefinitionCode;
@@ -283,25 +285,29 @@ public class PythonGateway {
      * @param processDefinitionCode process definition code
      * @param schedule schedule expression
      * @param workerGroup work group
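+     * @param warningType warning type name, must be the name of a WarningType constant (parsed with WarningType.valueOf)
+     * @param warningGroupId warning group id used when inserting or updating the schedule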
      */
     private void createOrUpdateSchedule(User user,
                                         long projectCode,
                                         long processDefinitionCode,
                                         String schedule,
-                                        String workerGroup) {
+                                        String workerGroup,
+                                        String warningType,
+                                        int warningGroupId) {
         Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
         // create or update schedule
         int scheduleId;
         if (scheduleObj == null) {
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE);
-            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, DEFAULT_WARNING_TYPE,
-                    DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+            Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType),
+                    warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
             scheduleId = (int) result.get("scheduleId");
         } else {
             scheduleId = scheduleObj.getId();
             processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
-            schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, DEFAULT_WARNING_TYPE,
-                    DEFAULT_WARNING_GROUP_ID, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
+            schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
+                    warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
         }
         schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
     }
@@ -311,6 +317,8 @@ public class PythonGateway {
                                     String processDefinitionName,
                                     String cronTime,
                                     String workerGroup,
+                                    String warningType,
+                                    int warningGroupId,
                                     Integer timeout
     ) {
         User user = usersService.queryUser(userName);
@@ -328,8 +336,8 @@ public class PythonGateway {
                 DEFAULT_FAILURE_STRATEGY,
                 null,
                 DEFAULT_TASK_DEPEND_TYPE,
-                DEFAULT_WARNING_TYPE,
-                DEFAULT_WARNING_GROUP_ID,
+                WarningType.valueOf(warningType),
+                warningGroupId,
                 DEFAULT_RUN_MODE,
                 DEFAULT_PRIORITY,
                 workerGroup,
diff --git a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
index 66c7f08525..b7879dd94e 100644
--- a/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
+++ b/dolphinscheduler-python/pydolphinscheduler/docs/source/config.rst
@@ -108,6 +108,8 @@ All environment variables as below, and you could modify their value via `Bash <
 |                  | ``PYDS_WORKFLOW_WORKER_GROUP``     | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
 +                  +------------------------------------+------------------------------------------------------------------------------------------------------------------+
 |                  | ``PYDS_WORKFLOW_TIME_ZONE``        | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``.     |
++                  +------------------------------------+------------------------------------------------------------------------------------------------------------------+
+|                  | ``PYDS_WORKFLOW_WARNING_TYPE``     | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
 +------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------+
 
 .. note::
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
index 14f2fe9fce..03ac0977e4 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/configuration.py
@@ -183,5 +183,8 @@ WORKFLOW_WORKER_GROUP = os.environ.get(
 WORKFLOW_TIME_ZONE = os.environ.get(
     "PYDS_WORKFLOW_TIME_ZONE", configs.get("default.workflow.time_zone")
 )
+WORKFLOW_WARNING_TYPE = os.environ.get(
+    "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
+)
 
 # End Common Configuration Setting
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
index 410f64d6d3..e437e55c31 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/default_config.yaml
@@ -49,3 +49,4 @@ default:
     queue: queuePythonGateway
     worker_group: default
     time_zone: Asia/Shanghai
+    warning_type: NONE
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index 7615226de6..649e2ce51a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -80,6 +80,8 @@ class ProcessDefinition(Base):
         "_project",
         "_tenant",
         "worker_group",
+        "warning_type",
+        "warning_group_id",
         "timeout",
         "release_state",
         "param",
@@ -100,6 +102,8 @@ class ProcessDefinition(Base):
         project: Optional[str] = configuration.WORKFLOW_PROJECT,
         tenant: Optional[str] = configuration.WORKFLOW_TENANT,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
+        warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
+        warning_group_id: Optional[int] = 0,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
         param: Optional[Dict] = None,
@@ -113,6 +117,14 @@ class ProcessDefinition(Base):
         self._project = project
         self._tenant = tenant
         self.worker_group = worker_group
+        self.warning_type = warning_type
+        if warning_type.strip().upper() not in ("FAILURE", "SUCCESS", "ALL", "NONE"):
+            raise PyDSParamException(
+                "Parameter `warning_type` with unexpect value `%s`", warning_type
+            )
+        else:
+            self.warning_type = warning_type.strip().upper()
+        self.warning_group_id = warning_group_id
         self.timeout = timeout
         self.release_state = release_state
         self.param = param
@@ -361,6 +373,8 @@ class ProcessDefinition(Base):
             str(self.description) if self.description else "",
             json.dumps(self.param_json),
             json.dumps(self.schedule_json) if self.schedule_json else None,
+            self.warning_type,
+            self.warning_group_id,
             json.dumps(self.task_location),
             self.timeout,
             self.worker_group,
@@ -384,5 +398,7 @@ class ProcessDefinition(Base):
             self.name,
             "",
             self.worker_group,
+            self.warning_type,
+            self.warning_group_id,
             24 * 3600,
         )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
index 394fd33d92..c7e217a46f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_configuration.py
@@ -187,6 +187,7 @@ def test_get_configs_build_in():
         ("default.workflow.queue", "queuePythonGateway", "editQueuePythonGateway"),
         ("default.workflow.worker_group", "default", "specific"),
         ("default.workflow.time_zone", "Asia/Shanghai", "Asia/Beijing"),
+        ("default.workflow.warning_type", "NONE", "ALL"),
     ],
 )
 def test_single_config_get_set(teardown_file_env, key: str, val: Any, new_val: Any):
@@ -222,6 +223,7 @@ def test_single_config_get_set_not_exists_key():
         ("WORKFLOW_QUEUE", "queuePythonGateway"),
         ("WORKFLOW_WORKER_GROUP", "default"),
         ("WORKFLOW_TIME_ZONE", "Asia/Shanghai"),
+        ("WORKFLOW_WARNING_TYPE", "NONE"),
     ],
 )
 def test_get_configuration(config_name: str, expect: Any):
@@ -250,6 +252,7 @@ def test_get_configuration(config_name: str, expect: Any):
         ("WORKFLOW_QUEUE", "queuePythonGateway", "envQueuePythonGateway"),
         ("WORKFLOW_WORKER_GROUP", "default", "custom"),
         ("WORKFLOW_TIME_ZONE", "Asia/Shanghai", "America/Los_Angeles"),
+        ("WORKFLOW_WARNING_TYPE", "NONE", "ALL"),
     ],
 )
 def test_get_configuration_env(config_name: str, src: Any, dest: Any):
@@ -262,5 +265,8 @@ def test_get_configuration_env(config_name: str, src: Any, dest: Any):
     importlib.reload(configuration)
     assert getattr(configuration, config_name) == dest
 
+    # pop the environment variable and reload configuration to check that the value falls back to `src`
     os.environ.pop(env_name, None)
+    importlib.reload(configuration)
+    assert getattr(configuration, config_name) == src
     assert env_name not in os.environ
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index e311be2ec1..88028f72c7 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -65,6 +65,8 @@ def test_process_definition_key_attr(func):
             ),
         ),
         ("worker_group", configuration.WORKFLOW_WORKER_GROUP),
+        ("warning_type", configuration.WORKFLOW_WARNING_TYPE),
+        ("warning_group_id", 0),
         ("release_state", ProcessDefinitionReleaseState.ONLINE),
     ],
 )
@@ -85,6 +87,8 @@ def test_process_definition_default_value(name, value):
         ("schedule", str, "schedule"),
         ("timezone", str, "timezone"),
         ("worker_group", str, "worker_group"),
+        ("warning_type", str, "FAILURE"),
+        ("warning_group_id", int, 1),
         ("timeout", int, 1),
         ("release_state", str, "OFFLINE"),
         ("param", dict, {"key": "value"}),
@@ -152,6 +156,21 @@ def test__parse_datetime_not_support_type(val: Any):
             pd._parse_datetime(val)
 
 
+@pytest.mark.parametrize(
+    "val",
+    [
+        "ALLL",
+        "nonee",
+    ],
+)
+def test_warn_type_not_support_type(val: str):
+    """Test process definition param warning_type not support type error."""
+    with pytest.raises(
+        PyDSParamException, match="Parameter `warning_type` with unexpect value.*?"
+    ):
+        ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val)
+
+
 @pytest.mark.parametrize(
     "param, expect",
     [
@@ -234,6 +253,8 @@ def test_process_definition_get_define_without_task():
         "project": configuration.WORKFLOW_PROJECT,
         "tenant": configuration.WORKFLOW_TENANT,
         "workerGroup": configuration.WORKFLOW_WORKER_GROUP,
+        "warningType": configuration.WORKFLOW_WARNING_TYPE,
+        "warningGroupId": 0,
         "timeout": 0,
         "releaseState": ProcessDefinitionReleaseState.ONLINE,
         "param": None,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
index aa6d9eee6c..4b1b05ed33 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_yaml_parser.py
@@ -61,6 +61,7 @@ expects = [
         "default.workflow.queue": ("queuePythonGateway", "SmithEdit"),
         "default.workflow.worker_group": ("default", "SmithEdit"),
         "default.workflow.time_zone": ("Asia/Shanghai", "SmithEdit"),
+        "default.workflow.warning_type": ("NONE", "SmithEdit"),
     },
 ]