You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/01 05:04:37 UTC
[dolphinscheduler] branch dev updated: Write alert result into db (#11221)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8e21c38c00 Write alert result into db (#11221)
8e21c38c00 is described below
commit 8e21c38c0041aa2689eb9d9bc703f67292c98e88
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Aug 1 13:04:27 2022 +0800
Write alert result into db (#11221)
---
.../dolphinscheduler/alert/api/AlertChannel.java | 5 ++
.../dolphinscheduler/alert/api/AlertData.java | 5 ++
.../dolphinscheduler/alert/api/AlertResult.java | 5 +-
.../dolphinscheduler/alert/AlertSenderService.java | 72 ++++++++++++++--------
.../dolphinscheduler/common/enums/AlertType.java | 7 ++-
.../org/apache/dolphinscheduler/dao/AlertDao.java | 11 +++-
.../master/runner/WorkflowExecuteRunnable.java | 8 ++-
.../service/alert/ProcessAlertManager.java | 27 +++++++-
8 files changed, 105 insertions(+), 35 deletions(-)
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
index 14dca78f6f..56c45362dd 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
@@ -25,8 +25,13 @@ package org.apache.dolphinscheduler.alert.api;
public interface AlertChannel {
/**
* process and send alert
+ *
* @param info alert info
* @return process alarm result
*/
AlertResult process(AlertInfo info);
+
+ default AlertResult closeAlert(AlertInfo info) {
+ return new AlertResult("true", "no need to close alert");
+ }
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
index 50b6c2cc7a..37a3f3357c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
@@ -61,4 +61,9 @@ public class AlertData {
*/
private int warnType;
+ /**
+ * AlertType#code
+ */
+ private int alertType;
+
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
index 734d51779f..b6c5db38e9 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
@@ -20,24 +20,27 @@
package org.apache.dolphinscheduler.alert.api;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* alert result
*/
+@Builder
@AllArgsConstructor
@Data
@NoArgsConstructor
public class AlertResult {
/**
+ * todo: use enum
* false or true
*/
private String status;
/**
- * alert result message
+ * alert result message, each plugin can have its own message
*/
private String message;
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
index 2f91579322..fe2e1aaf4c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
+import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -43,10 +44,14 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
+import com.google.common.collect.Lists;
+
@Service
public final class AlertSenderService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
@@ -89,26 +94,31 @@ public final class AlertSenderService extends Thread {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
logger.error("send alert msg fail,no bind plugin instance.");
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alertId);
+ List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
+ "no bind plugin instance"));
+ alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
continue;
}
AlertData alertData = AlertData.builder()
- .id(alertId)
- .content(alert.getContent())
- .log(alert.getLog())
- .title(alert.getTitle())
- .warnType(alert.getWarningType().getCode())
- .build();
+ .id(alertId)
+ .content(alert.getContent())
+ .log(alert.getLog())
+ .title(alert.getTitle())
+ .warnType(alert.getWarningType().getCode())
+ .alertType(alert.getAlertType().getCode())
+ .build();
int sendSuccessCount = 0;
+ List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
- alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alertId,instance.getId());
+ alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
sendSuccessCount++;
}
+ alertResults.add(alertResult);
}
}
AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
@@ -117,7 +127,7 @@ public final class AlertSenderService extends Thread {
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
- alertDao.updateAlert(alertStatus, "", alertId);
+ alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
}
}
@@ -170,17 +180,18 @@ public final class AlertSenderService extends Thread {
* @param alertData alertData
* @return AlertResult
*/
- private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
- Optional<AlertChannel> alertChannel = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
- AlertResult alertResultExtend = new AlertResult();
+ private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
String pluginInstanceName = instance.getInstanceName();
- if (!alertChannel.isPresent()) {
- String message = String.format("Alert Plugin %s send error : return value is null", pluginInstanceName);
- alertResultExtend.setStatus(String.valueOf(false));
- alertResultExtend.setMessage(message);
- logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, instance.getPluginDefineId());
- return alertResultExtend;
+ int pluginDefineId = instance.getPluginDefineId();
+ Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
+ if (!alertChannelOptional.isPresent()) {
+ String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
+ pluginInstanceName,
+ pluginDefineId);
+ logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
+ return new AlertResult("false", message);
}
+ AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();
@@ -193,10 +204,8 @@ public final class AlertSenderService extends Thread {
if (warningType == null) {
String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
- alertResultExtend.setStatus(String.valueOf(false));
- alertResultExtend.setMessage(message);
logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
- return alertResultExtend;
+ return new AlertResult("false", message);
}
boolean sendWarning = false;
@@ -231,10 +240,18 @@ public final class AlertSenderService extends Thread {
AlertResult alertResult;
try {
if (waitTimeout <= 0) {
- alertResult = alertChannel.get().process(alertInfo);
+ if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
+ alertResult = alertChannel.closeAlert(alertInfo);
+ } else {
+ alertResult = alertChannel.process(alertInfo);
+ }
} else {
- CompletableFuture<AlertResult> future =
- CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo));
+ CompletableFuture<AlertResult> future;
+ if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
+ future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
+ } else {
+ future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
+ }
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
@@ -246,18 +263,19 @@ public final class AlertSenderService extends Thread {
logger.error("send alert error alert data id :{},", alertData.getId(), e);
}
+ AlertResult alertResultExtend = new AlertResult();
if (alertResult == null) {
String message = String.format("Alert Plugin %s send error : return alertResult value is null", pluginInstanceName);
- alertResultExtend.setStatus(String.valueOf(false));
+ alertResultExtend.setStatus("false");
alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName);
} else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
- alertResultExtend.setStatus(String.valueOf(false));
+ alertResultExtend.setStatus("false");
alertResultExtend.setMessage(alertResult.getMessage());
logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
} else {
String message = String.format("Alert Plugin %s send success", pluginInstanceName);
- alertResultExtend.setStatus(String.valueOf(true));
+ alertResultExtend.setStatus("true");
alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send success", pluginInstanceName);
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
index 8b97dbdf93..d5713a775a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
@@ -25,7 +25,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum AlertType {
/**
* 0 process instance failure, 1 process instance success, 2 process instance blocked, 3 process instance timeout, 4 fault tolerance warning,
- * 5 task failure, 6 task success, 7 task timeout
+ * 5 task failure, 6 task success, 7 task timeout, 8 close alert
*/
PROCESS_INSTANCE_FAILURE(0, "process instance failure"),
PROCESS_INSTANCE_SUCCESS(1, "process instance success"),
@@ -34,7 +34,10 @@ public enum AlertType {
FAULT_TOLERANCE_WARNING(4, "fault tolerance warning"),
TASK_FAILURE(5, "task failure"),
TASK_SUCCESS(6, "task success"),
- TASK_TIMEOUT(7, "task timeout"),;
+ TASK_TIMEOUT(7, "task timeout"),
+
+ CLOSE_ALERT(8, "the process instance success, can close the before alert")
+ ;
AlertType(int code, String descp) {
this.code = code;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 76fb17e143..018cea4827 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
import org.apache.commons.codec.digest.DigestUtils;
+
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -89,7 +90,7 @@ public class AlertDao {
* update alert sending(execution) status
*
* @param alertStatus alertStatus
- * @param log log
+ * @param log alert results json
* @param id id
* @return update alert result
*/
@@ -253,7 +254,13 @@ public class AlertDao {
*/
public List<Alert> listPendingAlerts() {
LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
- .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
+ .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
+ return alertMapper.selectList(wrapper);
+ }
+
+ public List<Alert> listAlerts(int processInstanceId) {
+ LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
+ .eq(Alert::getProcessInstanceId, processInstanceId);
return alertMapper.selectList(wrapper);
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 27f90d3d00..f2965f79f5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -692,16 +692,20 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
public void endProcess() {
this.stateEvents.clear();
- if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
+ if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
+ .typeIsSerialPriority()) {
checkSerialProcess(processDefinition);
}
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
+ ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
if (processAlertManager.isNeedToSendWarning(processInstance)) {
- ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
+ if (processInstance.getState().typeIsSuccess()) {
+ processAlertManager.closeAlert(processInstance);
+ }
if (checkTaskQueue()) {
//release task group
processService.releaseAllTaskGroup(processInstance.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 4fca7b47c6..9296aebe27 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskAlertContent;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
+import org.apache.commons.collections4.CollectionUtils;
+
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -270,11 +272,34 @@ public class ProcessAlertManager {
return sendWarning;
}
+ /**
+ * Send a close alert event, if the processInstance has sent alert before, then will insert a closed event.
+ *
+ * @param processInstance success process instance
+ */
+ public void closeAlert(ProcessInstance processInstance) {
+ List<Alert> alerts = alertDao.listAlerts(processInstance.getId());
+ if (CollectionUtils.isEmpty(alerts)) {
+ // no need to close alert
+ return;
+ }
+
+ Alert alert = new Alert();
+ alert.setAlertGroupId(processInstance.getWarningGroupId());
+ alert.setUpdateTime(new Date());
+ alert.setCreateTime(new Date());
+ alert.setProjectCode(processInstance.getProcessDefinition().getProjectCode());
+ alert.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+ alert.setProcessInstanceId(processInstance.getId());
+ alert.setAlertType(AlertType.CLOSE_ALERT);
+ alertDao.addAlert(alert);
+ }
+
/**
* send process timeout alert
*
* @param processInstance process instance
- * @param projectUser projectUser
+ * @param projectUser projectUser
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
alertDao.sendProcessTimeoutAlert(processInstance, projectUser);