You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ch...@apache.org on 2022/11/07 06:58:38 UTC
[dolphinscheduler] branch 3.0.2-prepare updated: [fix-#11753] send alert error alert data id (#11774) (#12762)
This is an automated email from the ASF dual-hosted git repository.
chufenggao pushed a commit to branch 3.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.2-prepare by this push:
new ffb8b6ef70 [fix-#11753] send alert error alert data id (#11774) (#12762)
ffb8b6ef70 is described below
commit ffb8b6ef702a47ec4d7708ada9f830ceeaa715f3
Author: Eric Gao <er...@gmail.com>
AuthorDate: Mon Nov 7 14:58:31 2022 +0800
[fix-#11753] send alert error alert data id (#11774) (#12762)
Co-authored-by: fuchanghai <ch...@marketingforce.com>
---
.../api/controller/ExecutorController.java | 4 ++--
.../dolphinscheduler/api/python/PythonGateway.java | 2 +-
.../dolphinscheduler/api/service/ExecutorService.java | 2 +-
.../api/service/impl/ExecutorServiceImpl.java | 4 ++--
.../api/controller/ExecutorControllerTest.java | 2 +-
.../api/service/ExecutorServiceTest.java | 14 +++++++-------
.../org/apache/dolphinscheduler/dao/AlertDao.java | 19 ++++++++++++++++++-
.../dao/entity/ProcessDefinition.java | 2 +-
.../server/master/runner/WorkflowExecuteRunnable.java | 8 ++++----
.../service/alert/ProcessAlertManager.java | 8 --------
10 files changed, 37 insertions(+), 28 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 6532dbda1c..ec68b4b7f7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -132,7 +132,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
- @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
+ @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@@ -216,7 +216,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
- @RequestParam(value = "warningGroupId", required = false) int warningGroupId,
+ @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index b25feb13e4..a651dddf63 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -325,7 +325,7 @@ public class PythonGateway {
String cronTime,
String workerGroup,
String warningType,
- int warningGroupId,
+ Integer warningGroupId,
Integer timeout
) {
User user = usersService.queryUser(userName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index b5cf04bd5e..acab76f936 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -61,7 +61,7 @@ public interface ExecutorService {
Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
- TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+ TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 802d3990e0..71a5874404 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -160,7 +160,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
public Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList,
- TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+ TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
@@ -613,7 +613,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private int createCommand(CommandType commandType, long processDefineCode,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
- int executorId, int warningGroupId,
+ int executorId, Integer warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
index 7faed0bde0..710244eb13 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
@@ -206,7 +206,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
- eq(0), eq(null), eq(null), eq("default"), eq(-1L),
+ eq(null), eq(null), eq(null), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
eq(complementDependentMode))).thenReturn(executeServiceResult);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 9c0c1c3231..0e2c2ba5f1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -197,7 +197,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -216,7 +216,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.START_PROCESS,
null, "n1,n2",
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -235,7 +235,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -253,7 +253,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -271,7 +271,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -290,7 +290,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
@@ -306,7 +306,7 @@ public class ExecutorServiceTest {
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
- null, null, 0,
+ null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index a2d2aacdf8..1b0f12acde 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
@@ -47,6 +48,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -58,6 +61,13 @@ import com.google.common.collect.Lists;
@Component
public class AlertDao {
+ /**
+ * logger of AlertDao
+ */
+ private static final Logger logger = LoggerFactory.getLogger(AlertDao.class);
+
+ private static final int QUERY_ALERT_THRESHOLD = 100;
+
@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;
@@ -80,9 +90,16 @@ public class AlertDao {
* @return add alert result
*/
public int addAlert(Alert alert) {
+ if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) {
+ logger.warn("the value of alertGroupId is null or 0 ");
+ return 0;
+ }
+
String sign = generateSign(alert);
alert.setSign(sign);
- return alertMapper.insert(alert);
+ int count = alertMapper.insert(alert);
+ logger.info("add alert to db , alert: {}", alert);
+ return count;
}
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index f68ad065b3..fe26746a55 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -165,7 +165,7 @@ public class ProcessDefinition {
* warningGroupId
*/
@TableField(exist = false)
- private int warningGroupId;
+ private Integer warningGroupId;
/**
* execution type
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index fd4649bd44..f3f84408a6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -697,13 +697,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
}
+
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
- if (processAlertManager.isNeedToSendWarning(processInstance)) {
- ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
- }
+
+ ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
+ processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
if (checkTaskQueue()) {
//release task group
processService.releaseAllTaskGroup(processInstance.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 4fca7b47c6..43b38f5b9f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -197,7 +197,6 @@ public class ProcessAlertManager {
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
alertDao.addAlert(alert);
- logger.info("add alert to db , alert : {}", alert);
} catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
@@ -214,13 +213,10 @@ public class ProcessAlertManager {
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances,
ProjectUser projectUser) {
-
if (!isNeedToSendWarning(processInstance)) {
return;
}
-
Alert alert = new Alert();
-
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success);
@@ -234,7 +230,6 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert);
}
/**
@@ -297,7 +292,6 @@ public class ProcessAlertManager {
//might need to change to data quality status
alert.setAlertType(processInstance.getState().typeIsSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert);
}
/**
@@ -314,7 +308,6 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.TASK_FAILURE);
alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert);
}
/**
@@ -411,6 +404,5 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED);
alertDao.addAlert(alert);
- logger.info("add alert to db, alert: {}",alert);
}
}