You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ch...@apache.org on 2022/11/07 06:58:38 UTC

[dolphinscheduler] branch 3.0.2-prepare updated: [fix-#11753] send alert error alert data id (#11774) (#12762)

This is an automated email from the ASF dual-hosted git repository.

chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
     new ffb8b6ef70 [fix-#11753] send alert error alert data id (#11774) (#12762)
ffb8b6ef70 is described below

commit ffb8b6ef702a47ec4d7708ada9f830ceeaa715f3
Author: Eric Gao <er...@gmail.com>
AuthorDate: Mon Nov 7 14:58:31 2022 +0800

    [fix-#11753] send alert error alert data id (#11774) (#12762)
    
    Co-authored-by: fuchanghai <ch...@marketingforce.com>
---
 .../api/controller/ExecutorController.java            |  4 ++--
 .../dolphinscheduler/api/python/PythonGateway.java    |  2 +-
 .../dolphinscheduler/api/service/ExecutorService.java |  2 +-
 .../api/service/impl/ExecutorServiceImpl.java         |  4 ++--
 .../api/controller/ExecutorControllerTest.java        |  2 +-
 .../api/service/ExecutorServiceTest.java              | 14 +++++++-------
 .../org/apache/dolphinscheduler/dao/AlertDao.java     | 19 ++++++++++++++++++-
 .../dao/entity/ProcessDefinition.java                 |  2 +-
 .../server/master/runner/WorkflowExecuteRunnable.java |  8 ++++----
 .../service/alert/ProcessAlertManager.java            |  8 --------
 10 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 6532dbda1c..ec68b4b7f7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -132,7 +132,7 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
                                        @RequestParam(value = "execType", required = false) CommandType execType,
                                        @RequestParam(value = "warningType") WarningType warningType,
-                                       @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
+                                       @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
                                        @RequestParam(value = "runMode", required = false) RunMode runMode,
                                        @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                        @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@@ -216,7 +216,7 @@ public class ExecutorController extends BaseController {
                                             @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
                                             @RequestParam(value = "execType", required = false) CommandType execType,
                                             @RequestParam(value = "warningType") WarningType warningType,
-                                            @RequestParam(value = "warningGroupId", required = false) int warningGroupId,
+                                            @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
                                             @RequestParam(value = "runMode", required = false) RunMode runMode,
                                             @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                             @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index b25feb13e4..a651dddf63 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -325,7 +325,7 @@ public class PythonGateway {
                                     String cronTime,
                                     String workerGroup,
                                     String warningType,
-                                    int warningGroupId,
+                                    Integer warningGroupId,
                                     Integer timeout
     ) {
         User user = usersService.queryUser(userName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index b5cf04bd5e..acab76f936 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -61,7 +61,7 @@ public interface ExecutorService {
     Map<String, Object> execProcessInstance(User loginUser, long projectCode,
                                             long processDefinitionCode, String cronTime, CommandType commandType,
                                             FailureStrategy failureStrategy, String startNodeList,
-                                            TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+                                            TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
                                             RunMode runMode,
                                             Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
                                             Map<String, String> startParams, Integer expectedParallelismNumber,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 802d3990e0..71a5874404 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     public Map<String, Object> execProcessInstance(User loginUser, long projectCode,
                                                    long processDefinitionCode, String cronTime, CommandType commandType,
                                                    FailureStrategy failureStrategy, String startNodeList,
-                                                   TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+                                                   TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
                                                    RunMode runMode,
                                                    Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout,
                                                    Map<String, String> startParams, Integer expectedParallelismNumber,
@@ -613,7 +613,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
     private int createCommand(CommandType commandType, long processDefineCode,
                               TaskDependType nodeDep, FailureStrategy failureStrategy,
                               String startNodeList, String schedule, WarningType warningType,
-                              int executorId, int warningGroupId,
+                              int executorId, Integer warningGroupId,
                               RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,
                               Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
index 7faed0bde0..710244eb13 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
@@ -206,7 +206,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
 
         when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
 				eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
-                eq(0), eq(null), eq(null), eq("default"), eq(-1L),
+                eq(null), eq(null), eq(null), eq("default"), eq(-1L),
                 eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
                 eq(complementDependentMode))).thenReturn(executeServiceResult);
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9c0c1c3231..0e2c2ba5f1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -197,7 +197,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.START_PROCESS,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -216,7 +216,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.START_PROCESS,
                 null, "n1,n2",
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -235,7 +235,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -253,7 +253,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_SERIAL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -271,7 +271,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -290,7 +290,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
@@ -306,7 +306,7 @@ public class ExecutorServiceTest {
         Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
                 processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
                 null, null,
-                null, null, 0,
+                null, null, null,
                 RunMode.RUN_MODE_PARALLEL,
                 Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
                 ComplementDependentMode.OFF_MODE);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index a2d2aacdf8..1b0f12acde 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.dao;
 
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.dolphinscheduler.common.enums.AlertEvent;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
 import org.apache.dolphinscheduler.common.enums.AlertType;
@@ -47,6 +48,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -58,6 +61,13 @@ import com.google.common.collect.Lists;
 @Component
 public class AlertDao {
 
+    /**
+     * logger of AlertDao
+     */
+    private static final Logger logger = LoggerFactory.getLogger(AlertDao.class);
+  
+    private static final int QUERY_ALERT_THRESHOLD = 100;
+
     @Value("${alert.alarm-suppression.crash:60}")
     private Integer crashAlarmSuppression;
 
@@ -80,9 +90,16 @@ public class AlertDao {
      * @return add alert result
      */
     public int addAlert(Alert alert) {
+        if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) {
+            logger.warn("the value of alertGroupId is null or 0 ");
+            return 0;
+        }
+
         String sign = generateSign(alert);
         alert.setSign(sign);
-        return alertMapper.insert(alert);
+        int count = alertMapper.insert(alert);
+        logger.info("add alert to db , alert: {}", alert);
+        return count;
     }
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index f68ad065b3..fe26746a55 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -165,7 +165,7 @@ public class ProcessDefinition {
      * warningGroupId
      */
     @TableField(exist = false)
-    private int warningGroupId;
+    private Integer warningGroupId;
 
     /**
      * execution type
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index fd4649bd44..f3f84408a6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -697,13 +697,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
             checkSerialProcess(processDefinition);
         }
+
         if (processInstance.getState().typeIsWaitingThread()) {
             processService.createRecoveryWaitingThreadCommand(null, processInstance);
         }
-        if (processAlertManager.isNeedToSendWarning(processInstance)) {
-            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-            processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
-        }
+
+        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+        processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
         if (checkTaskQueue()) {
             //release task group
             processService.releaseAllTaskGroup(processInstance.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 4fca7b47c6..43b38f5b9f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -197,7 +197,6 @@ public class ProcessAlertManager {
             alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
             alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
             alertDao.addAlert(alert);
-            logger.info("add alert to db , alert : {}", alert);
 
         } catch (Exception e) {
             logger.error("send alert failed:{} ", e.getMessage());
@@ -214,13 +213,10 @@ public class ProcessAlertManager {
     public void sendAlertProcessInstance(ProcessInstance processInstance,
                                          List<TaskInstance> taskInstances,
                                          ProjectUser projectUser) {
-
         if (!isNeedToSendWarning(processInstance)) {
             return;
         }
-
         Alert alert = new Alert();
-
         String cmdName = getCommandCnName(processInstance.getCommandType());
         String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
         alert.setTitle(cmdName + " " + success);
@@ -234,7 +230,6 @@ public class ProcessAlertManager {
         alert.setProcessInstanceId(processInstance.getId());
         alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -297,7 +292,6 @@ public class ProcessAlertManager {
         //might need to change to data quality status
         alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -314,7 +308,6 @@ public class ProcessAlertManager {
         alert.setProcessInstanceId(processInstance.getId());
         alert.setAlertType(AlertType.TASK_FAILURE);
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
@@ -411,6 +404,5 @@ public class ProcessAlertManager {
         alert.setProcessInstanceId(processInstance.getId());
         alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED);
         alertDao.addAlert(alert);
-        logger.info("add alert to db, alert: {}",alert);
     }
 }