You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/08/01 05:04:37 UTC

[dolphinscheduler] branch dev updated: Write alert result into db (#11221)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8e21c38c00 Write alert result into db (#11221)
8e21c38c00 is described below

commit 8e21c38c0041aa2689eb9d9bc703f67292c98e88
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Aug 1 13:04:27 2022 +0800

    Write alert result into db (#11221)
---
 .../dolphinscheduler/alert/api/AlertChannel.java   |  5 ++
 .../dolphinscheduler/alert/api/AlertData.java      |  5 ++
 .../dolphinscheduler/alert/api/AlertResult.java    |  5 +-
 .../dolphinscheduler/alert/AlertSenderService.java | 72 ++++++++++++++--------
 .../dolphinscheduler/common/enums/AlertType.java   |  7 ++-
 .../org/apache/dolphinscheduler/dao/AlertDao.java  | 11 +++-
 .../master/runner/WorkflowExecuteRunnable.java     |  8 ++-
 .../service/alert/ProcessAlertManager.java         | 27 +++++++-
 8 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
index 14dca78f6f..56c45362dd 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannel.java
@@ -25,8 +25,13 @@ package org.apache.dolphinscheduler.alert.api;
 public interface AlertChannel {
     /**
      * process and send alert
+     *
      * @param info alert info
      * @return process alarm result
      */
     AlertResult process(AlertInfo info);
+
+    default AlertResult closeAlert(AlertInfo info) {
+        return new AlertResult("true", "no need to close alert");
+    }
 }
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
index 50b6c2cc7a..37a3f3357c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
@@ -61,4 +61,9 @@ public class AlertData {
      */
     private int warnType;
 
+    /**
+     * AlertType#code
+     */
+    private int alertType;
+
 }
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
index 734d51779f..b6c5db38e9 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertResult.java
@@ -20,24 +20,27 @@
 package org.apache.dolphinscheduler.alert.api;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 /**
  * alert result
  */
+@Builder
 @AllArgsConstructor
 @Data
 @NoArgsConstructor
 public class AlertResult {
 
     /**
+     * todo: use enum
      * false or true
      */
     private String status;
 
     /**
-     * alert result message
+     * alert result message, each plugin can have its own message
      */
     private String message;
 
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
index 2f91579322..fe2e1aaf4c 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
@@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertInfo;
 import org.apache.dolphinscheduler.alert.api.AlertResult;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
+import org.apache.dolphinscheduler.common.enums.AlertType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -43,10 +44,14 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
+import com.google.common.collect.Lists;
+
 @Service
 public final class AlertSenderService extends Thread {
     private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
@@ -89,26 +94,31 @@ public final class AlertSenderService extends Thread {
             List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
             if (CollectionUtils.isEmpty(alertInstanceList)) {
                 logger.error("send alert msg fail,no bind plugin instance.");
-                alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alertId);
+                List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
+                                                                                    "no bind plugin instance"));
+                alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
                 continue;
             }
             AlertData alertData = AlertData.builder()
-                    .id(alertId)
-                    .content(alert.getContent())
-                    .log(alert.getLog())
-                    .title(alert.getTitle())
-                    .warnType(alert.getWarningType().getCode())
-                    .build();
+                .id(alertId)
+                .content(alert.getContent())
+                .log(alert.getLog())
+                .title(alert.getTitle())
+                .warnType(alert.getWarningType().getCode())
+                .alertType(alert.getAlertType().getCode())
+                .build();
 
             int sendSuccessCount = 0;
+            List<AlertResult> alertResults = new ArrayList<>();
             for (AlertPluginInstance instance : alertInstanceList) {
                 AlertResult alertResult = this.alertResultHandler(instance, alertData);
                 if (alertResult != null) {
                     AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
-                    alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alertId,instance.getId());
+                    alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId, instance.getId());
                     if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
                         sendSuccessCount++;
                     }
+                    alertResults.add(alertResult);
                 }
             }
             AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
@@ -117,7 +127,7 @@ public final class AlertSenderService extends Thread {
             } else if (sendSuccessCount < alertInstanceList.size()) {
                 alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
             }
-            alertDao.updateAlert(alertStatus, "", alertId);
+            alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
         }
     }
 
@@ -170,17 +180,18 @@ public final class AlertSenderService extends Thread {
      * @param alertData alertData
      * @return AlertResult
      */
-    private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
-        Optional<AlertChannel> alertChannel = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
-        AlertResult alertResultExtend = new AlertResult();
+    private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
         String pluginInstanceName = instance.getInstanceName();
-        if (!alertChannel.isPresent()) {
-            String message = String.format("Alert Plugin %s send error : return value is null", pluginInstanceName);
-            alertResultExtend.setStatus(String.valueOf(false));
-            alertResultExtend.setMessage(message);
-            logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, instance.getPluginDefineId());
-            return alertResultExtend;
+        int pluginDefineId = instance.getPluginDefineId();
+        Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
+        if (!alertChannelOptional.isPresent()) {
+            String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
+                                           pluginInstanceName,
+                                           pluginDefineId);
+            logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
+            return new AlertResult("false", message);
         }
+        AlertChannel alertChannel = alertChannelOptional.get();
 
         Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
         String instanceWarnType = WarningType.ALL.getDescp();
@@ -193,10 +204,8 @@ public final class AlertSenderService extends Thread {
 
         if (warningType == null) {
             String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
-            alertResultExtend.setStatus(String.valueOf(false));
-            alertResultExtend.setMessage(message);
             logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
-            return alertResultExtend;
+            return new AlertResult("false", message);
         }
 
         boolean sendWarning = false;
@@ -231,10 +240,18 @@ public final class AlertSenderService extends Thread {
         AlertResult alertResult;
         try {
             if (waitTimeout <= 0) {
-                alertResult = alertChannel.get().process(alertInfo);
+                if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
+                    alertResult = alertChannel.closeAlert(alertInfo);
+                } else {
+                    alertResult = alertChannel.process(alertInfo);
+                }
             } else {
-                CompletableFuture<AlertResult> future =
-                        CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo));
+                CompletableFuture<AlertResult> future;
+                if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
+                    future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
+                } else {
+                    future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
+                }
                 alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
             }
         } catch (InterruptedException e) {
@@ -246,18 +263,19 @@ public final class AlertSenderService extends Thread {
             logger.error("send alert error alert data id :{},", alertData.getId(), e);
         }
 
+        AlertResult alertResultExtend = new AlertResult();
         if (alertResult == null) {
             String message = String.format("Alert Plugin %s send error : return alertResult value is null", pluginInstanceName);
-            alertResultExtend.setStatus(String.valueOf(false));
+            alertResultExtend.setStatus("false");
             alertResultExtend.setMessage(message);
             logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName);
         } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
-            alertResultExtend.setStatus(String.valueOf(false));
+            alertResultExtend.setStatus("false");
             alertResultExtend.setMessage(alertResult.getMessage());
             logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
         } else {
             String message = String.format("Alert Plugin %s send success", pluginInstanceName);
-            alertResultExtend.setStatus(String.valueOf(true));
+            alertResultExtend.setStatus("true");
             alertResultExtend.setMessage(message);
             logger.info("Alert Plugin {} send success", pluginInstanceName);
         }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
index 8b97dbdf93..d5713a775a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
@@ -25,7 +25,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
 public enum AlertType {
     /**
      * 0 process instance failure, 1 process instance success, 2 process instance blocked, 3 process instance timeout, 4 fault tolerance warning,
-     * 5 task failure, 6 task success, 7 task timeout
+     * 5 task failure, 6 task success, 7 task timeout, 8 close alert
       */
     PROCESS_INSTANCE_FAILURE(0, "process instance failure"),
     PROCESS_INSTANCE_SUCCESS(1, "process instance success"),
@@ -34,7 +34,10 @@ public enum AlertType {
     FAULT_TOLERANCE_WARNING(4, "fault tolerance warning"),
     TASK_FAILURE(5, "task failure"),
     TASK_SUCCESS(6, "task success"),
-    TASK_TIMEOUT(7, "task timeout"),;
+    TASK_TIMEOUT(7, "task timeout"),
+
+    CLOSE_ALERT(8, "the process instance success, can close the before alert")
+    ;
 
     AlertType(int code, String descp) {
         this.code = code;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 76fb17e143..018cea4827 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
 
 import org.apache.commons.codec.digest.DigestUtils;
+
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -89,7 +90,7 @@ public class AlertDao {
      * update alert sending(execution) status
      *
      * @param alertStatus alertStatus
-     * @param log log
+     * @param log alert results json
      * @param id id
      * @return update alert result
      */
@@ -253,7 +254,13 @@ public class AlertDao {
      */
     public List<Alert> listPendingAlerts() {
         LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
-                .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
+            .eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
+        return alertMapper.selectList(wrapper);
+    }
+
+    public List<Alert> listAlerts(int processInstanceId) {
+        LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
+            .eq(Alert::getProcessInstanceId, processInstanceId);
         return alertMapper.selectList(wrapper);
     }
 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 27f90d3d00..f2965f79f5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -692,16 +692,20 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
      */
     public void endProcess() {
         this.stateEvents.clear();
-        if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
+        if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
+            .typeIsSerialPriority()) {
             checkSerialProcess(processDefinition);
         }
         if (processInstance.getState().typeIsWaitingThread()) {
             processService.createRecoveryWaitingThreadCommand(null, processInstance);
         }
+        ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
         if (processAlertManager.isNeedToSendWarning(processInstance)) {
-            ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
             processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
         }
+        if (processInstance.getState().typeIsSuccess()) {
+            processAlertManager.closeAlert(processInstance);
+        }
         if (checkTaskQueue()) {
             //release task group
             processService.releaseAllTaskGroup(processInstance.getId());
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 4fca7b47c6..9296aebe27 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -33,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskAlertContent;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
 
+import org.apache.commons.collections4.CollectionUtils;
+
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -270,11 +272,34 @@ public class ProcessAlertManager {
         return sendWarning;
     }
 
+    /**
+     * Send a close alert event, if the processInstance has sent alert before, then will insert a closed event.
+     *
+     * @param processInstance success process instance
+     */
+    public void closeAlert(ProcessInstance processInstance) {
+        List<Alert> alerts = alertDao.listAlerts(processInstance.getId());
+        if (CollectionUtils.isEmpty(alerts)) {
+            // no need to close alert
+            return;
+        }
+
+        Alert alert = new Alert();
+        alert.setAlertGroupId(processInstance.getWarningGroupId());
+        alert.setUpdateTime(new Date());
+        alert.setCreateTime(new Date());
+        alert.setProjectCode(processInstance.getProcessDefinition().getProjectCode());
+        alert.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
+        alert.setProcessInstanceId(processInstance.getId());
+        alert.setAlertType(AlertType.CLOSE_ALERT);
+        alertDao.addAlert(alert);
+    }
+
     /**
      * send process timeout alert
      *
      * @param processInstance process instance
-     * @param projectUser projectUser
+     * @param projectUser     projectUser
      */
     public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
         alertDao.sendProcessTimeoutAlert(processInstance, projectUser);