You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/05/10 01:56:24 UTC

[dolphinscheduler] branch dev updated: [Improvement-5058][Master] The master-server creates the DB data source twice (#5139)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9c77faa  [Improvement-5058][Master] The master-server creates the DB data source twice (#5139)
9c77faa is described below

commit 9c77faa8ace7242ab9d37b9c6e65a699f1e2f829
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Mon May 10 09:56:17 2021 +0800

    [Improvement-5058][Master] The master-server creates the DB data source twice (#5139)
    
    * avoid creating the DB data source twice in the master server.
    
    * update ProcessAlertManagerTest test class to increase coverage.
    
    * mark the DolphinSchedulerPluginLoaderTest test with @Ignore.
    
    Co-authored-by: zhuang chong
---
 .../plugin/DolphinSchedulerPluginLoaderTest.java   |   2 +
 .../server/master/runner/MasterExecThread.java     |  14 +--
 .../master/runner/MasterSchedulerService.java      |   7 +-
 .../server/utils/DependentExecute.java             |  87 ++++++++--------
 .../dolphinscheduler/server/utils/ParamUtils.java  |  27 +++--
 .../server/master/AlertManagerTest.java            | 113 ---------------------
 .../service/alert/ProcessAlertManager.java         |  19 ++--
 .../service/alert/ProcessAlertManagerTest.java     |  93 +++++++++++++++++
 pom.xml                                            |   2 +-
 9 files changed, 175 insertions(+), 189 deletions(-)

diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
index 8f69621..c67ca42 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.plugin;
 
 import java.util.Objects;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -29,6 +30,7 @@ public class DolphinSchedulerPluginLoaderTest {
      * Method: loadPlugins()
      */
     @Test
+    @Ignore
     public void testLoadPlugins() {
         PluginManagerTest pluginManager = new PluginManagerTest();
         DolphinPluginManagerConfig alertPluginManagerConfig = new DolphinPluginManagerConfig();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 19e10b4..1e8403a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -55,7 +55,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@@ -147,7 +147,7 @@ public class MasterExecThread implements Runnable {
     /**
      * alert manager
      */
-    private AlertManager alertManager;
+    private ProcessAlertManager processAlertManager;
 
     /**
      * the object of DAG
@@ -185,7 +185,7 @@ public class MasterExecThread implements Runnable {
     public MasterExecThread(ProcessInstance processInstance
             , ProcessService processService
             , NettyRemotingClient nettyRemotingClient
-            , AlertManager alertManager
+            , ProcessAlertManager processAlertManager
             , MasterConfig masterConfig) {
         this.processService = processService;
 
@@ -195,7 +195,7 @@ public class MasterExecThread implements Runnable {
         this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
                 masterTaskExecNum);
         this.nettyRemotingClient = nettyRemotingClient;
-        this.alertManager = alertManager;
+        this.processAlertManager = processAlertManager;
     }
 
     @Override
@@ -369,7 +369,7 @@ public class MasterExecThread implements Runnable {
         }
         List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
         ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-        alertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
+        processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
     }
 
     /**
@@ -930,7 +930,7 @@ public class MasterExecThread implements Runnable {
 
             // send warning email if process time out.
             if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
-                alertManager.sendProcessTimeoutAlert(processInstance,
+                processAlertManager.sendProcessTimeoutAlert(processInstance,
                         processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
                 sendTimeWarning = true;
             }
@@ -993,7 +993,7 @@ public class MasterExecThread implements Runnable {
             }
             // send alert
             if (CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)) {
-                alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
+                processAlertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
                 this.recoverToleranceFaultTaskList.clear();
             }
             // updateProcessInstance completed task status
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 1112bca..a2caf17 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import org.apache.curator.framework.imps.CuratorFrameworkState;
@@ -76,7 +76,8 @@ public class MasterSchedulerService extends Thread {
     /**
      * alert manager
      */
-    private AlertManager alertManager = new AlertManager();
+    @Autowired
+    private ProcessAlertManager processAlertManager;
 
     /**
      *  netty remoting client
@@ -165,7 +166,7 @@ public class MasterSchedulerService extends Thread {
                                         processInstance
                                         , processService
                                         , nettyRemotingClient
-                                        , alertManager
+                                        , processAlertManager
                                         , masterConfig));
                     }
                 } catch (Exception e) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 7f76baa..5f10453 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -22,19 +23,21 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.model.DateInterval;
 import org.apache.dolphinscheduler.common.model.DependentItem;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.DependentUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 /**
  * dependent item execute
  */
@@ -74,7 +77,7 @@ public class DependentExecute {
      * @param itemList  item list
      * @param relation  relation
      */
-    public DependentExecute(List<DependentItem> itemList, DependentRelation relation){
+    public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
         this.dependItemList = itemList;
         this.relation = relation;
     }
@@ -85,9 +88,9 @@ public class DependentExecute {
      * @param currentTime   current time
      * @return DependResult
      */
-    private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
+    private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
         List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
-        return calculateResultForTasks(dependentItem, dateIntervals );
+        return calculateResultForTasks(dependentItem, dateIntervals);
     }
 
     /**
@@ -100,19 +103,18 @@ public class DependentExecute {
                                                  List<DateInterval> dateIntervals) {
 
         DependResult result = DependResult.FAILED;
-        for(DateInterval dateInterval : dateIntervals){
-            ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
-                                                    dateInterval);
-            if(processInstance == null){
+        for (DateInterval dateInterval : dateIntervals) {
+            ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval);
+            if (processInstance == null) {
                 return DependResult.WAITING;
             }
             // need to check workflow for updates, so get all task and check the task state
-            if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
+            if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
                 result = dependResultByProcessInstance(processInstance);
-            }else{
+            } else {
                 result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
             }
-            if(result != DependResult.SUCCESS){
+            if (result != DependResult.SUCCESS) {
                 break;
             }
         }
@@ -123,11 +125,11 @@ public class DependentExecute {
      * depend type = depend_all
      * @return
      */
-    private DependResult dependResultByProcessInstance(ProcessInstance processInstance){
-        if(!processInstance.getState().typeIsFinished()){
+    private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
+        if (!processInstance.getState().typeIsFinished()) {
             return DependResult.WAITING;
         }
-        if(processInstance.getState().typeIsSuccess()){
+        if (processInstance.getState().typeIsSuccess()) {
             return DependResult.SUCCESS;
         }
         return DependResult.FAILED;
@@ -144,22 +146,22 @@ public class DependentExecute {
         TaskInstance taskInstance = null;
         List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
 
-        for(TaskInstance task : taskInstanceList){
-            if(task.getName().equals(taskName)){
+        for (TaskInstance task : taskInstanceList) {
+            if (task.getName().equals(taskName)) {
                 taskInstance = task;
                 break;
             }
         }
 
-        if(taskInstance == null){
+        if (taskInstance == null) {
             // cannot find task in the process instance
             // maybe because process instance is running or failed.
-            if(processInstance.getState().typeIsFinished()){
+            if (processInstance.getState().typeIsFinished()) {
                 result = DependResult.FAILED;
-            }else{
+            } else {
                 return DependResult.WAITING;
             }
-        }else{
+        } else {
             result = getDependResultByState(taskInstance.getState());
         }
 
@@ -177,7 +179,7 @@ public class DependentExecute {
     private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
 
         ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
-        if(runningProcess != null){
+        if (runningProcess != null) {
             return runningProcess;
         }
 
@@ -189,15 +191,14 @@ public class DependentExecute {
                 definitionId, dateInterval
         );
 
-        if(lastManualProcess ==null){
+        if (lastManualProcess == null) {
             return lastSchedulerProcess;
         }
-        if(lastSchedulerProcess == null){
+        if (lastSchedulerProcess == null) {
             return lastManualProcess;
         }
 
-        return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime()))?
-                lastManualProcess : lastSchedulerProcess;
+        return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime())) ? lastManualProcess : lastSchedulerProcess;
     }
 
     /**
@@ -207,11 +208,11 @@ public class DependentExecute {
      */
     private DependResult getDependResultByState(ExecutionStatus state) {
 
-        if(!state.typeIsFinished()){
+        if (!state.typeIsFinished()) {
             return DependResult.WAITING;
-        }else if(state.typeIsSuccess()){
+        } else if (state.typeIsSuccess()) {
             return DependResult.SUCCESS;
-        }else{
+        } else {
             return DependResult.FAILED;
         }
     }
@@ -223,11 +224,11 @@ public class DependentExecute {
      */
     private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
 
-        if(state.typeIsRunning()
+        if (state.typeIsRunning()
                 || state == ExecutionStatus.SUBMITTED_SUCCESS
-                || state == ExecutionStatus.WAITTING_THREAD){
+                || state == ExecutionStatus.WAITTING_THREAD) {
             return DependResult.WAITING;
-        }else{
+        } else {
             return DependResult.FAILED;
         }
     }
@@ -237,8 +238,8 @@ public class DependentExecute {
      * @param currentTime current time
      * @return boolean
      */
-    public boolean finish(Date currentTime){
-        if(modelDependResult == DependResult.WAITING){
+    public boolean finish(Date currentTime) {
+        if (modelDependResult == DependResult.WAITING) {
             modelDependResult = getModelDependResult(currentTime);
             return false;
         }
@@ -250,13 +251,13 @@ public class DependentExecute {
      * @param currentTime current time
      * @return DependResult
      */
-    public DependResult getModelDependResult(Date currentTime){
+    public DependResult getModelDependResult(Date currentTime) {
 
         List<DependResult> dependResultList = new ArrayList<>();
 
-        for(DependentItem dependentItem : dependItemList){
+        for (DependentItem dependentItem : dependItemList) {
             DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
-            if(dependResult != DependResult.WAITING){
+            if (dependResult != DependResult.WAITING) {
                 dependResultMap.put(dependentItem.getKey(), dependResult);
             }
             dependResultList.add(dependResult);
@@ -273,15 +274,15 @@ public class DependentExecute {
      * @param currentTime   current time
      * @return DependResult
      */
-    private DependResult getDependResultForItem(DependentItem item, Date currentTime){
+    private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
         String key = item.getKey();
-        if(dependResultMap.containsKey(key)){
+        if (dependResultMap.containsKey(key)) {
             return dependResultMap.get(key);
         }
         return getDependentResultForItem(item, currentTime);
     }
 
-    public Map<String, DependResult> getDependResultMap(){
+    public Map<String, DependResult> getDependResultMap() {
         return dependResultMap;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 125bd96..875c69c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -47,9 +48,8 @@ public class ParamUtils {
                                                            Map<String,String> globalParamsMap,
                                                            Map<String,Property> localParams,
                                                            CommandType commandType,
-                                                           Date scheduleTime){
-        if (globalParams == null
-                && localParams == null){
+                                                           Date scheduleTime) {
+        if (globalParams == null && localParams == null) {
             return null;
         }
         // if it is a complement,
@@ -59,22 +59,22 @@ public class ParamUtils {
                 .getBusinessTime(commandType,
                         scheduleTime);
 
-        if (globalParamsMap != null){
+        if (globalParamsMap != null) {
             timeParams.putAll(globalParamsMap);
         }
 
-        if (globalParams != null && localParams != null){
+        if (globalParams != null && localParams != null) {
             globalParams.putAll(localParams);
-        }else if (globalParams == null && localParams != null){
+        } else if (globalParams == null && localParams != null) {
             globalParams = localParams;
         }
         Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
-        while (iter.hasNext()){
+        while (iter.hasNext()) {
             Map.Entry<String, Property> en = iter.next();
             Property property = en.getValue();
 
             if (StringUtils.isNotEmpty(property.getValue())
-                    && property.getValue().startsWith("$")){
+                    && property.getValue().startsWith("$")) {
                 /**
                  *  local parameter refers to global parameter with the same name
                  *  note: the global parameters of the process instance here are solidified parameters,
@@ -94,21 +94,20 @@ public class ParamUtils {
      * @param paramsMap params map
      * @return Map of converted
      */
-    public static Map<String,String> convert(Map<String,Property> paramsMap){
-        if(paramsMap == null){
+    public static Map<String,String> convert(Map<String,Property> paramsMap) {
+        if (paramsMap == null) {
             return null;
         }
 
         Map<String,String> map = new HashMap<>();
         Iterator<Map.Entry<String, Property>> iter = paramsMap.entrySet().iterator();
-        while (iter.hasNext()){
+        while (iter.hasNext()) {
             Map.Entry<String, Property> en = iter.next();
             map.put(en.getKey(),en.getValue().getValue());
         }
         return map;
     }
 
-
     /**
      * get parameters map
      * @param definedParams definedParams
@@ -118,9 +117,9 @@ public class ParamUtils {
         if (definedParams != null) {
             Map<String,Property> userDefParamsMaps = new HashMap<>();
             Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
-            while (iter.hasNext()){
+            while (iter.hasNext()) {
                 Map.Entry<String, String> en = iter.next();
-                Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
+                Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR, en.getValue());
                 userDefParamsMaps.put(property.getProp(),property);
             }
             return userDefParamsMaps;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java
deleted file mode 100644
index 58fea5e..0000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.master;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProjectUser;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- *  alert manager test
- */
-@Ignore
-public class AlertManagerTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(AlertManagerTest.class);
-
-    @Autowired
-    ProcessDefinitionMapper processDefinitionMapper;
-
-    @Autowired
-    ProcessInstanceMapper processInstanceMapper;
-
-    @Autowired
-    TaskInstanceMapper taskInstanceMapper;
-
-    @Autowired
-    ProjectMapper projectMapper;
-
-    AlertManager alertManager;
-
-    /**
-     * send worker alert fault tolerance
-     */
-    @Test
-    public void sendWarnningWorkerleranceFaultTest(){
-        // process instance
-        ProcessInstance processInstance = processInstanceMapper.queryDetailById(13028);
-
-        // set process definition
-        ProcessDefinition processDefinition = processDefinitionMapper.selectById(47);
-        processInstance.setProcessDefinition(processDefinition);
-
-
-        // fault task instance
-        TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038);
-        TaskInstance toleranceTask2 = taskInstanceMapper.selectById(5039);
-
-        List<TaskInstance> toleranceTaskList = new ArrayList<>(2);
-        toleranceTaskList.add(toleranceTask1);
-        toleranceTaskList.add(toleranceTask2);
-
-        alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList);
-    }
-
-
-    /**
-     * send worker alert fault tolerance
-     */
-    @Test
-    public void sendWarnningOfProcessInstanceTest(){
-        // process instance
-        ProcessInstance processInstance = processInstanceMapper.queryDetailById(13028);
-
-        // set process definition
-        ProcessDefinition processDefinition = processDefinitionMapper.selectById(47);
-        processInstance.setProcessDefinition(processDefinition);
-
-        // fault task instance
-        TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038);
-        toleranceTask1.setState(ExecutionStatus.FAILURE);
-        TaskInstance toleranceTask2 = taskInstanceMapper.selectById(5039);
-        toleranceTask2.setState(ExecutionStatus.FAILURE);
-
-        List<TaskInstance> toleranceTaskList = new ArrayList<>(2);
-        toleranceTaskList.add(toleranceTask1);
-        toleranceTaskList.add(toleranceTask2);
-
-        ProjectUser projectUser = projectMapper.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-
-        alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList, projectUser);
-    }
-
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
similarity index 95%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 576a3b1..827fb12 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.service.alert;
 
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.DaoFactory;
 import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -36,21 +35,25 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
- * alert manager
+ * process alert manager
  */
-public class AlertManager {
+@Component
+public class ProcessAlertManager {
 
     /**
      * logger of AlertManager
      */
-    private static final Logger logger = LoggerFactory.getLogger(AlertManager.class);
+    private static final Logger logger = LoggerFactory.getLogger(ProcessAlertManager.class);
 
     /**
      * alert dao
      */
-    private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
+    @Autowired
+    private AlertDao alertDao;
 
     /**
      * command type convert chinese
@@ -183,7 +186,7 @@ public class AlertManager {
             alert.setCreateTime(new Date());
             alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
             alertDao.addAlert(alert);
-            logger.info("add alert to db , alert : {}", alert.toString());
+            logger.info("add alert to db , alert : {}", alert);
 
         } catch (Exception e) {
             logger.error("send alert failed:{} ", e.getMessage());
@@ -237,7 +240,7 @@ public class AlertManager {
         alert.setAlertGroupId(processInstance.getWarningGroupId());
         alert.setCreateTime(new Date());
         alertDao.addAlert(alert);
-        logger.info("add alert to db , alert: {}", alert.toString());
+        logger.info("add alert to db , alert: {}", alert);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java
new file mode 100644
index 0000000..f6ba45e
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ProcessAlertManager Test
+ */
+@RunWith(PowerMockRunner.class)
+public class ProcessAlertManagerTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessAlertManagerTest.class);
+
+    @InjectMocks
+    ProcessAlertManager processAlertManager = new ProcessAlertManager();
+
+    @Mock
+    private AlertDao alertDao;
+
+    /**
+     * send worker alert fault tolerance
+     */
+    @Test
+    public void sendWarningWorkerToleranceFaultTest() {
+        // process instance
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setName("test");
+
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setName("test-task-1");
+        taskInstance.setHost("127.0.0.1");
+        taskInstance.setRetryTimes(3);
+        List<TaskInstance> taskInstanceList = new ArrayList<>();
+        taskInstanceList.add(taskInstance);
+
+        processAlertManager.sendAlertWorkerToleranceFault(processInstance, taskInstanceList);
+    }
+
+
+    /**
+     * send process instance alert
+     */
+    @Test
+    public void sendWarnningOfProcessInstanceTest() {
+        // process instance
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setWarningType(WarningType.SUCCESS);
+        processInstance.setState(ExecutionStatus.SUCCESS);
+        processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
+        processInstance.setWarningGroupId(1);
+
+        ProjectUser projectUser = new ProjectUser();
+        TaskInstance taskInstance = new TaskInstance();
+        List<TaskInstance> taskInstanceList = new ArrayList<>();
+        taskInstanceList.add(taskInstance);
+
+        processAlertManager.sendAlertProcessInstance(processInstance, taskInstanceList, projectUser);
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index 31a037c..7d45da6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -941,7 +941,6 @@
                         <include>**/server/master/register/MasterRegistryTest.java</include>
                         <include>**/server/master/registry/ServerNodeManagerTest.java</include>
                         <include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
-                        <include>**/server/master/AlertManagerTest.java</include>
                         <include>**/server/master/MasterCommandTest.java</include>
                         <include>**/server/master/DependentTaskTest.java</include>
                         <include>**/server/master/ConditionsTaskTest.java</include>
@@ -991,6 +990,7 @@
                         <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
                         <include>**/service/log/LogClientServiceTest.java</include>
                         <include>**/service/alert/AlertClientServiceTest.java</include>
+                        <include>**/service/alert/ProcessAlertManagerTest.java</include>
                         <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
                         <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
                         <include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>