You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/09/29 07:34:49 UTC

[dolphinscheduler] branch dev updated: [fix-#11753] send alert error alert data id (#11774)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e27c79974d [fix-#11753] send alert error alert data id (#11774)
e27c79974d is described below

commit e27c79974d93d0d4181a0f5dbe45159084e224d3
Author: fuchanghai <33...@users.noreply.github.com>
AuthorDate: Thu Sep 29 15:34:40 2022 +0800

    [fix-#11753] send alert error alert data id (#11774)
    
    * [fix-#11753] send alert error alert data id
    
    Co-authored-by: fuchanghai <ch...@marketingforce.com>
---
 .../api/controller/ExecutorController.java              |  4 ++--
 .../dolphinscheduler/api/python/PythonGateway.java      |  2 +-
 .../dolphinscheduler/api/service/ExecutorService.java   |  2 +-
 .../api/service/impl/ExecutorServiceImpl.java           |  4 ++--
 .../api/controller/ExecutorControllerTest.java          |  6 +++---
 .../api/service/ExecutorServiceTest.java                | 14 +++++++-------
 .../java/org/apache/dolphinscheduler/dao/AlertDao.java  | 17 ++++++++++++++++-
 .../dolphinscheduler/dao/entity/ProcessDefinition.java  |  3 +--
 .../server/master/runner/WorkflowExecuteRunnable.java   |  4 +---
 .../service/alert/ProcessAlertManager.java              |  8 --------
 10 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index c120fb0fc1..98e69ab1f4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -139,7 +139,7 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
                                        @RequestParam(value = "execType", required = false) CommandType execType,
                                        @RequestParam(value = "warningType") WarningType warningType,
-                                       @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
+                                       @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
                                        @RequestParam(value = "runMode", required = false) RunMode runMode,
                                        @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                        @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@@ -226,7 +226,7 @@ public class ExecutorController extends BaseController {
                                             @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
                                             @RequestParam(value = "execType", required = false) CommandType execType,
                                             @RequestParam(value = "warningType") WarningType warningType,
-                                            @RequestParam(value = "warningGroupId", required = false) int warningGroupId,
+                                            @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
                                             @RequestParam(value = "runMode", required = false) RunMode runMode,
                                             @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                             @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 93b3c91248..89fcc5fcf3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -333,7 +333,7 @@ public class PythonGateway {
                                     String cronTime,
                                     String workerGroup,
                                     String warningType,
-                                    int warningGroupId,
+                                    Integer warningGroupId,
                                     Integer timeout
     ) {
         User user = usersService.queryUser(userName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index df44dc1445..9aaabb36b6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -61,7 +61,7 @@ public interface ExecutorService {
     Map<String, Object> execProcessInstance(User loginUser, long projectCode,
                                             long processDefinitionCode, String cronTime, CommandType commandType,
                                             FailureStrategy failureStrategy, String startNodeList,
-                                            TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+                                            TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
                                             RunMode runMode,
                                             Priority processInstancePriority, String workerGroup, Long environmentCode,
                                             Integer timeout,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index a5bfe2780a..41ac529318 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -173,7 +173,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
                                                    String cronTime, CommandType commandType,
                                                    FailureStrategy failureStrategy, String startNodeList,
                                                    TaskDependType taskDependType, WarningType warningType,
-                                                   int warningGroupId, RunMode runMode,
+                                                   Integer warningGroupId, RunMode runMode,
                                                    Priority processInstancePriority, String workerGroup,
                                                    Long environmentCode, Integer timeout,
                                                    Map<String, String> startParams, Integer expectedParallelismNumber,
@@ -714,7 +714,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
      */
     private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep,
                               FailureStrategy failureStrategy, String startNodeList, String schedule,
-                              WarningType warningType, int executorId, int warningGroupId, RunMode runMode,
+                              WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode,
                               Priority processInstancePriority, String workerGroup, Long environmentCode,
                               Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
                               int testFlag, ComplementDependentMode complementDependentMode) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
index f55aa9f8be..fcfa068ae4 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
@@ -209,9 +209,9 @@ public class ExecutorControllerTest extends AbstractControllerTest {
 
         when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
 				eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
-                eq(0), eq(null), eq(null), eq("default"), eq(-1L),
-                eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
-                eq(complementDependentMode))).thenReturn(executeServiceResult);
+        eq(null), eq(null), eq(null), eq("default"), eq(-1L),
+        eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
+        eq(complementDependentMode))).thenReturn(executeServiceResult);
 
 		//When
         final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode)
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index e50fb31ce9..8a8fd243a1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -220,7 +220,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.START_PROCESS,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -242,7 +242,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.START_PROCESS,
                 null, "n1,n2",
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -308,7 +308,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}",
                 CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -329,7 +329,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -350,7 +350,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -372,7 +372,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -390,7 +390,7 @@ public class ExecutorServiceTest {
                 "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
                 CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 169aa7aab9..dfdf4521a6 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao;
 
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.dolphinscheduler.common.enums.AlertEvent;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
 import org.apache.dolphinscheduler.common.enums.AlertType;
@@ -48,6 +49,8 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -60,6 +63,11 @@ import com.google.common.collect.Lists;
 @Component
 public class AlertDao {
 
+    /**
+     * logger of AlertDao
+     */
+    private static final Logger logger = LoggerFactory.getLogger(AlertDao.class);
+  
     private static final int QUERY_ALERT_THRESHOLD = 100;
 
     @Value("${alert.alarm-suppression.crash:60}")
@@ -84,9 +92,16 @@ public class AlertDao {
      * @return add alert result
      */
     public int addAlert(Alert alert) {
+        if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) {
+            logger.warn("the value of alertGroupId is null or 0 ");
+            return 0;
+        }
+
         String sign = generateSign(alert);
         alert.setSign(sign);
-        return alertMapper.insert(alert);
+        int count = alertMapper.insert(alert);
+        logger.info("add alert to db , alert: {}", alert);
+        return count;
     }
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index 6e7414c286..27df7bd573 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -169,7 +169,7 @@ public class ProcessDefinition {
      * warningGroupId
      */
     @TableField(exist = false)
-    private int warningGroupId;
+    private Integer warningGroupId;
 
     /**
      * execution type
@@ -226,5 +226,4 @@ public class ProcessDefinition {
 
         return globalParamMap;
     }
-
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 3d448f2ebf..7716314444 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -716,9 +716,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
             checkSerialProcess(processDefinition);
         }
         ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        if (processAlertManager.isNeedToSendWarning(processInstance)) {
-            processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
-        }
+        processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
         if (processInstance.getState().isSuccess()) {
             processAlertManager.closeAlert(processInstance);
         }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index ab8b162c08..2604354fbc 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -200,7 +200,6 @@ public class ProcessAlertManager {
                     processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
             alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
             alertDao.addAlert(alert);
-            logger.info("add alert to db , alert : {}", alert);
 
         } catch (Exception e) {
             logger.error("send alert failed:{} ", e.getMessage());
@@ -217,13 +216,10 @@ public class ProcessAlertManager {
     public void sendAlertProcessInstance(ProcessInstance processInstance,
                                          List<TaskInstance> taskInstances,
                                          ProjectUser projectUser) {
-
         if (!isNeedToSendWarning(processInstance)) {
             return;
         }
-
         Alert alert = new Alert();
-
         String cmdName = getCommandCnName(processInstance.getCommandType());
         String success = processInstance.getState().isSuccess() ? "success" : "failed";
         alert.setTitle(cmdName + " " + success);
@@ -238,7 +234,6 @@ public class ProcessAlertManager {
         alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS
                 : AlertType.PROCESS_INSTANCE_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -325,7 +320,6 @@ public class ProcessAlertManager {
         alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS
                 : AlertType.PROCESS_INSTANCE_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -342,7 +336,6 @@ public class ProcessAlertManager {
         alert.setProcessInstanceId(processInstance.getId());
         alert.setAlertType(AlertType.TASK_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -440,6 +433,5 @@ public class ProcessAlertManager {
         alert.setProcessInstanceId(processInstance.getId());
         alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED);
         alertDao.addAlert(alert);
-        logger.info("add alert to db, alert: {}", alert);
     }
 }