You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/05/10 01:56:24 UTC
[dolphinscheduler] branch dev updated: [Improvement-5058][Master]
The master-server creates the DB data source twice (#5139)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9c77faa [Improvement-5058][Master] The master-server creates the DB data source twice (#5139)
9c77faa is described below
commit 9c77faa8ace7242ab9d37b9c6e65a699f1e2f829
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Mon May 10 09:56:17 2021 +0800
[Improvement-5058][Master] The master-server creates the DB data source twice (#5139)
* stop the master server from creating the DB data source twice.
* update ProcessAlertManagerTest test class to increase coverage.
* mark the DolphinSchedulerPluginLoaderTest test as ignored (@Ignore).
Co-authored-by: zhuang chong
---
.../plugin/DolphinSchedulerPluginLoaderTest.java | 2 +
.../server/master/runner/MasterExecThread.java | 14 +--
.../master/runner/MasterSchedulerService.java | 7 +-
.../server/utils/DependentExecute.java | 87 ++++++++--------
.../dolphinscheduler/server/utils/ParamUtils.java | 27 +++--
.../server/master/AlertManagerTest.java | 113 ---------------------
.../service/alert/ProcessAlertManager.java | 19 ++--
.../service/alert/ProcessAlertManagerTest.java | 93 +++++++++++++++++
pom.xml | 2 +-
9 files changed, 175 insertions(+), 189 deletions(-)
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
index 8f69621..c67ca42 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/plugin/DolphinSchedulerPluginLoaderTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.plugin;
import java.util.Objects;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@@ -29,6 +30,7 @@ public class DolphinSchedulerPluginLoaderTest {
* Method: loadPlugins()
*/
@Test
+ @Ignore
public void testLoadPlugins() {
PluginManagerTest pluginManager = new PluginManagerTest();
DolphinPluginManagerConfig alertPluginManagerConfig = new DolphinPluginManagerConfig();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 19e10b4..1e8403a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -55,7 +55,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@@ -147,7 +147,7 @@ public class MasterExecThread implements Runnable {
/**
* alert manager
*/
- private AlertManager alertManager;
+ private ProcessAlertManager processAlertManager;
/**
* the object of DAG
@@ -185,7 +185,7 @@ public class MasterExecThread implements Runnable {
public MasterExecThread(ProcessInstance processInstance
, ProcessService processService
, NettyRemotingClient nettyRemotingClient
- , AlertManager alertManager
+ , ProcessAlertManager processAlertManager
, MasterConfig masterConfig) {
this.processService = processService;
@@ -195,7 +195,7 @@ public class MasterExecThread implements Runnable {
this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
masterTaskExecNum);
this.nettyRemotingClient = nettyRemotingClient;
- this.alertManager = alertManager;
+ this.processAlertManager = processAlertManager;
}
@Override
@@ -369,7 +369,7 @@ public class MasterExecThread implements Runnable {
}
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
- alertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
+ processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
}
/**
@@ -930,7 +930,7 @@ public class MasterExecThread implements Runnable {
// send warning email if process time out.
if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
- alertManager.sendProcessTimeoutAlert(processInstance,
+ processAlertManager.sendProcessTimeoutAlert(processInstance,
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
sendTimeWarning = true;
}
@@ -993,7 +993,7 @@ public class MasterExecThread implements Runnable {
}
// send alert
if (CollectionUtils.isNotEmpty(this.recoverToleranceFaultTaskList)) {
- alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
+ processAlertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
this.recoverToleranceFaultTaskList.clear();
}
// updateProcessInstance completed task status
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 1112bca..a2caf17 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.zk.ZKMasterClient;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.curator.framework.imps.CuratorFrameworkState;
@@ -76,7 +76,8 @@ public class MasterSchedulerService extends Thread {
/**
* alert manager
*/
- private AlertManager alertManager = new AlertManager();
+ @Autowired
+ private ProcessAlertManager processAlertManager;
/**
* netty remoting client
@@ -165,7 +166,7 @@ public class MasterSchedulerService extends Thread {
processInstance
, processService
, nettyRemotingClient
- , alertManager
+ , processAlertManager
, masterConfig));
}
} catch (Exception e) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
index 7f76baa..5f10453 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
@@ -22,19 +23,21 @@ import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
-import org.apache.dolphinscheduler.common.model.TaskNode;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-
/**
* dependent item execute
*/
@@ -74,7 +77,7 @@ public class DependentExecute {
* @param itemList item list
* @param relation relation
*/
- public DependentExecute(List<DependentItem> itemList, DependentRelation relation){
+ public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
this.dependItemList = itemList;
this.relation = relation;
}
@@ -85,9 +88,9 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
- private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime){
+ private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
- return calculateResultForTasks(dependentItem, dateIntervals );
+ return calculateResultForTasks(dependentItem, dateIntervals);
}
/**
@@ -100,19 +103,18 @@ public class DependentExecute {
List<DateInterval> dateIntervals) {
DependResult result = DependResult.FAILED;
- for(DateInterval dateInterval : dateIntervals){
- ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
- dateInterval);
- if(processInstance == null){
+ for (DateInterval dateInterval : dateIntervals) {
+ ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), dateInterval);
+ if (processInstance == null) {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
- if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
+ if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
result = dependResultByProcessInstance(processInstance);
- }else{
+ } else {
result = getDependTaskResult(dependentItem.getDepTasks(),processInstance);
}
- if(result != DependResult.SUCCESS){
+ if (result != DependResult.SUCCESS) {
break;
}
}
@@ -123,11 +125,11 @@ public class DependentExecute {
* depend type = depend_all
* @return
*/
- private DependResult dependResultByProcessInstance(ProcessInstance processInstance){
- if(!processInstance.getState().typeIsFinished()){
+ private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
+ if (!processInstance.getState().typeIsFinished()) {
return DependResult.WAITING;
}
- if(processInstance.getState().typeIsSuccess()){
+ if (processInstance.getState().typeIsSuccess()) {
return DependResult.SUCCESS;
}
return DependResult.FAILED;
@@ -144,22 +146,22 @@ public class DependentExecute {
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
- for(TaskInstance task : taskInstanceList){
- if(task.getName().equals(taskName)){
+ for (TaskInstance task : taskInstanceList) {
+ if (task.getName().equals(taskName)) {
taskInstance = task;
break;
}
}
- if(taskInstance == null){
+ if (taskInstance == null) {
// cannot find task in the process instance
// maybe because process instance is running or failed.
- if(processInstance.getState().typeIsFinished()){
+ if (processInstance.getState().typeIsFinished()) {
result = DependResult.FAILED;
- }else{
+ } else {
return DependResult.WAITING;
}
- }else{
+ } else {
result = getDependResultByState(taskInstance.getState());
}
@@ -177,7 +179,7 @@ public class DependentExecute {
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
- if(runningProcess != null){
+ if (runningProcess != null) {
return runningProcess;
}
@@ -189,15 +191,14 @@ public class DependentExecute {
definitionId, dateInterval
);
- if(lastManualProcess ==null){
+ if (lastManualProcess == null) {
return lastSchedulerProcess;
}
- if(lastSchedulerProcess == null){
+ if (lastSchedulerProcess == null) {
return lastManualProcess;
}
- return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime()))?
- lastManualProcess : lastSchedulerProcess;
+ return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime())) ? lastManualProcess : lastSchedulerProcess;
}
/**
@@ -207,11 +208,11 @@ public class DependentExecute {
*/
private DependResult getDependResultByState(ExecutionStatus state) {
- if(!state.typeIsFinished()){
+ if (!state.typeIsFinished()) {
return DependResult.WAITING;
- }else if(state.typeIsSuccess()){
+ } else if (state.typeIsSuccess()) {
return DependResult.SUCCESS;
- }else{
+ } else {
return DependResult.FAILED;
}
}
@@ -223,11 +224,11 @@ public class DependentExecute {
*/
private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
- if(state.typeIsRunning()
+ if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
- || state == ExecutionStatus.WAITTING_THREAD){
+ || state == ExecutionStatus.WAITTING_THREAD) {
return DependResult.WAITING;
- }else{
+ } else {
return DependResult.FAILED;
}
}
@@ -237,8 +238,8 @@ public class DependentExecute {
* @param currentTime current time
* @return boolean
*/
- public boolean finish(Date currentTime){
- if(modelDependResult == DependResult.WAITING){
+ public boolean finish(Date currentTime) {
+ if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime);
return false;
}
@@ -250,13 +251,13 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
- public DependResult getModelDependResult(Date currentTime){
+ public DependResult getModelDependResult(Date currentTime) {
List<DependResult> dependResultList = new ArrayList<>();
- for(DependentItem dependentItem : dependItemList){
+ for (DependentItem dependentItem : dependItemList) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
- if(dependResult != DependResult.WAITING){
+ if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
@@ -273,15 +274,15 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
- private DependResult getDependResultForItem(DependentItem item, Date currentTime){
+ private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
String key = item.getKey();
- if(dependResultMap.containsKey(key)){
+ if (dependResultMap.containsKey(key)) {
return dependResultMap.get(key);
}
return getDependentResultForItem(item, currentTime);
}
- public Map<String, DependResult> getDependResultMap(){
+ public Map<String, DependResult> getDependResultMap() {
return dependResultMap;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 125bd96..875c69c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -47,9 +48,8 @@ public class ParamUtils {
Map<String,String> globalParamsMap,
Map<String,Property> localParams,
CommandType commandType,
- Date scheduleTime){
- if (globalParams == null
- && localParams == null){
+ Date scheduleTime) {
+ if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
@@ -59,22 +59,22 @@ public class ParamUtils {
.getBusinessTime(commandType,
scheduleTime);
- if (globalParamsMap != null){
+ if (globalParamsMap != null) {
timeParams.putAll(globalParamsMap);
}
- if (globalParams != null && localParams != null){
+ if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
- }else if (globalParams == null && localParams != null){
+ } else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
- && property.getValue().startsWith("$")){
+ && property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
@@ -94,21 +94,20 @@ public class ParamUtils {
* @param paramsMap params map
* @return Map of converted
*/
- public static Map<String,String> convert(Map<String,Property> paramsMap){
- if(paramsMap == null){
+ public static Map<String,String> convert(Map<String,Property> paramsMap) {
+ if (paramsMap == null) {
return null;
}
Map<String,String> map = new HashMap<>();
Iterator<Map.Entry<String, Property>> iter = paramsMap.entrySet().iterator();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
map.put(en.getKey(),en.getValue().getValue());
}
return map;
}
-
/**
* get parameters map
* @param definedParams definedParams
@@ -118,9 +117,9 @@ public class ParamUtils {
if (definedParams != null) {
Map<String,Property> userDefParamsMaps = new HashMap<>();
Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
- while (iter.hasNext()){
+ while (iter.hasNext()) {
Map.Entry<String, String> en = iter.next();
- Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
+ Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR, en.getValue());
userDefParamsMaps.put(property.getProp(),property);
}
return userDefParamsMaps;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java
deleted file mode 100644
index 58fea5e..0000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/AlertManagerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.master;
-
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.ProjectUser;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.server.utils.AlertManager;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * alert manager test
- */
-@Ignore
-public class AlertManagerTest {
-
- private static final Logger logger = LoggerFactory.getLogger(AlertManagerTest.class);
-
- @Autowired
- ProcessDefinitionMapper processDefinitionMapper;
-
- @Autowired
- ProcessInstanceMapper processInstanceMapper;
-
- @Autowired
- TaskInstanceMapper taskInstanceMapper;
-
- @Autowired
- ProjectMapper projectMapper;
-
- AlertManager alertManager;
-
- /**
- * send worker alert fault tolerance
- */
- @Test
- public void sendWarnningWorkerleranceFaultTest(){
- // process instance
- ProcessInstance processInstance = processInstanceMapper.queryDetailById(13028);
-
- // set process definition
- ProcessDefinition processDefinition = processDefinitionMapper.selectById(47);
- processInstance.setProcessDefinition(processDefinition);
-
-
- // fault task instance
- TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038);
- TaskInstance toleranceTask2 = taskInstanceMapper.selectById(5039);
-
- List<TaskInstance> toleranceTaskList = new ArrayList<>(2);
- toleranceTaskList.add(toleranceTask1);
- toleranceTaskList.add(toleranceTask2);
-
- alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList);
- }
-
-
- /**
- * send worker alert fault tolerance
- */
- @Test
- public void sendWarnningOfProcessInstanceTest(){
- // process instance
- ProcessInstance processInstance = processInstanceMapper.queryDetailById(13028);
-
- // set process definition
- ProcessDefinition processDefinition = processDefinitionMapper.selectById(47);
- processInstance.setProcessDefinition(processDefinition);
-
- // fault task instance
- TaskInstance toleranceTask1 = taskInstanceMapper.selectById(5038);
- toleranceTask1.setState(ExecutionStatus.FAILURE);
- TaskInstance toleranceTask2 = taskInstanceMapper.selectById(5039);
- toleranceTask2.setState(ExecutionStatus.FAILURE);
-
- List<TaskInstance> toleranceTaskList = new ArrayList<>(2);
- toleranceTaskList.add(toleranceTask1);
- toleranceTaskList.add(toleranceTask2);
-
- ProjectUser projectUser = projectMapper.queryProjectWithUserByProcessInstanceId(processInstance.getId());
-
- alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList, projectUser);
- }
-
-}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
similarity index 95%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
index 576a3b1..827fb12 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.service.alert;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -36,21 +35,25 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
- * alert manager
+ * process alert manager
*/
-public class AlertManager {
+@Component
+public class ProcessAlertManager {
/**
* logger of AlertManager
*/
- private static final Logger logger = LoggerFactory.getLogger(AlertManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProcessAlertManager.class);
/**
* alert dao
*/
- private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
+ @Autowired
+ private AlertDao alertDao;
/**
* command type convert chinese
@@ -183,7 +186,7 @@ public class AlertManager {
alert.setCreateTime(new Date());
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alertDao.addAlert(alert);
- logger.info("add alert to db , alert : {}", alert.toString());
+ logger.info("add alert to db , alert : {}", alert);
} catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
@@ -237,7 +240,7 @@ public class AlertManager {
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setCreateTime(new Date());
alertDao.addAlert(alert);
- logger.info("add alert to db , alert: {}", alert.toString());
+ logger.info("add alert to db , alert: {}", alert);
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java
new file mode 100644
index 0000000..f6ba45e
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ProcessAlertManager Test
+ */
+@RunWith(PowerMockRunner.class)
+public class ProcessAlertManagerTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ProcessAlertManagerTest.class);
+
+ @InjectMocks
+ ProcessAlertManager processAlertManager = new ProcessAlertManager();
+
+ @Mock
+ private AlertDao alertDao;
+
+ /**
+ * send worker alert fault tolerance
+ */
+ @Test
+ public void sendWarningWorkerToleranceFaultTest() {
+ // process instance
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setName("test");
+
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setName("test-task-1");
+ taskInstance.setHost("127.0.0.1");
+ taskInstance.setRetryTimes(3);
+ List<TaskInstance> taskInstanceList = new ArrayList<>();
+ taskInstanceList.add(taskInstance);
+
+ processAlertManager.sendAlertWorkerToleranceFault(processInstance, taskInstanceList);
+ }
+
+
+ /**
+ * send worker alert fault tolerance
+ */
+ @Test
+ public void sendWarnningOfProcessInstanceTest() {
+ // process instance
+ ProcessInstance processInstance = new ProcessInstance();
+ processInstance.setWarningType(WarningType.SUCCESS);
+ processInstance.setState(ExecutionStatus.SUCCESS);
+ processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
+ processInstance.setWarningGroupId(1);
+
+ ProjectUser projectUser = new ProjectUser();
+ TaskInstance taskInstance = new TaskInstance();
+ List<TaskInstance> taskInstanceList = new ArrayList<>();
+ taskInstanceList.add(taskInstance);
+
+ processAlertManager.sendAlertProcessInstance(processInstance, taskInstanceList, projectUser);
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 31a037c..7d45da6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -941,7 +941,6 @@
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/registry/ServerNodeManagerTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
- <include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>
<include>**/server/master/ConditionsTaskTest.java</include>
@@ -991,6 +990,7 @@
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
<include>**/service/log/LogClientServiceTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include>
+ <include>**/service/alert/ProcessAlertManagerTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>