You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/08 02:02:16 UTC

[dolphinscheduler] branch dev updated: [Fix-9221] [alert-server] optimization and gracefully close (#9246)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ca95d2f928 [Fix-9221] [alert-server] optimization and gracefully close (#9246)
ca95d2f928 is described below

commit ca95d2f9282fd234d954194bab40a5bfc713c433
Author: guoshupei <15...@163.com>
AuthorDate: Fri Apr 8 10:02:10 2022 +0800

    [Fix-9221] [alert-server] optimization and gracefully close (#9246)
    
    * [Fix-9221] [alert-server] optimization and gracefully close
    
    This closes #9221
    
    * [Fix-9221] [alert-server] remove unused mock data
    
    This closes #9221
    
    * [Fix-9221] [alert-server] remove unused mock data
    
    This closes #9221
    
    * [Fix-9221] [alert-server] remove unnecessary Mockito stubbings
    
    * [Fix-9221] [alert-server] init AlertPluginManager in AlertServer
    
    * [Fix-9221] [alert-server] AlertServerTest add AlertPluginManager installPlugin
    
    * [Fix-9221] [alert-server] replace @Eventlistener with @PostConstruct
    
    * [Fix-9221] [alert-server] sonar check solution
    
    * [Improvement-9221] [alert] update constructor injection and replace IStoppable with Closeable
    
    Co-authored-by: guoshupei <gu...@lixiang.com>
---
 .../dolphinscheduler/alert/AlertPluginManager.java |  24 +++--
 .../alert/AlertRequestProcessor.java               |   8 +-
 .../{AlertSender.java => AlertSenderService.java}  |  69 ++++++++-----
 .../apache/dolphinscheduler/alert/AlertServer.java | 107 +++++++++++----------
 .../dolphinscheduler/alert/AlertServerTest.java    |  19 +++-
 .../alert/processor/AlertRequestProcessorTest.java |  21 ++--
 ...SenderTest.java => AlertSenderServiceTest.java} |  35 ++++---
 .../apache/dolphinscheduler/common/Constants.java  |   1 +
 .../command/alert/AlertSendRequestCommand.java     |  10 ++
 9 files changed, 177 insertions(+), 117 deletions(-)

diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
index f590d31aee..cc84769304 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
@@ -41,8 +41,6 @@ import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
 
-import javax.annotation.PostConstruct;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
@@ -55,23 +53,23 @@ public final class AlertPluginManager {
 
     private final PluginDao pluginDao;
 
-    private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
-
-    private final PluginParams warningTypeParams = getWarningTypeParams();
-
     public AlertPluginManager(PluginDao pluginDao) {
         this.pluginDao = pluginDao;
     }
 
+    private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
+
+    private final PluginParams warningTypeParams = getWarningTypeParams();
+
     public PluginParams getWarningTypeParams() {
         return
-            RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
-                .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
-                .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
-                .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
-                .setValue(WarningType.ALL.getDescp())
-                .addValidate(Validate.newBuilder().setRequired(true).build())
-                .build();
+                RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
+                        .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
+                        .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
+                        .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
+                        .setValue(WarningType.ALL.getDescp())
+                        .addValidate(Validate.newBuilder().setRequired(true).build())
+                        .build();
     }
 
     @EventListener
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
index c85292f725..6bb4aa0049 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
@@ -36,10 +36,10 @@ import io.netty.channel.Channel;
 public final class AlertRequestProcessor implements NettyRequestProcessor {
     private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
 
-    private final AlertSender alertSender;
+    private final AlertSenderService alertSenderService;
 
-    public AlertRequestProcessor(AlertSender alertSender) {
-        this.alertSender = alertSender;
+    public AlertRequestProcessor(AlertSenderService alertSenderService) {
+        this.alertSenderService = alertSenderService;
     }
 
     @Override
@@ -51,7 +51,7 @@ public final class AlertRequestProcessor implements NettyRequestProcessor {
 
         logger.info("Received command : {}", alertSendRequestCommand);
 
-        AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
+        AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(
             alertSendRequestCommand.getGroupId(),
             alertSendRequestCommand.getTitle(),
             alertSendRequestCommand.getContent(),
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
similarity index 85%
rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java
rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
index 3ddc5c5795..af47913537 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
@@ -17,21 +17,26 @@
 
 package org.apache.dolphinscheduler.alert;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.dolphinscheduler.alert.api.AlertChannel;
 import org.apache.dolphinscheduler.alert.api.AlertConstants;
 import org.apache.dolphinscheduler.alert.api.AlertData;
 import org.apache.dolphinscheduler.alert.api.AlertInfo;
 import org.apache.dolphinscheduler.alert.api.AlertResult;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
 import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
 import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
 import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -40,22 +45,39 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-@Component
-public final class AlertSender {
-    private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
+@Service
+public final class AlertSenderService extends Thread {
+    private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
 
     private final AlertDao alertDao;
     private final AlertPluginManager alertPluginManager;
 
-    public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) {
+    public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
         this.alertDao = alertDao;
         this.alertPluginManager = alertPluginManager;
     }
 
+    @Override
+    public synchronized void start() {
+        super.setName("AlertSenderService");
+        super.start();
+    }
+
+    @Override
+    public void run() {
+        logger.info("alert sender started");
+        while (Stopper.isRunning()) {
+            try {
+                List<Alert> alerts = alertDao.listPendingAlerts();
+                this.send(alerts);
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
+            } catch (Exception e) {
+                logger.error("alert sender thread error", e);
+            }
+        }
+    }
+
+
     public void send(List<Alert> alerts) {
         for (Alert alert : alerts) {
             //get alert group from alert
@@ -68,11 +90,11 @@ public final class AlertSender {
             }
             AlertData alertData = new AlertData();
             alertData.setId(alert.getId())
-                     .setContent(alert.getContent())
-                     .setLog(alert.getLog())
-                     .setTitle(alert.getTitle())
-                     .setTitle(alert.getTitle())
-                     .setWarnType(alert.getWarningType().getCode());
+                    .setContent(alert.getContent())
+                    .setLog(alert.getLog())
+                    .setTitle(alert.getTitle())
+                    .setTitle(alert.getTitle())
+                    .setWarnType(alert.getWarningType().getCode());
 
             int sendSuccessCount = 0;
             for (AlertPluginInstance instance : alertInstanceList) {
@@ -93,23 +115,22 @@ public final class AlertSender {
             }
             alertDao.updateAlert(alertStatus, "", alert.getId());
         }
-
     }
 
     /**
      * sync send alert handler
      *
      * @param alertGroupId alertGroupId
-     * @param title title
-     * @param content content
+     * @param title        title
+     * @param content      content
      * @return AlertSendResponseCommand
      */
-    public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) {
+    public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) {
         List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
         AlertData alertData = new AlertData();
         alertData.setContent(content)
-                 .setTitle(title)
-                 .setWarnType(warnType);
+                .setTitle(title)
+                .setWarnType(warnType);
 
         boolean sendResponseStatus = true;
         List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
@@ -128,7 +149,7 @@ public final class AlertSender {
             AlertResult alertResult = this.alertResultHandler(instance, alertData);
             if (alertResult != null) {
                 AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
-                    Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
+                        Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
                 sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
                 sendResponseResults.add(alertSendResponseResult);
             }
@@ -140,7 +161,7 @@ public final class AlertSender {
     /**
      * alert result handler
      *
-     * @param instance instance
+     * @param instance  instance
      * @param alertData alertData
      * @return AlertResult
      */
@@ -159,7 +180,7 @@ public final class AlertSender {
         Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
         String instanceWarnType = WarningType.ALL.getDescp();
 
-        if(paramsMap != null){
+        if (paramsMap != null) {
             instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
         }
 
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index 6b18befe39..60bd09e6e2 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -17,73 +17,95 @@
 
 package org.apache.dolphinscheduler.alert;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.PluginDao;
-import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PreDestroy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.event.EventListener;
 
+import javax.annotation.PreDestroy;
+import java.io.Closeable;
+
 @SpringBootApplication
 @ComponentScan("org.apache.dolphinscheduler")
 public class AlertServer implements Closeable {
     private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
 
     private final PluginDao pluginDao;
-    private final AlertDao alertDao;
-    private final AlertPluginManager alertPluginManager;
-    private final AlertSender alertSender;
+    private final AlertSenderService alertSenderService;
     private final AlertRequestProcessor alertRequestProcessor;
+    private final AlertConfig alertConfig;
+    private NettyRemotingServer nettyRemotingServer;
 
-    private NettyRemotingServer server;
-
-    @Autowired
-    private AlertConfig config;
-
-    public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) {
+    public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
         this.pluginDao = pluginDao;
-        this.alertDao = alertDao;
-        this.alertPluginManager = alertPluginManager;
-        this.alertSender = alertSender;
+        this.alertSenderService = alertSenderService;
         this.alertRequestProcessor = alertRequestProcessor;
+        this.alertConfig = alertConfig;
     }
 
+    /**
+     * alert server startup, not use web service
+     *
+     * @param args arguments
+     */
     public static void main(String[] args) {
-        SpringApplication.run(AlertServer.class, args);
+        Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
+        new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args);
     }
 
     @EventListener
-    public void start(ApplicationReadyEvent readyEvent) {
-        logger.info("Starting Alert server");
+    public void run(ApplicationReadyEvent readyEvent) {
+        logger.info("alert server starting...");
 
         checkTable();
         startServer();
-
-        Executors.newScheduledThreadPool(1)
-                .scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS);
+        alertSenderService.start();
     }
 
     @Override
     @PreDestroy
     public void close() {
-        server.close();
+        destroy("alert server destroy");
+    }
+
+    /**
+     * gracefully stop
+     *
+     * @param cause stop cause
+     */
+    public void destroy(String cause) {
+
+        try {
+            // execute only once
+            if (Stopper.isStopped()) {
+                return;
+            }
+
+            logger.info("alert server is stopping ..., cause : {}", cause);
+
+            // set stop signal is true
+            Stopper.stop();
+
+            // thread sleep 3 seconds for thread quietly stop
+            ThreadUtils.sleep(3000L);
+
+            // close
+            this.nettyRemotingServer.close();
+
+        } catch (Exception e) {
+            logger.error("alert server stop exception ", e);
+        }
     }
 
     private void checkTable() {
@@ -95,26 +117,11 @@ public class AlertServer implements Closeable {
 
     private void startServer() {
         NettyServerConfig serverConfig = new NettyServerConfig();
-        serverConfig.setListenPort(config.getPort());
+        serverConfig.setListenPort(alertConfig.getPort());
 
-        server = new NettyRemotingServer(serverConfig);
-        server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
-        server.start();
+        nettyRemotingServer = new NettyRemotingServer(serverConfig);
+        nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
+        nettyRemotingServer.start();
     }
 
-    final class Sender implements Runnable {
-        @Override
-        public void run() {
-            if (!Stopper.isRunning()) {
-                return;
-            }
-
-            try {
-                final List<Alert> alerts = alertDao.listPendingAlerts();
-                alertSender.send(alerts);
-            } catch (Exception e) {
-                logger.error("Failed to send alert", e);
-            }
-        }
-    }
 }
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
index 911b912d76..c37cf6fe7a 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
@@ -18,7 +18,9 @@
 package org.apache.dolphinscheduler.alert;
 
 import junit.framework.TestCase;
+import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.junit.Assert;
@@ -27,9 +29,13 @@ import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.List;
+
 
 @RunWith(MockitoJUnitRunner.class)
 public class AlertServerTest extends TestCase {
@@ -42,19 +48,26 @@ public class AlertServerTest extends TestCase {
     
     @Mock
     private AlertConfig alertConfig;
+
+    @Mock
+    private AlertSenderService alertSenderService;
     
     @Test
     public void testStart() {
+
         Mockito.when(pluginDao.checkPluginDefineTableExist()).thenReturn(true);
         
         Mockito.when(alertConfig.getPort()).thenReturn(50053);
-        
-        alertServer.start(null);
+
+        Mockito.doNothing().when(alertSenderService).start();
+
+        alertServer.run(null);
     
-        NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "server");
+        NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "nettyRemotingServer");
     
         NettyServerConfig nettyServerConfig = Whitebox.getInternalState(nettyRemotingServer, "serverConfig");
         
         Assert.assertEquals(50053, nettyServerConfig.getListenPort());
+
     }
 }
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
index 64e92a02ad..41277fad50 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
@@ -20,30 +20,35 @@ package org.apache.dolphinscheduler.alert.processor;
 import static org.mockito.Mockito.mock;
 
 import org.apache.dolphinscheduler.alert.AlertRequestProcessor;
-import org.apache.dolphinscheduler.alert.AlertSender;
+import org.apache.dolphinscheduler.alert.AlertSenderService;
 import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
 
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.channel.Channel;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class AlertRequestProcessorTest {
+    @InjectMocks
     private AlertRequestProcessor alertRequestProcessor;
 
-    @Before
-    public void before() {
-        final AlertDao alertDao = mock(AlertDao.class);
-        alertRequestProcessor = new AlertRequestProcessor(new AlertSender(alertDao, null));
-    }
+    @Mock
+    private AlertSenderService alertSenderService;
+
 
     @Test
     public void testProcess() {
+        Mockito.when(alertSenderService.syncHandler(1, "title", "content", WarningType.FAILURE.getCode())).thenReturn(new AlertSendResponseCommand());
         Channel channel = mock(Channel.class);
         AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content", WarningType.FAILURE.getCode());
         Command reqCommand = alertSendRequestCommand.convert2Command();
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
similarity index 86%
rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
index 4060e46345..7b7853daf5 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
@@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.alert.AlertPluginManager;
-import org.apache.dolphinscheduler.alert.AlertSender;
+import org.apache.dolphinscheduler.alert.AlertSenderService;
 import org.apache.dolphinscheduler.alert.api.AlertChannel;
 import org.apache.dolphinscheduler.alert.api.AlertResult;
 import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -39,24 +39,29 @@ import java.util.Optional;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AlertSenderTest {
-    private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
+public class AlertSenderServiceTest {
+    private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class);
 
+    @Mock
     private AlertDao alertDao;
+    @Mock
     private PluginDao pluginDao;
+    @Mock
     private AlertPluginManager alertPluginManager;
 
-    private AlertSender alertSender;
+    @InjectMocks
+    private AlertSenderService alertSenderService;
 
     @Before
     public void before() {
-        alertDao = mock(AlertDao.class);
-        pluginDao = mock(PluginDao.class);
-        alertPluginManager = mock(AlertPluginManager.class);
+        MockitoAnnotations.initMocks(this);
     }
 
     @Test
@@ -65,12 +70,11 @@ public class AlertSenderTest {
         int alertGroupId = 1;
         String title = "alert mail test title";
         String content = "alert mail test content";
-        alertSender = new AlertSender(alertDao, alertPluginManager);
 
         //1.alert instance does not exist
         when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
 
-        AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+        AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
         Assert.assertFalse(alertSendResponseCommand.getResStatus());
         alertSendResponseCommand.getResResults().forEach(result ->
             logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -89,7 +93,7 @@ public class AlertSenderTest {
         PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
         when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
 
-        alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+        alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
         Assert.assertFalse(alertSendResponseCommand.getResStatus());
         alertSendResponseCommand.getResResults().forEach(result ->
             logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -99,7 +103,7 @@ public class AlertSenderTest {
         when(alertChannelMock.process(Mockito.any())).thenReturn(null);
         when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
 
-        alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+        alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
         Assert.assertFalse(alertSendResponseCommand.getResStatus());
         alertSendResponseCommand.getResResults().forEach(result ->
             logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -111,7 +115,7 @@ public class AlertSenderTest {
         when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
         when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
 
-        alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+        alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
         Assert.assertFalse(alertSendResponseCommand.getResStatus());
         alertSendResponseCommand.getResResults().forEach(result ->
             logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -123,7 +127,7 @@ public class AlertSenderTest {
         when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
         when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
 
-        alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+        alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
         Assert.assertTrue(alertSendResponseCommand.getResStatus());
         alertSendResponseCommand.getResResults().forEach(result ->
             logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -143,7 +147,7 @@ public class AlertSenderTest {
         alert.setWarningType(WarningType.FAILURE);
         alertList.add(alert);
 
-        alertSender = new AlertSender(alertDao, alertPluginManager);
+//        alertSenderService = new AlertSenderService();
 
         int pluginDefineId = 1;
         String pluginInstanceParams = "alert-instance-mail-params";
@@ -165,6 +169,7 @@ public class AlertSenderTest {
         when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
         when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
         Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
-        alertSender.send(alertList);
+        when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>());
+        alertSenderService.send(alertList);
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 0572e8f9ec..4199627f71 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -327,6 +327,7 @@ public final class Constants {
     public static final String NULL = "NULL";
     public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
     public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
+    public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";
 
     /**
      * command parameter keys
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
index ba37e22d2a..83c81c9fbe 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
@@ -88,4 +88,14 @@ public class AlertSendRequestCommand implements Serializable {
         command.setBody(body);
         return command;
     }
+
+    @Override
+    public String toString() {
+        return "AlertSendRequestCommand{" +
+                "groupId=" + groupId +
+                ", title='" + title + '\'' +
+                ", content='" + content + '\'' +
+                ", warnType=" + warnType +
+                '}';
+    }
 }