You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/10 14:41:17 UTC
[incubator-dolphinscheduler] branch alert_plugin_design updated:
[Feature-3749][Alert-SPI] SqlTask should send notifications by alert server
api (#4080)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch alert_plugin_design
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/alert_plugin_design by this push:
new 169a4ac [Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)
169a4ac is described below
commit 169a4ace7757952bf2fc4492ec3e03905dbaa8e5
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Thu Dec 10 22:41:11 2020 +0800
[Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)
* add sqltask send sync alert server.
* update alert-sms license.
* update AlertServer test.
* remote EmailAlertPluginTest.
* update sqltask.
* update test class.
---
.../dolphinscheduler/alert/AlertServerTest.java | 4 +
.../alert/plugin/EmailAlertPluginTest.java | 2 +
.../common/task/sql/SqlParameters.java | 67 +++++--------
.../server/worker/WorkerServer.java | 23 ++++-
.../server/worker/config/WorkerConfig.java | 19 +++-
.../worker/processor/TaskExecuteProcessor.java | 27 +++--
.../server/worker/runner/TaskExecuteThread.java | 29 +++---
.../server/worker/task/TaskManager.java | 6 +-
.../server/worker/task/sql/SqlTask.java | 67 +++----------
.../src/main/resources/worker.properties | 3 +
.../worker/runner/TaskExecuteThreadTest.java | 11 +-
.../server/worker/task/TaskManagerTest.java | 27 +++--
.../server/worker/task/sql/SqlTaskTest.java | 111 +++++++++++++++++++++
.../service/alert/AlertClientService.java | 26 +++++
.../home/pages/dag/_source/formModel/tasks/sql.vue | 64 +++++-------
.../src/js/module/i18n/locale/en_US.js | 3 +
.../src/js/module/i18n/locale/zh_CN.js | 3 +
install.sh | 2 +
pom.xml | 2 +-
19 files changed, 318 insertions(+), 178 deletions(-)
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
index 37d54c8..a8ead79 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
@@ -71,6 +72,9 @@ public class AlertServerTest {
AlertSender alertSender = PowerMockito.mock(AlertSender.class);
PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender);
+ DolphinPluginLoader dolphinPluginLoader = PowerMockito.mock(DolphinPluginLoader.class);
+ PowerMockito.whenNew(DolphinPluginLoader.class).withAnyArguments().thenReturn(dolphinPluginLoader);
+
AlertServer alertServer = AlertServer.getInstance();
Assert.assertNotNull(alertServer);
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
index 6558419..a6bd51c 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
@@ -49,6 +49,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@@ -62,6 +63,7 @@ public class EmailAlertPluginTest {
PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
@Test
+ @Ignore
public void testRunSend() throws Exception {
//create alert group
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
index 4604234..3b5d39f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.task.sql;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@@ -75,19 +76,13 @@ public class SqlParameters extends AbstractParameters {
private List<String> postStatements;
/**
- * title
- */
- private String title;
-
- /**
- * receivers
+ * groupId
*/
- private String receivers;
-
+ private int groupId;
/**
- * receivers cc
+ * title
*/
- private String receiversCc;
+ private String title;
public String getType() {
return type;
@@ -153,21 +148,6 @@ public class SqlParameters extends AbstractParameters {
this.title = title;
}
- public String getReceivers() {
- return receivers;
- }
-
- public void setReceivers(String receivers) {
- this.receivers = receivers;
- }
-
- public String getReceiversCc() {
- return receiversCc;
- }
-
- public void setReceiversCc(String receiversCc) {
- this.receiversCc = receiversCc;
- }
public List<String> getPreStatements() {
return preStatements;
}
@@ -184,6 +164,14 @@ public class SqlParameters extends AbstractParameters {
this.postStatements = postStatements;
}
+ public int getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(int groupId) {
+ this.groupId = groupId;
+ }
+
@Override
public boolean checkParameters() {
return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
@@ -196,19 +184,18 @@ public class SqlParameters extends AbstractParameters {
@Override
public String toString() {
- return "SqlParameters{" +
- "type='" + type + '\'' +
- ", datasource=" + datasource +
- ", sql='" + sql + '\'' +
- ", sqlType=" + sqlType +
- ", udfs='" + udfs + '\'' +
- ", showType='" + showType + '\'' +
- ", connParams='" + connParams + '\'' +
- ", title='" + title + '\'' +
- ", receivers='" + receivers + '\'' +
- ", receiversCc='" + receiversCc + '\'' +
- ", preStatements=" + preStatements +
- ", postStatements=" + postStatements +
- '}';
+ return "SqlParameters{"
+ + "type='" + type + '\''
+ + ", datasource=" + datasource
+ + ", sql='" + sql + '\''
+ + ", sqlType=" + sqlType
+ + ", udfs='" + udfs + '\''
+ + ", showType='" + showType + '\''
+ + ", connParams='" + connParams + '\''
+ + ", groupId='" + groupId + '\''
+ + ", title='" + title + '\''
+ + ", preStatements=" + preStatements
+ + ", postStatements=" + postStatements
+ + '}';
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f0833cb..072e76a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
@@ -25,7 +26,11 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import javax.annotation.PostConstruct;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -33,8 +38,6 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
-import javax.annotation.PostConstruct;
-
/**
* worker server
*/
@@ -71,6 +74,11 @@ public class WorkerServer {
private SpringApplicationContext springApplicationContext;
/**
+ * alert model netty remote server
+ */
+ private AlertClientService alertClientService;
+
+ /**
* worker server startup
*
* worker server not use web service
@@ -86,7 +94,7 @@ public class WorkerServer {
* worker server run
*/
@PostConstruct
- public void run(){
+ public void run() {
logger.info("start worker server...");
//init remoting server
@@ -100,6 +108,9 @@ public class WorkerServer {
// worker registry
this.workerRegistry.registry();
+ //alert-server client registry
+ alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
+
/**
* register hooks, which are called before the process exits
*/
@@ -115,7 +126,7 @@ public class WorkerServer {
try {
//execute only once
- if(Stopper.isStopped()){
+ if (Stopper.isStopped()) {
return;
}
@@ -127,13 +138,15 @@ public class WorkerServer {
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
- }catch (Exception e){
+ } catch (Exception e) {
logger.warn("thread sleep exception", e);
}
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
+ this.alertClientService.close();
+
} catch (Exception e) {
logger.error("worker server stop exception ", e);
System.exit(-1);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index fa97403..a32d4c8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.config;
+import org.apache.dolphinscheduler.common.Constants;
+
import java.util.Set;
-import org.apache.dolphinscheduler.common.Constants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@@ -52,6 +53,9 @@ public class WorkerConfig {
@Value("${worker.weight:100}")
private int weight;
+ @Value("${alert.listen.host:localhost}")
+ private String alertListenHost;
+
public int getListenPort() {
return listenPort;
}
@@ -101,7 +105,7 @@ public class WorkerConfig {
}
public int getWorkerMaxCpuloadAvg() {
- if (workerMaxCpuloadAvg == -1){
+ if (workerMaxCpuloadAvg == -1) {
return Constants.DEFAULT_WORKER_CPU_LOAD;
}
return workerMaxCpuloadAvg;
@@ -111,7 +115,6 @@ public class WorkerConfig {
this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
}
-
public int getWeight() {
return weight;
}
@@ -119,4 +122,12 @@ public class WorkerConfig {
public void setWeight(int weight) {
this.weight = weight;
}
+
+ public String getAlertListenHost() {
+ return alertListenHost;
+ }
+
+ public void setAlertListenHost(String alertListenHost) {
+ this.alertListenHost = alertListenHost;
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3717ce3..adef703 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@@ -73,12 +74,24 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/
private final TaskCallbackService taskCallbackService;
- public TaskExecuteProcessor(){
+ /**
+ * alert client service
+ */
+ private AlertClientService alertClientService;
+
+ public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
+ public TaskExecuteProcessor(AlertClientService alertClientService) {
+ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+ this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+ this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
+ this.alertClientService = alertClientService;
+ }
+
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
@@ -89,7 +102,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("received command : {}", taskRequestCommand);
- if(taskRequestCommand == null){
+ if (taskRequestCommand == null) {
logger.error("task execute request command is null");
return;
}
@@ -97,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
- if(taskExecutionContext == null){
+ if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
@@ -144,7 +157,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
return Boolean.TRUE;
});
// submit task
- workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
+ workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
@@ -162,9 +175,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
- if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
+ if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
ackCommand.setExecutePath(null);
- }else{
+ } else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
taskExecutionContext.setLogPath(ackCommand.getLogPath());
@@ -176,7 +189,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
- private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
+ private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 58f7433..39046e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
@@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils;
@@ -55,7 +57,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
-
/**
* task scheduler thread
*/
@@ -92,17 +93,23 @@ public class TaskExecuteThread implements Runnable {
private Logger taskLogger;
/**
+ * alert client server
+ */
+ private AlertClientService alertClientService;
+
+ /**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext
, TaskCallbackService taskCallbackService
- , Logger taskLogger) {
+ , Logger taskLogger, AlertClientService alertClientService) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.taskLogger = taskLogger;
+ this.alertClientService = alertClientService;
}
@Override
@@ -140,7 +147,7 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
- task = TaskManager.newTask(taskExecutionContext, taskLogger);
+ task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
// task init
task.init();
@@ -201,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
// the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
- if (taskTimeoutParameter.getEnable()){
+ if (taskTimeoutParameter.getEnable()) {
// get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
- switch (taskTimeoutParameter.getStrategy()){
+ switch (taskTimeoutParameter.getStrategy()) {
case WARN:
break;
case FAILED:
@@ -225,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
}
}
-
/**
* kill task
*/
- public void kill(){
- if (task != null){
+ public void kill() {
+ if (task != null) {
try {
task.cancelApplication(true);
- }catch (Exception e){
+ } catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
-
/**
* download resource file
*
@@ -250,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
private void downloadResource(String execLocalPath,
Map<String,String> projectRes,
Logger logger) throws Exception {
- if (MapUtils.isEmpty(projectRes)){
+ if (MapUtils.isEmpty(projectRes)) {
return;
}
@@ -267,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
- }catch (Exception e){
+ } catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index a946333..b89c8d4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
import org.slf4j.Logger;
/**
@@ -44,7 +46,7 @@ public class TaskManager {
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
- public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
+ public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType());
if (anEnum == null) {
logger.error("not support task type: {}", taskExecutionContext.getTaskType());
@@ -57,7 +59,7 @@ public class TaskManager {
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
- return new SqlTask(taskExecutionContext, logger);
+ return new SqlTask(taskExecutionContext, logger, alertClientService);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index cba7a48..8686e9a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.sql;
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.SEMICOLON;
@@ -38,16 +37,15 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -80,10 +78,7 @@ public class SqlTask extends AbstractTask {
* sql parameters
*/
private SqlParameters sqlParameters;
- /**
- * alert dao
- */
- private AlertDao alertDao;
+
/**
* base datasource
*/
@@ -99,7 +94,10 @@ public class SqlTask extends AbstractTask {
*/
private static final int LIMIT = 10000;
- public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+
+ private AlertClientService alertClientService;
+
+ public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
@@ -111,7 +109,7 @@ public class SqlTask extends AbstractTask {
throw new RuntimeException("sql task params is not valid");
}
- this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
+ this.alertClientService = alertClientService;
}
@Override
@@ -291,9 +289,7 @@ public class SqlTask extends AbstractTask {
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
- sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle())
- ? sqlParameters.getTitle() : taskExecutionContext.getTaskName()
- + " query result sets",
+ sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
JSONUtils.toJsonString(resultJSONArray));
}
@@ -444,48 +440,11 @@ public class SqlTask extends AbstractTask {
* @param title title
* @param content content
*/
- public void sendAttachment(String title, String content) {
-
- List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
-
- // receiving group list
- List<String> receiversList = new ArrayList<>();
- for (User user : users) {
- receiversList.add(user.getEmail().trim());
+ public void sendAttachment(int groupId, String title, String content) {
+ AlertSendResponseCommand alertSendResponseCommand = alertClientService.sendAlert(groupId, title, content);
+ if (!alertSendResponseCommand.getResStatus()) {
+ throw new RuntimeException("send mail failed!");
}
- // custom receiver
- String receivers = sqlParameters.getReceivers();
- if (StringUtils.isNotEmpty(receivers)) {
- String[] splits = receivers.split(COMMA);
- for (String receiver : splits) {
- receiversList.add(receiver.trim());
- }
- }
-
- // copy list
- List<String> receiversCcList = new ArrayList<>();
- // Custom Copier
- String receiversCc = sqlParameters.getReceiversCc();
- if (StringUtils.isNotEmpty(receiversCc)) {
- String[] splits = receiversCc.split(COMMA);
- for (String receiverCc : splits) {
- receiversCcList.add(receiverCc.trim());
- }
- }
-
- String showTypeName = sqlParameters.getShowType().replace(COMMA, "").trim();
- /*
- if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
- Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
- receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
- if(!(boolean) mailResult.get(STATUS)){
- throw new RuntimeException("send mail failed!");
- }
- //TODO AlertServer should provide a grpc interface, which is called when other services need to send alerts
- }else{
- logger.error("showType: {} is not valid " ,showTypeName);
- throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
- }*/
}
/**
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties
index 9fba30c..cea1b4e 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -35,3 +35,6 @@
# default worker weight
#work.weight=100
+
+# alert server listener host
+alert.listen.host=localhost
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index 2e7e531..27c10db 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@@ -64,6 +65,8 @@ public class TaskExecuteThreadTest {
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+ private AlertClientService alertClientService;
+
@Before
public void before() {
// init task execution context, logger
@@ -100,8 +103,10 @@ public class TaskExecuteThreadTest {
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
+ alertClientService = PowerMockito.mock(AlertClientService.class);
+
PowerMockito.mockStatic(TaskManager.class);
- PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger))
+ PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
.thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
PowerMockito.mockStatic(JSONUtils.class);
@@ -117,7 +122,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
@@ -129,7 +134,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
- TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
index eb03839..6acfd18 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@@ -46,6 +47,8 @@ public class TaskManagerTest {
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+ private AlertClientService alertClientService;
+
@Before
public void before() {
// init task execution context, logger
@@ -74,41 +77,43 @@ public class TaskManagerTest {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
+
+ alertClientService = PowerMockito.mock(AlertClientService.class);
}
@Test
public void testNewTask() {
taskExecutionContext.setTaskType("SHELL");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("WATERDROP");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("HTTP");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("MR");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SPARK");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("FLINK");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("PYTHON");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("DATAX");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SQOOP");
- Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNull() {
taskExecutionContext.setTaskType(null);
- TaskManager.newTask(taskExecutionContext,taskLogger);
+ TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNotExists() {
taskExecutionContext.setTaskType("XXX");
- TaskManager.newTask(taskExecutionContext,taskLogger);
+ TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
new file mode 100644
index 0000000..64db568
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task.sql;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * sql task test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(value = {SqlTask.class, DriverManager.class})
+public class SqlTaskTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
+
+ private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+ + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
+
+ private SqlTask sqlTask;
+
+ private TaskExecutionContext taskExecutionContext;
+
+ private AlertClientService alertClientService;
+ @Before
+ public void before() throws Exception {
+ taskExecutionContext = new TaskExecutionContext();
+
+ TaskProps props = new TaskProps();
+ props.setExecutePath("/tmp");
+ props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
+ props.setTaskInstanceId(1);
+ props.setTenantCode("1");
+ props.setEnvFile(".dolphinscheduler_env.sh");
+ props.setTaskStartTime(new Date());
+ props.setTaskTimeout(0);
+ props.setTaskParams(
+ "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}");
+
+ taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
+ PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+ PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1");
+ PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
+ PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
+ PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
+ PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
+
+ SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
+ sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
+ PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
+
+ alertClientService = PowerMockito.mock(AlertClientService.class);
+ sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
+ sqlTask.init();
+ }
+
+ @Test
+ public void testGetParameters() {
+ Assert.assertNotNull(sqlTask.getParameters());
+ }
+
+ @Test(expected = Exception.class)
+ public void testHandle() throws Exception {
+ Connection connection = PowerMockito.mock(Connection.class);
+ PowerMockito.mockStatic(DriverManager.class);
+ PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection);
+ PreparedStatement preparedStatement = PowerMockito.mock(PreparedStatement.class);
+ PowerMockito.when(connection.prepareStatement(Mockito.any())).thenReturn(preparedStatement);
+ PowerMockito.mockStatic(ParameterUtils.class);
+ PowerMockito.when(ParameterUtils.replaceScheduleTime(Mockito.any(), Mockito.any())).thenReturn("insert into tb_1 values('1','2')");
+
+ sqlTask.handle();
+ Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,sqlTask.getExitStatusCode());
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index 7839b4a..49977fa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -38,6 +38,10 @@ public class AlertClientService {
private volatile boolean isRunning;
+ private String host;
+
+ private int port;
+
/**
* request time out
*/
@@ -53,6 +57,17 @@ public class AlertClientService {
}
/**
+ * alert client
+ */
+ public AlertClientService(String host, int port) {
+ this.clientConfig = new NettyClientConfig();
+ this.client = new NettyRemotingClient(clientConfig);
+ this.isRunning = true;
+ this.host = host;
+ this.port = port;
+ }
+
+ /**
* close
*/
public void close() {
@@ -63,6 +78,17 @@ public class AlertClientService {
/**
* alert sync send data
+ * @param groupId
+ * @param title
+ * @param content
+ * @return
+ */
+ public AlertSendResponseCommand sendAlert(int groupId, String title, String content) {
+ return this.sendAlert(this.host,this.port,groupId,title,content);
+ }
+
+ /**
+ * alert sync send data
* @param host host
* @param port port
* @param groupId groupId
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
index 94ddaf8..8e892ab 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
@@ -35,12 +35,14 @@
:sql-type="sqlType">
</m-sql-type>
</div>
+ <!--
<div v-if="sqlType==0" style="display: inline-block;padding-left: 10px;margin-top: 2px;">
<x-checkbox-group v-model="showType">
<x-checkbox :label="'TABLE'" :disabled="isDetails">{{$t('TableMode')}}</x-checkbox>
<x-checkbox :label="'ATTACHMENT'" :disabled="isDetails">{{$t('Attachment')}}</x-checkbox>
</x-checkbox-group>
</div>
+ -->
</div>
</m-list-box>
<template v-if="sqlType==0">
@@ -50,21 +52,21 @@
<x-input
type="input"
v-model="title"
- :placeholder="$t('Please enter the title of email')"
+ :placeholder="$t('Please enter the title of alert')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<m-list-box>
- <div slot="text"><strong class='requiredIcon'>*</strong>{{$t('Recipient')}}</div>
+ <!-- TODO Wait for the alarm group/instance page to be developed and add specific content -->
+ <div slot="text"><strong class='requiredIcon'>*</strong>{{$t('AlertGroup')}}</div>
<div slot="content">
- <m-email ref="refEmail" v-model="receivers" :disabled="isDetails" :repeat-data="receiversCc"></m-email>
- </div>
- </m-list-box>
- <m-list-box>
- <div slot="text">{{$t('Cc')}}</div>
- <div slot="content">
- <m-email ref="refCc" v-model="receiversCc" :disabled="isDetails" :repeat-data="receivers"></m-email>
+ <x-input
+ type="input"
+ v-model="groupId"
+ :placeholder="$t('Please select the alert group')"
+ autocomplete="off">
+ </x-input>
</div>
</m-list-box>
</template>
@@ -174,6 +176,8 @@
sqlType: '0',
// Email title
title: '',
+ // Alert groupId
+ groupId: '',
// Form/attachment
showType: ['TABLE'],
// Sql parameter
@@ -181,11 +185,7 @@
// Pre statements
preStatements: [],
// Post statements
- postStatements: [],
- // recipients
- receivers: [],
- // copy to
- receiversCc: []
+ postStatements: []
}
},
mixins: [disabledState],
@@ -275,24 +275,18 @@
if (!this.$refs.refDs._verifDatasource()) {
return false
}
+ /*
if (this.sqlType==0 && !this.showType.length) {
this.$message.warning(`${i18n.$t('One form or attachment must be selected')}`)
return false
}
+ */
if (this.sqlType==0 && !this.title) {
- this.$message.warning(`${i18n.$t('Mail subject required')}`)
- return false
- }
- if (this.sqlType==0 && !this.receivers.length) {
- this.$message.warning(`${i18n.$t('Recipient required')}`)
+ this.$message.warning(`${i18n.$t('Please enter the title of alert')}`)
return false
}
- // receivers Subcomponent verification
- if (this.sqlType==0 && !this.$refs.refEmail._manualEmail()) {
- return false
- }
- // receiversCc Subcomponent verification
- if (this.sqlType==0 && !this.$refs.refCc._manualEmail()) {
+ if (this.sqlType==0 && !this.groupId) {
+ this.$message.warning(`${i18n.$t('Please select the alert group')}`)
return false
}
// udfs Subcomponent verification Verification only if the data type is HIVE
@@ -325,8 +319,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- receivers: this.receivers.join(','),
- receiversCc: this.receiversCc.join(','),
+ groupId: this.groupId,
showType: (() => {
/**
* Special processing return order TABLE,ATTACHMENT
@@ -387,10 +380,7 @@
} else {
param.processInstanceId = current.params.id
}
- this.store.dispatch('dag/getReceiver', param).then(res => {
- this.receivers = res.receivers && res.receivers.split(',') || []
- this.receiversCc = res.receiversCc && res.receiversCc.split(',') || []
- })
+
},
_cacheParams () {
this.$emit('on-cache-params', {
@@ -400,8 +390,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- receivers: this.receivers.join(','),
- receiversCc: this.receiversCc.join(','),
+ groupId: this.groupId,
showType: (() => {
let showType = this.showType
@@ -433,8 +422,7 @@
}
if (val != 0) {
this.title = ''
- this.receivers = []
- this.receiversCc = []
+ this.groupId = ''
}
},
// Listening data source
@@ -469,8 +457,7 @@
this.preStatements = o.params.preStatements || []
this.postStatements = o.params.postStatements || []
this.title = o.params.title || ''
- this.receivers = o.params.receivers && o.params.receivers.split(',') || []
- this.receiversCc = o.params.receiversCc && o.params.receiversCc.split(',') || []
+ this.groupId = o.params.groupId || ''
}
// read tasks from cache
if (!_.some(this.store.state.dag.cacheTasks, { id: this.createNodeId }) &&
@@ -501,8 +488,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
- receivers: this.receivers.join(','),
- receiversCc: this.receiversCc.join(','),
+ groupId: this.groupId,
showType: (() => {
let showType = this.showType
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index f1295da..00d6757 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -122,6 +122,9 @@ export default {
'SQL Type': 'SQL Type',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',
+ 'Please enter the title of alert': 'Please enter the title of alert',
+ 'AlertGroup': 'AlertGroup',
+ 'Please select the alert group': 'Please select the alert group',
Table: 'Table',
TableMode: 'Table',
Attachment: 'Attachment',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 148bc33..e936400 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -122,6 +122,9 @@ export default {
'SQL Type': 'sql类型',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',
+ 'Please enter the title of alert': '请输入告警主题',
+ 'AlertGroup': '告警组',
+ 'Please select the alert group': '请选择告警组',
Table: '表名',
TableMode: '表格',
Attachment: '附件',
diff --git a/install.sh b/install.sh
index da21085..7991275 100755
--- a/install.sh
+++ b/install.sh
@@ -64,6 +64,8 @@ sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttl
sed -i ${txt} "s#mail.smtp.ssl.trust.*#mail.smtp.ssl.trust=${sslTrust}#g" conf/alert.properties
sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
+sed -i ${txt} "s#alert.listen.host.*#alert.listen.host=${alertServer}#g" conf/worker.properties
+
# 2.create directory
echo "2.create directory"
diff --git a/pom.xml b/pom.xml
index 8368155..5f49824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -904,6 +904,7 @@
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
+ <include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
@@ -920,7 +921,6 @@
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include>
-
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>