You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/04/08 02:02:16 UTC
[dolphinscheduler] branch dev updated: [Fix-9221] [alert-server] optimization and gracefully close (#9246)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ca95d2f928 [Fix-9221] [alert-server] optimization and gracefully close (#9246)
ca95d2f928 is described below
commit ca95d2f9282fd234d954194bab40a5bfc713c433
Author: guoshupei <15...@163.com>
AuthorDate: Fri Apr 8 10:02:10 2022 +0800
[Fix-9221] [alert-server] optimization and gracefully close (#9246)
* [Fix-9221] [alert-server] optimization and gracefully close
This closes #9221
* [Fix-9221] [alert-server] remove unused mock data
This closes #9221
* [Fix-9221] [alert-server] remove unused mock data
This closes #9221
* [Fix-9221] [alert-server] remove unnecessary Mockito stubbings
* [Fix-9221] [alert-server] init AlertPluginManager in AlertServer
* [Fix-9221] [alert-server] AlertServerTest add AlertPluginManager installPlugin
* [Fix-9221] [alert-server] replace @Eventlistener with @PostConstruct
* [Fix-9221] [alert-server] sonar check solution
* [Improvement-9221] [alert] update constructor injection and replace IStoppable with Closeable
Co-authored-by: guoshupei <gu...@lixiang.com>
---
.../dolphinscheduler/alert/AlertPluginManager.java | 24 +++--
.../alert/AlertRequestProcessor.java | 8 +-
.../{AlertSender.java => AlertSenderService.java} | 69 ++++++++-----
.../apache/dolphinscheduler/alert/AlertServer.java | 107 +++++++++++----------
.../dolphinscheduler/alert/AlertServerTest.java | 19 +++-
.../alert/processor/AlertRequestProcessorTest.java | 21 ++--
...SenderTest.java => AlertSenderServiceTest.java} | 35 ++++---
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../command/alert/AlertSendRequestCommand.java | 10 ++
9 files changed, 177 insertions(+), 117 deletions(-)
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
index f590d31aee..cc84769304 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
@@ -41,8 +41,6 @@ import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
-import javax.annotation.PostConstruct;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
@@ -55,23 +53,23 @@ public final class AlertPluginManager {
private final PluginDao pluginDao;
- private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
-
- private final PluginParams warningTypeParams = getWarningTypeParams();
-
public AlertPluginManager(PluginDao pluginDao) {
this.pluginDao = pluginDao;
}
+ private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
+
+ private final PluginParams warningTypeParams = getWarningTypeParams();
+
public PluginParams getWarningTypeParams() {
return
- RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
- .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
- .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
- .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
- .setValue(WarningType.ALL.getDescp())
- .addValidate(Validate.newBuilder().setRequired(true).build())
- .build();
+ RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
+ .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
+ .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
+ .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
+ .setValue(WarningType.ALL.getDescp())
+ .addValidate(Validate.newBuilder().setRequired(true).build())
+ .build();
}
@EventListener
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
index c85292f725..6bb4aa0049 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
@@ -36,10 +36,10 @@ import io.netty.channel.Channel;
public final class AlertRequestProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
- private final AlertSender alertSender;
+ private final AlertSenderService alertSenderService;
- public AlertRequestProcessor(AlertSender alertSender) {
- this.alertSender = alertSender;
+ public AlertRequestProcessor(AlertSenderService alertSenderService) {
+ this.alertSenderService = alertSenderService;
}
@Override
@@ -51,7 +51,7 @@ public final class AlertRequestProcessor implements NettyRequestProcessor {
logger.info("Received command : {}", alertSendRequestCommand);
- AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
+ AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(
alertSendRequestCommand.getGroupId(),
alertSendRequestCommand.getTitle(),
alertSendRequestCommand.getContent(),
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
similarity index 85%
rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java
rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
index 3ddc5c5795..af47913537 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
@@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.alert;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
-
-import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashSet;
@@ -40,22 +45,39 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
-@Component
-public final class AlertSender {
- private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
+@Service
+public final class AlertSenderService extends Thread {
+ private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
- public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) {
+ public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
}
+ @Override
+ public synchronized void start() {
+ super.setName("AlertSenderService");
+ super.start();
+ }
+
+ @Override
+ public void run() {
+ logger.info("alert sender started");
+ while (Stopper.isRunning()) {
+ try {
+ List<Alert> alerts = alertDao.listPendingAlerts();
+ this.send(alerts);
+ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
+ } catch (Exception e) {
+ logger.error("alert sender thread error", e);
+ }
+ }
+ }
+
+
public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
//get alert group from alert
@@ -68,11 +90,11 @@ public final class AlertSender {
}
AlertData alertData = new AlertData();
alertData.setId(alert.getId())
- .setContent(alert.getContent())
- .setLog(alert.getLog())
- .setTitle(alert.getTitle())
- .setTitle(alert.getTitle())
- .setWarnType(alert.getWarningType().getCode());
+ .setContent(alert.getContent())
+ .setLog(alert.getLog())
+ .setTitle(alert.getTitle())
+ .setTitle(alert.getTitle())
+ .setWarnType(alert.getWarningType().getCode());
int sendSuccessCount = 0;
for (AlertPluginInstance instance : alertInstanceList) {
@@ -93,23 +115,22 @@ public final class AlertSender {
}
alertDao.updateAlert(alertStatus, "", alert.getId());
}
-
}
/**
* sync send alert handler
*
* @param alertGroupId alertGroupId
- * @param title title
- * @param content content
+ * @param title title
+ * @param content content
* @return AlertSendResponseCommand
*/
- public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) {
+ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = new AlertData();
alertData.setContent(content)
- .setTitle(title)
- .setWarnType(warnType);
+ .setTitle(title)
+ .setWarnType(warnType);
boolean sendResponseStatus = true;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
@@ -128,7 +149,7 @@ public final class AlertSender {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
- Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
+ Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
sendResponseResults.add(alertSendResponseResult);
}
@@ -140,7 +161,7 @@ public final class AlertSender {
/**
* alert result handler
*
- * @param instance instance
+ * @param instance instance
* @param alertData alertData
* @return AlertResult
*/
@@ -159,7 +180,7 @@ public final class AlertSender {
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();
- if(paramsMap != null){
+ if (paramsMap != null) {
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index 6b18befe39..60bd09e6e2 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -17,73 +17,95 @@
package org.apache.dolphinscheduler.alert;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
-import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.PreDestroy;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
+import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
+import javax.annotation.PreDestroy;
+import java.io.Closeable;
+
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
private final PluginDao pluginDao;
- private final AlertDao alertDao;
- private final AlertPluginManager alertPluginManager;
- private final AlertSender alertSender;
+ private final AlertSenderService alertSenderService;
private final AlertRequestProcessor alertRequestProcessor;
+ private final AlertConfig alertConfig;
+ private NettyRemotingServer nettyRemotingServer;
- private NettyRemotingServer server;
-
- @Autowired
- private AlertConfig config;
-
- public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) {
+ public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
this.pluginDao = pluginDao;
- this.alertDao = alertDao;
- this.alertPluginManager = alertPluginManager;
- this.alertSender = alertSender;
+ this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
+ this.alertConfig = alertConfig;
}
+ /**
+ * Starts the alert server as a non-web application (no embedded web service).
+ *
+ * @param args arguments
+ */
public static void main(String[] args) {
- SpringApplication.run(AlertServer.class, args);
+ Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
+ new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args);
}
@EventListener
- public void start(ApplicationReadyEvent readyEvent) {
- logger.info("Starting Alert server");
+ public void run(ApplicationReadyEvent readyEvent) {
+ logger.info("alert server starting...");
checkTable();
startServer();
-
- Executors.newScheduledThreadPool(1)
- .scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS);
+ alertSenderService.start();
}
@Override
@PreDestroy
public void close() {
- server.close();
+ destroy("alert server destroy");
+ }
+
+ /**
+ * gracefully stop
+ *
+ * @param cause stop cause
+ */
+ public void destroy(String cause) {
+
+ try {
+ // execute only once
+ if (Stopper.isStopped()) {
+ return;
+ }
+
+ logger.info("alert server is stopping ..., cause : {}", cause);
+
+ // set the stop signal to true
+ Stopper.stop();
+
+ // sleep for 3 seconds so that running threads can stop quietly
+ ThreadUtils.sleep(3000L);
+
+ // close
+ this.nettyRemotingServer.close();
+
+ } catch (Exception e) {
+ logger.error("alert server stop exception ", e);
+ }
}
private void checkTable() {
@@ -95,26 +117,11 @@ public class AlertServer implements Closeable {
private void startServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(config.getPort());
+ serverConfig.setListenPort(alertConfig.getPort());
- server = new NettyRemotingServer(serverConfig);
- server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
- server.start();
+ nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
+ nettyRemotingServer.start();
}
- final class Sender implements Runnable {
- @Override
- public void run() {
- if (!Stopper.isRunning()) {
- return;
- }
-
- try {
- final List<Alert> alerts = alertDao.listPendingAlerts();
- alertSender.send(alerts);
- } catch (Exception e) {
- logger.error("Failed to send alert", e);
- }
- }
- }
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
index 911b912d76..c37cf6fe7a 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
@@ -18,7 +18,9 @@
package org.apache.dolphinscheduler.alert;
import junit.framework.TestCase;
+import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.junit.Assert;
@@ -27,9 +29,13 @@ import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;
+import java.util.ArrayList;
+import java.util.List;
+
@RunWith(MockitoJUnitRunner.class)
public class AlertServerTest extends TestCase {
@@ -42,19 +48,26 @@ public class AlertServerTest extends TestCase {
@Mock
private AlertConfig alertConfig;
+
+ @Mock
+ private AlertSenderService alertSenderService;
@Test
public void testStart() {
+
Mockito.when(pluginDao.checkPluginDefineTableExist()).thenReturn(true);
Mockito.when(alertConfig.getPort()).thenReturn(50053);
-
- alertServer.start(null);
+
+ Mockito.doNothing().when(alertSenderService).start();
+
+ alertServer.run(null);
- NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "server");
+ NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "nettyRemotingServer");
NettyServerConfig nettyServerConfig = Whitebox.getInternalState(nettyRemotingServer, "serverConfig");
Assert.assertEquals(50053, nettyServerConfig.getListenPort());
+
}
}
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
index 64e92a02ad..41277fad50 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
@@ -20,30 +20,35 @@ package org.apache.dolphinscheduler.alert.processor;
import static org.mockito.Mockito.mock;
import org.apache.dolphinscheduler.alert.AlertRequestProcessor;
-import org.apache.dolphinscheduler.alert.AlertSender;
+import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import io.netty.channel.Channel;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class AlertRequestProcessorTest {
+ @InjectMocks
private AlertRequestProcessor alertRequestProcessor;
- @Before
- public void before() {
- final AlertDao alertDao = mock(AlertDao.class);
- alertRequestProcessor = new AlertRequestProcessor(new AlertSender(alertDao, null));
- }
+ @Mock
+ private AlertSenderService alertSenderService;
+
@Test
public void testProcess() {
+ Mockito.when(alertSenderService.syncHandler(1, "title", "content", WarningType.FAILURE.getCode())).thenReturn(new AlertSendResponseCommand());
Channel channel = mock(Channel.class);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content", WarningType.FAILURE.getCode());
Command reqCommand = alertSendRequestCommand.convert2Command();
diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
similarity index 86%
rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
index 4060e46345..7b7853daf5 100644
--- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
+++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
@@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.alert.AlertPluginManager;
-import org.apache.dolphinscheduler.alert.AlertSender;
+import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.enums.WarningType;
@@ -39,24 +39,29 @@ import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AlertSenderTest {
- private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
+public class AlertSenderServiceTest {
+ private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class);
+ @Mock
private AlertDao alertDao;
+ @Mock
private PluginDao pluginDao;
+ @Mock
private AlertPluginManager alertPluginManager;
- private AlertSender alertSender;
+ @InjectMocks
+ private AlertSenderService alertSenderService;
@Before
public void before() {
- alertDao = mock(AlertDao.class);
- pluginDao = mock(PluginDao.class);
- alertPluginManager = mock(AlertPluginManager.class);
+ MockitoAnnotations.initMocks(this);
}
@Test
@@ -65,12 +70,11 @@ public class AlertSenderTest {
int alertGroupId = 1;
String title = "alert mail test title";
String content = "alert mail test content";
- alertSender = new AlertSender(alertDao, alertPluginManager);
//1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
- AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+ AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -89,7 +93,7 @@ public class AlertSenderTest {
PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
- alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+ alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -99,7 +103,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
- alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+ alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -111,7 +115,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
- alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+ alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -123,7 +127,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
- alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
+ alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@@ -143,7 +147,7 @@ public class AlertSenderTest {
alert.setWarningType(WarningType.FAILURE);
alertList.add(alert);
- alertSender = new AlertSender(alertDao, alertPluginManager);
+// alertSenderService = new AlertSenderService();
int pluginDefineId = 1;
String pluginInstanceParams = "alert-instance-mail-params";
@@ -165,6 +169,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
- alertSender.send(alertList);
+ when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>());
+ alertSenderService.send(alertList);
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 0572e8f9ec..4199627f71 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -327,6 +327,7 @@ public final class Constants {
public static final String NULL = "NULL";
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
+ public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";
/**
* command parameter keys
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
index ba37e22d2a..83c81c9fbe 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
@@ -88,4 +88,14 @@ public class AlertSendRequestCommand implements Serializable {
command.setBody(body);
return command;
}
+
+ @Override
+ public String toString() {
+ return "AlertSendRequestCommand{" +
+ "groupId=" + groupId +
+ ", title='" + title + '\'' +
+ ", content='" + content + '\'' +
+ ", warnType=" + warnType +
+ '}';
+ }
}