You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/10 14:41:17 UTC

[incubator-dolphinscheduler] branch alert_plugin_design updated: [Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch alert_plugin_design
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/alert_plugin_design by this push:
     new 169a4ac  [Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)
169a4ac is described below

commit 169a4ace7757952bf2fc4492ec3e03905dbaa8e5
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Thu Dec 10 22:41:11 2020 +0800

    [Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)
    
    * add sqltask send sync alert server.
    
    * update alert-sms license.
    
    * update AlertServer test.
    
    * remote EmailAlertPluginTest.
    
    * update sqltask.
    
    * update test class.
---
 .../dolphinscheduler/alert/AlertServerTest.java    |   4 +
 .../alert/plugin/EmailAlertPluginTest.java         |   2 +
 .../common/task/sql/SqlParameters.java             |  67 +++++--------
 .../server/worker/WorkerServer.java                |  23 ++++-
 .../server/worker/config/WorkerConfig.java         |  19 +++-
 .../worker/processor/TaskExecuteProcessor.java     |  27 +++--
 .../server/worker/runner/TaskExecuteThread.java    |  29 +++---
 .../server/worker/task/TaskManager.java            |   6 +-
 .../server/worker/task/sql/SqlTask.java            |  67 +++----------
 .../src/main/resources/worker.properties           |   3 +
 .../worker/runner/TaskExecuteThreadTest.java       |  11 +-
 .../server/worker/task/TaskManagerTest.java        |  27 +++--
 .../server/worker/task/sql/SqlTaskTest.java        | 111 +++++++++++++++++++++
 .../service/alert/AlertClientService.java          |  26 +++++
 .../home/pages/dag/_source/formModel/tasks/sql.vue |  64 +++++-------
 .../src/js/module/i18n/locale/en_US.js             |   3 +
 .../src/js/module/i18n/locale/zh_CN.js             |   3 +
 install.sh                                         |   2 +
 pom.xml                                            |   2 +-
 19 files changed, 318 insertions(+), 178 deletions(-)

diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
index 37d54c8..a8ead79 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.alert;
 
 import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
 import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
 import org.apache.dolphinscheduler.alert.runner.AlertSender;
 import org.apache.dolphinscheduler.alert.utils.Constants;
@@ -71,6 +72,9 @@ public class AlertServerTest {
         AlertSender alertSender = PowerMockito.mock(AlertSender.class);
         PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender);
 
+        DolphinPluginLoader dolphinPluginLoader = PowerMockito.mock(DolphinPluginLoader.class);
+        PowerMockito.whenNew(DolphinPluginLoader.class).withAnyArguments().thenReturn(dolphinPluginLoader);
+
         AlertServer alertServer = AlertServer.getInstance();
         Assert.assertNotNull(alertServer);
 
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
index 6558419..a6bd51c 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
@@ -49,6 +49,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -62,6 +63,7 @@ public class EmailAlertPluginTest {
     PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
 
     @Test
+    @Ignore
     public void testRunSend() throws Exception {
 
         //create alert group
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
index 4604234..3b5d39f 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.task.sql;
 
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -75,19 +76,13 @@ public class SqlParameters extends AbstractParameters {
     private List<String> postStatements;
 
     /**
-     * title
-     */
-    private String title;
-
-    /**
-     * receivers
+     * groupId
      */
-    private String receivers;
-
+    private int groupId;
     /**
-     * receivers cc
+     * title
      */
-    private String receiversCc;
+    private String title;
 
     public String getType() {
         return type;
@@ -153,21 +148,6 @@ public class SqlParameters extends AbstractParameters {
         this.title = title;
     }
 
-    public String getReceivers() {
-        return receivers;
-    }
-
-    public void setReceivers(String receivers) {
-        this.receivers = receivers;
-    }
-
-    public String getReceiversCc() {
-        return receiversCc;
-    }
-
-    public void setReceiversCc(String receiversCc) {
-        this.receiversCc = receiversCc;
-    }
     public List<String> getPreStatements() {
         return preStatements;
     }
@@ -184,6 +164,14 @@ public class SqlParameters extends AbstractParameters {
         this.postStatements = postStatements;
     }
 
+    public int getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(int groupId) {
+        this.groupId = groupId;
+    }
+
     @Override
     public boolean checkParameters() {
         return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
@@ -196,19 +184,18 @@ public class SqlParameters extends AbstractParameters {
 
     @Override
     public String toString() {
-        return "SqlParameters{" +
-                "type='" + type + '\'' +
-                ", datasource=" + datasource +
-                ", sql='" + sql + '\'' +
-                ", sqlType=" + sqlType +
-                ", udfs='" + udfs + '\'' +
-                ", showType='" + showType + '\'' +
-                ", connParams='" + connParams + '\'' +
-                ", title='" + title + '\'' +
-                ", receivers='" + receivers + '\'' +
-                ", receiversCc='" + receiversCc + '\'' +
-                ", preStatements=" + preStatements +
-                ", postStatements=" + postStatements +
-                '}';
+        return "SqlParameters{"
+                + "type='" + type + '\''
+                + ", datasource=" + datasource
+                + ", sql='" + sql + '\''
+                + ", sqlType=" + sqlType
+                + ", udfs='" + udfs + '\''
+                + ", showType='" + showType + '\''
+                + ", connParams='" + connParams + '\''
+                + ", groupId='" + groupId + '\''
+                + ", title='" + title + '\''
+                + ", preStatements=" + preStatements
+                + ", postStatements=" + postStatements
+                + '}';
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index f0833cb..072e76a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -25,7 +26,11 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -33,8 +38,6 @@ import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.annotation.ComponentScan;
 
-import javax.annotation.PostConstruct;
-
 /**
  *  worker server
  */
@@ -71,6 +74,11 @@ public class WorkerServer {
     private SpringApplicationContext springApplicationContext;
 
     /**
+     *  alert model netty remote server
+     */
+    private AlertClientService alertClientService;
+
+    /**
      * worker server startup
      *
      * worker server not use web service
@@ -86,7 +94,7 @@ public class WorkerServer {
      * worker server run
      */
     @PostConstruct
-    public void run(){
+    public void run() {
         logger.info("start worker server...");
 
         //init remoting server
@@ -100,6 +108,9 @@ public class WorkerServer {
         // worker registry
         this.workerRegistry.registry();
 
+        //alert-server client registry
+        alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
+
         /**
          * register hooks, which are called before the process exits
          */
@@ -115,7 +126,7 @@ public class WorkerServer {
 
         try {
             //execute only once
-            if(Stopper.isStopped()){
+            if (Stopper.isStopped()) {
                 return;
             }
 
@@ -127,13 +138,15 @@ public class WorkerServer {
             try {
                 //thread sleep 3 seconds for thread quitely stop
                 Thread.sleep(3000L);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.warn("thread sleep exception", e);
             }
 
             this.nettyRemotingServer.close();
             this.workerRegistry.unRegistry();
 
+            this.alertClientService.close();
+
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
             System.exit(-1);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index fa97403..a32d4c8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.config;
 
+import org.apache.dolphinscheduler.common.Constants;
+
 import java.util.Set;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.PropertySource;
 import org.springframework.stereotype.Component;
@@ -52,6 +53,9 @@ public class WorkerConfig {
     @Value("${worker.weight:100}")
     private int weight;
 
+    @Value("${alert.listen.host:localhost}")
+    private String alertListenHost;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -101,7 +105,7 @@ public class WorkerConfig {
     }
 
     public int getWorkerMaxCpuloadAvg() {
-        if (workerMaxCpuloadAvg == -1){
+        if (workerMaxCpuloadAvg == -1) {
             return Constants.DEFAULT_WORKER_CPU_LOAD;
         }
         return workerMaxCpuloadAvg;
@@ -111,7 +115,6 @@ public class WorkerConfig {
         this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
     }
 
-
     public int getWeight() {
         return weight;
     }
@@ -119,4 +122,12 @@ public class WorkerConfig {
     public void setWeight(int weight) {
         this.weight = weight;
     }
+
+    public String getAlertListenHost() {
+        return alertListenHost;
+    }
+
+    public void setAlertListenHost(String alertListenHost) {
+        this.alertListenHost = alertListenHost;
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3717ce3..adef703 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.LogUtils;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
@@ -73,12 +74,24 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      */
     private final TaskCallbackService taskCallbackService;
 
-    public TaskExecuteProcessor(){
+    /**
+     *  alert client service
+     */
+    private AlertClientService alertClientService;
+
+    public TaskExecuteProcessor() {
         this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
         this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
         this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
     }
 
+    public TaskExecuteProcessor(AlertClientService alertClientService) {
+        this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+        this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+        this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
+        this.alertClientService = alertClientService;
+    }
+
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
@@ -89,7 +102,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
         logger.info("received command : {}", taskRequestCommand);
 
-        if(taskRequestCommand == null){
+        if (taskRequestCommand == null) {
             logger.error("task execute request command is null");
             return;
         }
@@ -97,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         String contextJson = taskRequestCommand.getTaskExecutionContext();
         TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
 
-        if(taskExecutionContext == null){
+        if (taskExecutionContext == null) {
             logger.error("task execution context is null");
             return;
         }
@@ -144,7 +157,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
                 return Boolean.TRUE;
             });
             // submit task
-            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
+            workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
         } catch (ExecutionException | RetryException e) {
             logger.error(e.getMessage(), e);
         }
@@ -162,9 +175,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
         ackCommand.setHost(taskExecutionContext.getHost());
         ackCommand.setStartTime(taskExecutionContext.getStartTime());
-        if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
+        if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
             ackCommand.setExecutePath(null);
-        }else{
+        } else {
             ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
         }
         taskExecutionContext.setLogPath(ackCommand.getLogPath());
@@ -176,7 +189,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
      * @param taskExecutionContext taskExecutionContext
      * @return execute local path
      */
-    private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
+    private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
         return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
                 taskExecutionContext.getProcessDefineId(),
                 taskExecutionContext.getProcessInstanceId(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 58f7433..39046e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import org.apache.commons.collections.MapUtils;
@@ -55,7 +57,6 @@ import org.slf4j.LoggerFactory;
 
 import com.github.rholder.retry.RetryException;
 
-
 /**
  *  task scheduler thread
  */
@@ -92,17 +93,23 @@ public class TaskExecuteThread implements Runnable {
     private Logger taskLogger;
 
     /**
+     * alert client server
+     */
+    private AlertClientService alertClientService;
+
+    /**
      *  constructor
      * @param taskExecutionContext taskExecutionContext
      * @param taskCallbackService taskCallbackService
      */
     public TaskExecuteThread(TaskExecutionContext taskExecutionContext
             , TaskCallbackService taskCallbackService
-            , Logger taskLogger) {
+            , Logger taskLogger, AlertClientService alertClientService) {
         this.taskExecutionContext = taskExecutionContext;
         this.taskCallbackService = taskCallbackService;
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
         this.taskLogger = taskLogger;
+        this.alertClientService = alertClientService;
     }
 
     @Override
@@ -140,7 +147,7 @@ public class TaskExecuteThread implements Runnable {
                     taskExecutionContext.getProcessInstanceId(),
                     taskExecutionContext.getTaskInstanceId()));
 
-            task = TaskManager.newTask(taskExecutionContext, taskLogger);
+            task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
 
             // task init
             task.init();
@@ -201,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
         // the default timeout is the maximum value of the integer
         taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
         TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
-        if (taskTimeoutParameter.getEnable()){
+        if (taskTimeoutParameter.getEnable()) {
             // get timeout strategy
             taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
-            switch (taskTimeoutParameter.getStrategy()){
+            switch (taskTimeoutParameter.getStrategy()) {
                 case WARN:
                     break;
                 case FAILED:
@@ -225,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
         }
     }
 
-
     /**
      *  kill task
      */
-    public void kill(){
-        if (task != null){
+    public void kill() {
+        if (task != null) {
             try {
                 task.cancelApplication(true);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.error(e.getMessage(),e);
             }
         }
     }
 
-
     /**
      * download resource file
      *
@@ -250,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
     private void downloadResource(String execLocalPath,
                                   Map<String,String> projectRes,
                                   Logger logger) throws Exception {
-        if (MapUtils.isEmpty(projectRes)){
+        if (MapUtils.isEmpty(projectRes)) {
             return;
         }
 
@@ -267,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
 
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
                     HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
-                }catch (Exception e){
+                } catch (Exception e) {
                     logger.error(e.getMessage(),e);
                     throw new RuntimeException(e.getMessage());
                 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index a946333..b89c8d4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
 import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
 import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
 import org.slf4j.Logger;
 
 /**
@@ -44,7 +46,7 @@ public class TaskManager {
      * @return AbstractTask
      * @throws IllegalArgumentException illegal argument exception
      */
-    public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
+    public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
         TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType());
         if (anEnum == null) {
             logger.error("not support task type: {}", taskExecutionContext.getTaskType());
@@ -57,7 +59,7 @@ public class TaskManager {
             case PROCEDURE:
                 return new ProcedureTask(taskExecutionContext, logger);
             case SQL:
-                return new SqlTask(taskExecutionContext, logger);
+                return new SqlTask(taskExecutionContext, logger, alertClientService);
             case MR:
                 return new MapReduceTask(taskExecutionContext, logger);
             case SPARK:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index cba7a48..8686e9a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.task.sql;
 
-import static org.apache.dolphinscheduler.common.Constants.COMMA;
 import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF;
 import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
 import static org.apache.dolphinscheduler.common.Constants.SEMICOLON;
@@ -38,16 +37,15 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
 import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.UDFUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -80,10 +78,7 @@ public class SqlTask extends AbstractTask {
      * sql parameters
      */
     private SqlParameters sqlParameters;
-    /**
-     * alert dao
-     */
-    private AlertDao alertDao;
+
     /**
      * base datasource
      */
@@ -99,7 +94,10 @@ public class SqlTask extends AbstractTask {
      */
     private static final int LIMIT = 10000;
 
-    public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+
+    private AlertClientService alertClientService;
+
+    public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) {
         super(taskExecutionContext, logger);
 
         this.taskExecutionContext = taskExecutionContext;
@@ -111,7 +109,7 @@ public class SqlTask extends AbstractTask {
             throw new RuntimeException("sql task params is not valid");
         }
 
-        this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
+        this.alertClientService = alertClientService;
     }
 
     @Override
@@ -291,9 +289,7 @@ public class SqlTask extends AbstractTask {
         String result = JSONUtils.toJsonString(resultJSONArray);
         logger.debug("execute sql : {}", result);
 
-        sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle())
-                        ? sqlParameters.getTitle() : taskExecutionContext.getTaskName()
-                        + " query result sets",
+        sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
                 JSONUtils.toJsonString(resultJSONArray));
     }
 
@@ -444,48 +440,11 @@ public class SqlTask extends AbstractTask {
      * @param title   title
      * @param content content
      */
-    public void sendAttachment(String title, String content) {
-
-        List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
-
-        // receiving group list
-        List<String> receiversList = new ArrayList<>();
-        for (User user : users) {
-            receiversList.add(user.getEmail().trim());
+    public void sendAttachment(int groupId, String title, String content) {
+        AlertSendResponseCommand alertSendResponseCommand  = alertClientService.sendAlert(groupId, title, content);
+        if (!alertSendResponseCommand.getResStatus()) {
+            throw new RuntimeException("send mail failed!");
         }
-        // custom receiver
-        String receivers = sqlParameters.getReceivers();
-        if (StringUtils.isNotEmpty(receivers)) {
-            String[] splits = receivers.split(COMMA);
-            for (String receiver : splits) {
-                receiversList.add(receiver.trim());
-            }
-        }
-
-        // copy list
-        List<String> receiversCcList = new ArrayList<>();
-        // Custom Copier
-        String receiversCc = sqlParameters.getReceiversCc();
-        if (StringUtils.isNotEmpty(receiversCc)) {
-            String[] splits = receiversCc.split(COMMA);
-            for (String receiverCc : splits) {
-                receiversCcList.add(receiverCc.trim());
-            }
-        }
-
-        String showTypeName = sqlParameters.getShowType().replace(COMMA, "").trim();
-        /*
-        if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
-            Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
-                    receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
-            if(!(boolean) mailResult.get(STATUS)){
-                throw new RuntimeException("send mail failed!");
-            }
-        //TODO AlertServer should provide a grpc interface, which is called when other services need to send alerts
-        }else{
-            logger.error("showType: {} is not valid "  ,showTypeName);
-            throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
-        }*/
     }
 
     /**
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties
index 9fba30c..cea1b4e 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -35,3 +35,6 @@
 
 # default worker weight
 #work.weight=100
+
+# alert server listener host
+alert.listen.host=localhost
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index 2e7e531..27c10db 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
@@ -64,6 +65,8 @@ public class TaskExecuteThreadTest {
 
     private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
 
+    private AlertClientService alertClientService;
+
     @Before
     public void before() {
         // init task execution context, logger
@@ -100,8 +103,10 @@ public class TaskExecuteThreadTest {
         PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
                 .thenReturn(taskExecutionContextCacheManager);
 
+        alertClientService = PowerMockito.mock(AlertClientService.class);
+
         PowerMockito.mockStatic(TaskManager.class);
-        PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger))
+        PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
                 .thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
 
         PowerMockito.mockStatic(JSONUtils.class);
@@ -117,7 +122,7 @@ public class TaskExecuteThreadTest {
         taskExecutionContext.setTaskType("SQL");
         taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
-        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
+        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
         taskExecuteThread.run();
 
         Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
@@ -129,7 +134,7 @@ public class TaskExecuteThreadTest {
         taskExecutionContext.setStartTime(null);
         taskExecutionContext.setDelayTime(1);
         taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
-        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
+        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
         taskExecuteThread.run();
 
         Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
index eb03839..6acfd18 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
 import java.util.Date;
@@ -46,6 +47,8 @@ public class TaskManagerTest {
 
     private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
 
+    private AlertClientService alertClientService;
+
     @Before
     public void before() {
         // init task execution context, logger
@@ -74,41 +77,43 @@ public class TaskManagerTest {
         PowerMockito.mockStatic(SpringApplicationContext.class);
         PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
                 .thenReturn(taskExecutionContextCacheManager);
+
+        alertClientService = PowerMockito.mock(AlertClientService.class);
     }
 
     @Test
     public void testNewTask() {
 
         taskExecutionContext.setTaskType("SHELL");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("WATERDROP");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("HTTP");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("MR");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("SPARK");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("FLINK");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("PYTHON");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("DATAX");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
         taskExecutionContext.setTaskType("SQOOP");
-        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
 
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNewTaskIsNull() {
         taskExecutionContext.setTaskType(null);
-        TaskManager.newTask(taskExecutionContext,taskLogger);
+        TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testNewTaskIsNotExists() {
         taskExecutionContext.setTaskType("XXX");
-        TaskManager.newTask(taskExecutionContext,taskLogger);
+        TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
     }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
new file mode 100644
index 0000000..64db568
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task.sql;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  sql task test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(value = {SqlTask.class, DriverManager.class})
+public class SqlTaskTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
+
+    private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+            + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
+
+    private SqlTask sqlTask;
+
+    private TaskExecutionContext taskExecutionContext;
+
+    private AlertClientService alertClientService;
+    @Before
+    public void before() throws Exception {
+        taskExecutionContext = new TaskExecutionContext();
+
+        TaskProps props = new TaskProps();
+        props.setExecutePath("/tmp");
+        props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
+        props.setTaskInstanceId(1);
+        props.setTenantCode("1");
+        props.setEnvFile(".dolphinscheduler_env.sh");
+        props.setTaskStartTime(new Date());
+        props.setTaskTimeout(0);
+        props.setTaskParams(
+                "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}");
+
+        taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
+        PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1");
+        PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
+        PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
+        PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
+        PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
+
+        SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
+        sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
+        PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
+
+        alertClientService = PowerMockito.mock(AlertClientService.class);
+        sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
+        sqlTask.init();
+    }
+
+    @Test
+    public void testGetParameters() {
+        Assert.assertNotNull(sqlTask.getParameters());
+    }
+
+    @Test(expected = Exception.class)
+    public void testHandle() throws Exception {
+        Connection connection = PowerMockito.mock(Connection.class);
+        PowerMockito.mockStatic(DriverManager.class);
+        PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection);
+        PreparedStatement preparedStatement = PowerMockito.mock(PreparedStatement.class);
+        PowerMockito.when(connection.prepareStatement(Mockito.any())).thenReturn(preparedStatement);
+        PowerMockito.mockStatic(ParameterUtils.class);
+        PowerMockito.when(ParameterUtils.replaceScheduleTime(Mockito.any(), Mockito.any())).thenReturn("insert into tb_1 values('1','2')");
+
+        sqlTask.handle();
+        Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,sqlTask.getExitStatusCode());
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
index 7839b4a..49977fa 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -38,6 +38,10 @@ public class AlertClientService {
 
     private volatile boolean isRunning;
 
+    private String host;
+
+    private int port;
+
     /**
      * request time out
      */
@@ -53,6 +57,17 @@ public class AlertClientService {
     }
 
     /**
+     * alert client bound to the given alert server host and port
+     */
+    public AlertClientService(String host, int port) {
+        this.clientConfig = new NettyClientConfig();
+        this.client = new NettyRemotingClient(clientConfig);
+        this.isRunning = true;
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
      * close
      */
     public void close() {
@@ -63,6 +78,17 @@ public class AlertClientService {
 
     /**
      * alert sync send data
+     * @param groupId groupId
+     * @param title title
+     * @param content content
+     * @return the AlertSendResponseCommand result of the send
+     */
+    public AlertSendResponseCommand sendAlert(int groupId, String title,  String content) {
+        return this.sendAlert(this.host,this.port,groupId,title,content);
+    }
+
+    /**
+     * alert sync send data
      * @param host host
      * @param port port
      * @param groupId groupId
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
index 94ddaf8..8e892ab 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
@@ -35,12 +35,14 @@
                   :sql-type="sqlType">
           </m-sql-type>
         </div>
+        <!--
         <div v-if="sqlType==0" style="display: inline-block;padding-left: 10px;margin-top: 2px;">
           <x-checkbox-group v-model="showType">
             <x-checkbox :label="'TABLE'" :disabled="isDetails">{{$t('TableMode')}}</x-checkbox>
             <x-checkbox :label="'ATTACHMENT'" :disabled="isDetails">{{$t('Attachment')}}</x-checkbox>
           </x-checkbox-group>
         </div>
+        -->
       </div>
     </m-list-box>
     <template v-if="sqlType==0">
@@ -50,21 +52,21 @@
           <x-input
             type="input"
             v-model="title"
-            :placeholder="$t('Please enter the title of email')"
+            :placeholder="$t('Please enter the title of alert')"
             autocomplete="off">
           </x-input>
         </div>
       </m-list-box>
       <m-list-box>
-        <div slot="text"><strong class='requiredIcon'>*</strong>{{$t('Recipient')}}</div>
+        <!-- TODO Wait for the alarm group/instance page to be developed and add specific content -->
+        <div slot="text"><strong class='requiredIcon'>*</strong>{{$t('AlertGroup')}}</div>
         <div slot="content">
-          <m-email ref="refEmail" v-model="receivers" :disabled="isDetails" :repeat-data="receiversCc"></m-email>
-        </div>
-      </m-list-box>
-      <m-list-box>
-        <div slot="text">{{$t('Cc')}}</div>
-        <div slot="content">
-          <m-email ref="refCc" v-model="receiversCc" :disabled="isDetails" :repeat-data="receivers"></m-email>
+          <x-input
+            type="input"
+            v-model="groupId"
+            :placeholder="$t('Please select the alert group')"
+            autocomplete="off">
+          </x-input>
         </div>
       </m-list-box>
     </template>
@@ -174,6 +176,8 @@
         sqlType: '0',
         // Email title
         title: '',
+        // Alert groupId
+        groupId: '',
         // Form/attachment
         showType: ['TABLE'],
         // Sql parameter
@@ -181,11 +185,7 @@
         // Pre statements
         preStatements: [],
         // Post statements
-        postStatements: [],
-        // recipients
-        receivers: [],
-        // copy to
-        receiversCc: []
+        postStatements: []
       }
     },
     mixins: [disabledState],
@@ -275,24 +275,18 @@
         if (!this.$refs.refDs._verifDatasource()) {
           return false
         }
+        /*
         if (this.sqlType==0 && !this.showType.length) {
           this.$message.warning(`${i18n.$t('One form or attachment must be selected')}`)
           return false
         }
+         */
         if (this.sqlType==0 && !this.title) {
-          this.$message.warning(`${i18n.$t('Mail subject required')}`)
-          return false
-        }
-        if (this.sqlType==0 && !this.receivers.length) {
-          this.$message.warning(`${i18n.$t('Recipient required')}`)
+          this.$message.warning(`${i18n.$t('Please enter the title of alert')}`)
           return false
         }
-        // receivers Subcomponent verification
-        if (this.sqlType==0 && !this.$refs.refEmail._manualEmail()) {
-          return false
-        }
-        // receiversCc Subcomponent verification
-        if (this.sqlType==0 && !this.$refs.refCc._manualEmail()) {
+        if (this.sqlType==0 && !this.groupId) {
+          this.$message.warning(`${i18n.$t('Please select the alert group')}`)
           return false
         }
         // udfs Subcomponent verification Verification only if the data type is HIVE
@@ -325,8 +319,7 @@
           udfs: this.udfs,
           sqlType: this.sqlType,
           title: this.title,
-          receivers: this.receivers.join(','),
-          receiversCc: this.receiversCc.join(','),
+          groupId: this.groupId,
           showType: (() => {
             /**
              * Special processing return order TABLE,ATTACHMENT
@@ -387,10 +380,7 @@
         } else {
           param.processInstanceId = current.params.id
         }
-        this.store.dispatch('dag/getReceiver', param).then(res => {
-          this.receivers = res.receivers && res.receivers.split(',') || []
-          this.receiversCc = res.receiversCc && res.receiversCc.split(',') || []
-        })
+
       },
       _cacheParams () {
         this.$emit('on-cache-params', {
@@ -400,8 +390,7 @@
           udfs: this.udfs,
           sqlType: this.sqlType,
           title: this.title,
-          receivers: this.receivers.join(','),
-          receiversCc: this.receiversCc.join(','),
+          groupId: this.groupId,
           showType: (() => {
 
             let showType = this.showType
@@ -433,8 +422,7 @@
         }
         if (val != 0) {
           this.title = ''
-          this.receivers = []
-          this.receiversCc = []
+          this.groupId = ''
         }
       },
       // Listening data source
@@ -469,8 +457,7 @@
         this.preStatements = o.params.preStatements || []
         this.postStatements = o.params.postStatements || []
         this.title = o.params.title || ''
-        this.receivers = o.params.receivers && o.params.receivers.split(',') || []
-        this.receiversCc = o.params.receiversCc && o.params.receiversCc.split(',') || []
+        this.groupId = o.params.groupId || ''
       }
       // read tasks from cache
       if (!_.some(this.store.state.dag.cacheTasks, { id: this.createNodeId }) &&
@@ -501,8 +488,7 @@
           udfs: this.udfs,
           sqlType: this.sqlType,
           title: this.title,
-          receivers: this.receivers.join(','),
-          receiversCc: this.receiversCc.join(','),
+          groupId: this.groupId,
           showType: (() => {
 
             let showType = this.showType
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index f1295da..00d6757 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -122,6 +122,9 @@ export default {
   'SQL Type': 'SQL Type',
   Title: 'Title',
   'Please enter the title of email': 'Please enter the title of email',
+  'Please enter the title of alert': 'Please enter the title of alert',
+  'AlertGroup': 'AlertGroup',
+  'Please select the alert group': 'Please select the alert group',
   Table: 'Table',
   TableMode: 'Table',
   Attachment: 'Attachment',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 148bc33..e936400 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -122,6 +122,9 @@ export default {
   'SQL Type': 'sql类型',
   Title: '主题',
   'Please enter the title of email': '请输入邮件主题',
+  'Please enter the title of alert': '请输入告警主题',
+  'AlertGroup': '告警组',
+  'Please select the alert group': '请选择告警组',
   Table: '表名',
   TableMode: '表格',
   Attachment: '附件',
diff --git a/install.sh b/install.sh
index da21085..7991275 100755
--- a/install.sh
+++ b/install.sh
@@ -64,6 +64,8 @@ sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttl
 sed -i ${txt} "s#mail.smtp.ssl.trust.*#mail.smtp.ssl.trust=${sslTrust}#g" conf/alert.properties
 sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
 
+sed -i ${txt} "s#alert.listen.host.*#alert.listen.host=${alertServer}#g" conf/worker.properties
+
 # 2.create directory
 echo "2.create directory"
 
diff --git a/pom.xml b/pom.xml
index 8368155..5f49824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -904,6 +904,7 @@
                         <include>**/server/worker/registry/WorkerRegistryTest.java</include>
                         <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
                         <include>**/server/worker/sql/SqlExecutorTest.java</include>
+                        <include>**/server/worker/task/sql/SqlTaskTest.java</include>
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
                         <include>**/server/worker/task/EnvFileTest.java</include>
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
@@ -920,7 +921,6 @@
                         <include>**/service/zk/CuratorZookeeperClientTest.java</include>
                         <include>**/service/queue/TaskUpdateQueueTest.java</include>
                         <include>**/service/alert/AlertClientServiceTest.java</include>
-
                         <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
                         <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
                         <include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>