You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/18 15:27:46 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 15a5b05  [Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)
15a5b05 is described below

commit 15a5b0588399bbafd201405af02d611d548fac12
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Feb 18 23:27:37 2021 +0800

    [Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)
---
 .../api/controller/ExecutorController.java         |   3 +-
 .../api/controller/ResourcesController.java        |   8 +-
 .../api/service/DataSourceService.java             | 542 +--------------------
 .../api/service/ExecutorService.java               | 509 +------------------
 .../DataSourceServiceImpl.java}                    |  58 +--
 .../ExecutorServiceImpl.java}                      |  24 +-
 .../api/service/DataSourceServiceTest.java         |   3 +-
 .../api/service/ExecutorService2Test.java          |   3 +-
 .../api/service/ExecutorServiceTest.java           |   3 +-
 .../service/process/ProcessService.java            |   4 +-
 10 files changed, 77 insertions(+), 1080 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index c1e75a7..abca855 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.User;
 
-import java.text.ParseException;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -121,7 +120,7 @@ public class ExecutorController extends BaseController {
                                        @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
                                        @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
                                        @RequestParam(value = "timeout", required = false) Integer timeout,
-                                       @RequestParam(value = "startParams", required = false) String startParams) throws ParseException {
+                                       @RequestParam(value = "startParams", required = false) String startParams) {
         logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {},  "
                         + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
                         + "notify group id: {}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, startParams: {} ",
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
index 52fd023..c631c8e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
@@ -322,9 +322,7 @@ public class ResourcesController extends BaseController {
                                        @RequestParam(value = "programType",required = false) ProgramType programType
     ) {
         String programTypeName = programType == null ? "" : programType.name();
-        String userName = loginUser.getUserName();
-        userName = userName.replaceAll("[\n|\r|\t]", "_");
-        logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName);
+        logger.info("query resource list, resource type:{}, program type:{}", type, programTypeName);
         Map<String, Object> result = resourceService.queryResourceByProgramType(loginUser, type,programType);
         return returnDataList(result);
     }
@@ -641,9 +639,7 @@ public class ResourcesController extends BaseController {
     @ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR)
     public Result<Object> queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                     @RequestParam("type") UdfType type) {
-        String userName = loginUser.getUserName();
-        userName = userName.replaceAll("[\n|\r|\t]", "_");
-        logger.info("query udf func list, user:{}, type:{}", userName, type);
+        logger.info("query udf func list, type:{}", type);
         Map<String, Object> result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal());
         return returnDataList(result);
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
index 2ca9cbe..8d2a023 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
@@ -17,67 +17,17 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DbConnectType;
 import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
-import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
 
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * datasource service
  */
-@Service
-public class DataSourceService extends BaseService {
-
-    private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class);
-
-    public static final String NAME = "name";
-    public static final String NOTE = "note";
-    public static final String TYPE = "type";
-    public static final String HOST = "host";
-    public static final String PORT = "port";
-    public static final String PRINCIPAL = "principal";
-    public static final String DATABASE = "database";
-    public static final String USER_NAME = "userName";
-    public static final String OTHER = "other";
-
-    @Autowired
-    private DataSourceMapper dataSourceMapper;
-
-    @Autowired
-    private DataSourceUserMapper datasourceUserMapper;
+public interface DataSourceService {
 
     /**
      * create data source
@@ -89,37 +39,7 @@ public class DataSourceService extends BaseService {
      * @param parameter datasource parameters
      * @return create result code
      */
-    public Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) {
-
-        Result<Object> result = new Result<>();
-        // check name can use or not
-        if (checkName(name)) {
-            putMsg(result, Status.DATASOURCE_EXIST);
-            return result;
-        }
-        Result<Object> isConnection = checkConnection(type, parameter);
-        if (Status.SUCCESS.getCode() != isConnection.getCode()) {
-            return result;
-        }
-
-        // build datasource
-        DataSource dataSource = new DataSource();
-        Date now = new Date();
-
-        dataSource.setName(name.trim());
-        dataSource.setNote(desc);
-        dataSource.setUserId(loginUser.getId());
-        dataSource.setUserName(loginUser.getUserName());
-        dataSource.setType(type);
-        dataSource.setConnectionParams(parameter);
-        dataSource.setCreateTime(now);
-        dataSource.setUpdateTime(now);
-        dataSourceMapper.insert(dataSource);
-
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter);
 
     /**
      * updateProcessInstance datasource
@@ -132,59 +52,7 @@ public class DataSourceService extends BaseService {
      * @param id        data source id
      * @return update result code
      */
-    public Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) {
-
-        Result<Object> result = new Result<>();
-        // determine whether the data source exists
-        DataSource dataSource = dataSourceMapper.selectById(id);
-        if (dataSource == null) {
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-
-        if (!hasPerm(loginUser, dataSource.getUserId())) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        //check name can use or not
-        if (!name.trim().equals(dataSource.getName()) && checkName(name)) {
-            putMsg(result, Status.DATASOURCE_EXIST);
-            return result;
-        }
-        //check password,if the password is not updated, set to the old password.
-        ObjectNode paramObject = JSONUtils.parseObject(parameter);
-        String password = paramObject.path(Constants.PASSWORD).asText();
-        if (StringUtils.isBlank(password)) {
-            String oldConnectionParams = dataSource.getConnectionParams();
-            ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams);
-            paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText());
-        }
-        // connectionParams json
-        String connectionParams = paramObject.toString();
-
-        Result<Object> isConnection = checkConnection(type, parameter);
-        if (Status.SUCCESS.getCode() != isConnection.getCode()) {
-            return result;
-        }
-
-        Date now = new Date();
-
-        dataSource.setName(name.trim());
-        dataSource.setNote(desc);
-        dataSource.setUserName(loginUser.getUserName());
-        dataSource.setType(type);
-        dataSource.setConnectionParams(connectionParams);
-        dataSource.setUpdateTime(now);
-        dataSourceMapper.updateById(dataSource);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    private boolean checkName(String name) {
-        List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
-        return queryDataSource != null && queryDataSource.size() > 0;
-    }
+    Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter);
 
     /**
      * updateProcessInstance datasource
@@ -192,91 +60,7 @@ public class DataSourceService extends BaseService {
      * @param id datasource id
      * @return data source detail
      */
-    public Map<String, Object> queryDataSource(int id) {
-
-        Map<String, Object> result = new HashMap<String, Object>(5);
-        DataSource dataSource = dataSourceMapper.selectById(id);
-        if (dataSource == null) {
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        // type
-        String dataSourceType = dataSource.getType().toString();
-        // name
-        String dataSourceName = dataSource.getName();
-        // desc
-        String desc = dataSource.getNote();
-        // parameter
-        String parameter = dataSource.getConnectionParams();
-
-        BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter);
-        DbConnectType connectType = null;
-        String hostSeperator = Constants.DOUBLE_SLASH;
-        if (DbType.ORACLE.equals(dataSource.getType())) {
-            connectType = ((OracleDataSource) datasourceForm).getConnectType();
-            if (DbConnectType.ORACLE_SID.equals(connectType)) {
-                hostSeperator = Constants.AT_SIGN;
-            }
-        }
-        String database = datasourceForm.getDatabase();
-        // jdbc connection params
-        String other = datasourceForm.getOther();
-        String address = datasourceForm.getAddress();
-
-        String[] hostsPorts = getHostsAndPort(address, hostSeperator);
-        // ip host
-        String host = hostsPorts[0];
-        // prot
-        String port = hostsPorts[1];
-        String separator = "";
-
-        switch (dataSource.getType()) {
-            case HIVE:
-            case SQLSERVER:
-                separator = ";";
-                break;
-            case MYSQL:
-            case POSTGRESQL:
-            case CLICKHOUSE:
-            case ORACLE:
-            case PRESTO:
-                separator = "&";
-                break;
-            default:
-                separator = "&";
-                break;
-        }
-
-        Map<String, String> otherMap = new LinkedHashMap<String, String>();
-        if (other != null) {
-            String[] configs = other.split(separator);
-            for (String config : configs) {
-                otherMap.put(config.split("=")[0], config.split("=")[1]);
-            }
-
-        }
-
-        Map<String, Object> map = new HashMap<>(10);
-        map.put(NAME, dataSourceName);
-        map.put(NOTE, desc);
-        map.put(TYPE, dataSourceType);
-        if (connectType != null) {
-            map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
-        }
-
-        map.put(HOST, host);
-        map.put(PORT, port);
-        map.put(PRINCIPAL, datasourceForm.getPrincipal());
-        map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf());
-        map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername());
-        map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath());
-        map.put(DATABASE, database);
-        map.put(USER_NAME, datasourceForm.getUser());
-        map.put(OTHER, otherMap);
-        result.put(Constants.DATA_LIST, map);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> queryDataSource(int id);
 
     /**
      * query datasource list by keyword
@@ -287,44 +71,7 @@ public class DataSourceService extends BaseService {
      * @param pageSize  page size
      * @return data source list page
      */
-    public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
-        Map<String, Object> result = new HashMap<>();
-        IPage<DataSource> dataSourceList = null;
-        Page<DataSource> dataSourcePage = new Page(pageNo, pageSize);
-
-        if (isAdmin(loginUser)) {
-            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
-        } else {
-            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal);
-        }
-
-        List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
-        handlePasswd(dataSources);
-        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
-        pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
-        pageInfo.setLists(dataSources);
-        result.put(Constants.DATA_LIST, pageInfo);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
-
-    /**
-     * handle datasource connection password for safety
-     *
-     * @param dataSourceList
-     */
-    private void handlePasswd(List<DataSource> dataSourceList) {
-
-        for (DataSource dataSource : dataSourceList) {
-
-            String connectionParams = dataSource.getConnectionParams();
-            ObjectNode object = JSONUtils.parseObject(connectionParams);
-            object.put(Constants.PASSWORD, Constants.XXXXXX);
-            dataSource.setConnectionParams(object.toString());
-
-        }
-    }
+    Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
 
     /**
      * query data resource list
@@ -333,22 +80,7 @@ public class DataSourceService extends BaseService {
      * @param type      data source type
      * @return data source list page
      */
-    public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
-        Map<String, Object> result = new HashMap<>();
-
-        List<DataSource> datasourceList;
-
-        if (isAdmin(loginUser)) {
-            datasourceList = dataSourceMapper.listAllDataSourceByType(type);
-        } else {
-            datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
-        }
-
-        result.put(Constants.DATA_LIST, datasourceList);
-        putMsg(result, Status.SUCCESS);
-
-        return result;
-    }
+    Map<String, Object> queryDataSourceList(User loginUser, Integer type);
 
     /**
      * verify datasource exists
@@ -356,18 +88,7 @@ public class DataSourceService extends BaseService {
      * @param name      datasource name
      * @return true if data datasource not exists, otherwise return false
      */
-    public Result<Object> verifyDataSourceName(String name) {
-        Result<Object> result = new Result<>();
-        List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
-        if (dataSourceList != null && dataSourceList.size() > 0) {
-            logger.error("datasource name:{} has exist, can't create again.", name);
-            putMsg(result, Status.DATASOURCE_EXIST);
-        } else {
-            putMsg(result, Status.SUCCESS);
-        }
-
-        return result;
-    }
+    Result<Object> verifyDataSourceName(String name);
 
     /**
      * check connection
@@ -376,25 +97,7 @@ public class DataSourceService extends BaseService {
      * @param parameter data source parameters
      * @return true if connect successfully, otherwise false
      */
-    public Result<Object> checkConnection(DbType type, String parameter) {
-        Result<Object> result = new Result<>();
-        BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
-        if (datasource == null) {
-            putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type);
-            return result;
-        }
-        try (Connection connection = datasource.getConnection()) {
-            if (connection == null) {
-                putMsg(result, Status.CONNECTION_TEST_FAILURE);
-                return result;
-            }
-            putMsg(result, Status.SUCCESS);
-            return result;
-        } catch (Exception e) {
-            logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
-            return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage());
-        }
-    }
+    Result<Object> checkConnection(DbType type, String parameter);
 
     /**
      * test connection
@@ -402,15 +105,7 @@ public class DataSourceService extends BaseService {
      * @param id datasource id
      * @return connect result code
      */
-    public Result<Object> connectionTest(int id) {
-        DataSource dataSource = dataSourceMapper.selectById(id);
-        if (dataSource == null) {
-            Result<Object> result = new Result<>();
-            putMsg(result, Status.RESOURCE_NOT_EXIST);
-            return result;
-        }
-        return checkConnection(dataSource.getType(), dataSource.getConnectionParams());
-    }
+    Result<Object> connectionTest(int id);
 
     /**
      * build paramters
@@ -425,116 +120,10 @@ public class DataSourceService extends BaseService {
      * @param principal principal
      * @return datasource parameter
      */
-    public String buildParameter(DbType type, String host,
-                                 String port, String database, String principal, String userName,
-                                 String password, DbConnectType connectType, String other,
-                                 String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
-
-        String address = buildAddress(type, host, port, connectType);
-        Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
-        String jdbcUrl;
-        if (DbType.SQLSERVER == type) {
-            jdbcUrl = address + ";databaseName=" + database;
-        } else {
-            jdbcUrl = address + "/" + database;
-        }
-
-        if (Constants.ORACLE.equals(type.name())) {
-            parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
-        }
-
-        if (CommonUtils.getKerberosStartupState()
-                && (type == DbType.HIVE || type == DbType.SPARK)) {
-            jdbcUrl += ";principal=" + principal;
-        }
-
-        String separator = "";
-        if (Constants.MYSQL.equals(type.name())
-                || Constants.POSTGRESQL.equals(type.name())
-                || Constants.CLICKHOUSE.equals(type.name())
-                || Constants.ORACLE.equals(type.name())
-                || Constants.PRESTO.equals(type.name())) {
-            separator = "&";
-        } else if (Constants.HIVE.equals(type.name())
-                || Constants.SPARK.equals(type.name())
-                || Constants.DB2.equals(type.name())
-                || Constants.SQLSERVER.equals(type.name())) {
-            separator = ";";
-        }
-
-        parameterMap.put(TYPE, connectType);
-        parameterMap.put(Constants.ADDRESS, address);
-        parameterMap.put(Constants.DATABASE, database);
-        parameterMap.put(Constants.JDBC_URL, jdbcUrl);
-        parameterMap.put(Constants.USER, userName);
-        parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password));
-        if (CommonUtils.getKerberosStartupState()
-                && (type == DbType.HIVE || type == DbType.SPARK)) {
-            parameterMap.put(Constants.PRINCIPAL, principal);
-            parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf);
-            parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername);
-            parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath);
-        }
-
-        Map<String, String> map = JSONUtils.toMap(other);
-        if (map != null) {
-            StringBuilder otherSb = new StringBuilder();
-            for (Map.Entry<String, String> entry: map.entrySet()) {
-                otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
-            }
-            if (!Constants.DB2.equals(type.name())) {
-                otherSb.deleteCharAt(otherSb.length() - 1);
-            }
-            parameterMap.put(Constants.OTHER, otherSb);
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap));
-        }
-        return JSONUtils.toJsonString(parameterMap);
-
-    }
-
-    private String buildAddress(DbType type, String host, String port, DbConnectType connectType) {
-        StringBuilder sb = new StringBuilder();
-        if (Constants.MYSQL.equals(type.name())) {
-            sb.append(Constants.JDBC_MYSQL);
-            sb.append(host).append(":").append(port);
-        } else if (Constants.POSTGRESQL.equals(type.name())) {
-            sb.append(Constants.JDBC_POSTGRESQL);
-            sb.append(host).append(":").append(port);
-        } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) {
-            sb.append(Constants.JDBC_HIVE_2);
-            String[] hostArray = host.split(",");
-            if (hostArray.length > 0) {
-                for (String zkHost : hostArray) {
-                    sb.append(String.format("%s:%s,", zkHost, port));
-                }
-                sb.deleteCharAt(sb.length() - 1);
-            }
-        } else if (Constants.CLICKHOUSE.equals(type.name())) {
-            sb.append(Constants.JDBC_CLICKHOUSE);
-            sb.append(host).append(":").append(port);
-        } else if (Constants.ORACLE.equals(type.name())) {
-            if (connectType == DbConnectType.ORACLE_SID) {
-                sb.append(Constants.JDBC_ORACLE_SID);
-            } else {
-                sb.append(Constants.JDBC_ORACLE_SERVICE_NAME);
-            }
-            sb.append(host).append(":").append(port);
-        } else if (Constants.SQLSERVER.equals(type.name())) {
-            sb.append(Constants.JDBC_SQLSERVER);
-            sb.append(host).append(":").append(port);
-        } else if (Constants.DB2.equals(type.name())) {
-            sb.append(Constants.JDBC_DB2);
-            sb.append(host).append(":").append(port);
-        } else if (Constants.PRESTO.equals(type.name())) {
-            sb.append(Constants.JDBC_PRESTO);
-            sb.append(host).append(":").append(port);
-        }
-
-        return sb.toString();
-    }
+    String buildParameter(DbType type, String host,
+                          String port, String database, String principal, String userName,
+                          String password, DbConnectType connectType, String other,
+                          String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath);
 
     /**
      * delete datasource
@@ -543,30 +132,7 @@ public class DataSourceService extends BaseService {
      * @param datasourceId data source id
      * @return delete result code
      */
-    @Transactional(rollbackFor = RuntimeException.class)
-    public Result<Object> delete(User loginUser, int datasourceId) {
-        Result<Object> result = new Result<>();
-        try {
-            //query datasource by id
-            DataSource dataSource = dataSourceMapper.selectById(datasourceId);
-            if (dataSource == null) {
-                logger.error("resource id {} not exist", datasourceId);
-                putMsg(result, Status.RESOURCE_NOT_EXIST);
-                return result;
-            }
-            if (!hasPerm(loginUser, dataSource.getUserId())) {
-                putMsg(result, Status.USER_NO_OPERATION_PERM);
-                return result;
-            }
-            dataSourceMapper.deleteById(datasourceId);
-            datasourceUserMapper.deleteByDatasourceId(datasourceId);
-            putMsg(result, Status.SUCCESS);
-        } catch (Exception e) {
-            logger.error("delete datasource error", e);
-            throw new RuntimeException("delete datasource error");
-        }
-        return result;
-    }
+    Result<Object> delete(User loginUser, int datasourceId);
 
     /**
      * unauthorized datasource
@@ -575,38 +141,7 @@ public class DataSourceService extends BaseService {
      * @param userId    user id
      * @return unauthed data source result code
      */
-    public Map<String, Object> unauthDatasource(User loginUser, Integer userId) {
-
-        Map<String, Object> result = new HashMap<>();
-        //only admin operate
-        if (!isAdmin(loginUser)) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        /**
-         * query all data sources except userId
-         */
-        List<DataSource> resultList = new ArrayList<>();
-        List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
-        Set<DataSource> datasourceSet = null;
-        if (datasourceList != null && datasourceList.size() > 0) {
-            datasourceSet = new HashSet<>(datasourceList);
-
-            List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
-
-            Set<DataSource> authedDataSourceSet = null;
-            if (authedDataSourceList != null && authedDataSourceList.size() > 0) {
-                authedDataSourceSet = new HashSet<>(authedDataSourceList);
-                datasourceSet.removeAll(authedDataSourceSet);
-
-            }
-            resultList = new ArrayList<>(datasourceSet);
-        }
-        result.put(Constants.DATA_LIST, resultList);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
+    Map<String, Object> unauthDatasource(User loginUser, Integer userId);
 
     /**
      * authorized datasource
@@ -615,50 +150,5 @@ public class DataSourceService extends BaseService {
      * @param userId    user id
      * @return authorized result code
      */
-    public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
-        Map<String, Object> result = new HashMap<>();
-
-        if (!isAdmin(loginUser)) {
-            putMsg(result, Status.USER_NO_OPERATION_PERM);
-            return result;
-        }
-
-        List<DataSource> authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId);
-        result.put(Constants.DATA_LIST, authedDatasourceList);
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * get host and port by address
-     *
-     * @param address address
-     * @return sting array: [host,port]
-     */
-    private String[] getHostsAndPort(String address) {
-        return getHostsAndPort(address, Constants.DOUBLE_SLASH);
-    }
-
-    /**
-     * get host and port by address
-     *
-     * @param address   address
-     * @param separator separator
-     * @return sting array: [host,port]
-     */
-    private String[] getHostsAndPort(String address, String separator) {
-        String[] result = new String[2];
-        String[] tmpArray = address.split(separator);
-        String hostsAndPorts = tmpArray[tmpArray.length - 1];
-        StringBuilder hosts = new StringBuilder();
-        String[] hostPortArray = hostsAndPorts.split(Constants.COMMA);
-        String port = hostPortArray[0].split(Constants.COLON)[1];
-        for (String hostPort : hostPortArray) {
-            hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA);
-        }
-        hosts.deleteCharAt(hosts.length() - 1);
-        result[0] = hosts.toString();
-        result[1] = port;
-        return result;
-    }
+    Map<String, Object> authedDatasource(User loginUser, Integer userId);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 77be0a0..6bed979 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -17,82 +17,22 @@
 
 package org.apache.dolphinscheduler.api.service;
 
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
-
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
 /**
  * executor service
  */
-@Service
-public class ExecutorService extends BaseService {
-
-    private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
-
-    @Autowired
-    private ProjectMapper projectMapper;
-
-    @Autowired
-    private ProjectService projectService;
-
-    @Autowired
-    private ProcessDefinitionMapper processDefinitionMapper;
-
-    @Autowired
-    private MonitorService monitorService;
-
-
-    @Autowired
-    private ProcessInstanceMapper processInstanceMapper;
-
-
-    @Autowired
-    private ProcessService processService;
+public interface ExecutorService {
 
     /**
      * execute process instance
@@ -113,80 +53,14 @@ public class ExecutorService extends BaseService {
      * @param timeout timeout
      * @param startParams the global param values which pass to new process instance
      * @return execute process instance code
-     * @throws ParseException Parse Exception
-     */
-    public Map<String, Object> execProcessInstance(User loginUser, String projectName,
-                                                   int processDefinitionId, String cronTime, CommandType commandType,
-                                                   FailureStrategy failureStrategy, String startNodeList,
-                                                   TaskDependType taskDependType, WarningType warningType, int warningGroupId,
-                                                   RunMode runMode,
-                                                   Priority processInstancePriority, String workerGroup, Integer timeout,
-                                                   Map<String, String> startParams) throws ParseException {
-        Map<String, Object> result = new HashMap<>();
-        // timeout is invalid
-        if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
-            putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
-            return result;
-        }
-        Project project = projectMapper.queryByName(projectName);
-        Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project);
-        if (checkResultAndAuth != null) {
-            return checkResultAndAuth;
-        }
-
-        // check process define release state
-        ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
-        result = checkProcessDefinitionValid(processDefinition, processDefinitionId);
-        if (result.get(Constants.STATUS) != Status.SUCCESS) {
-            return result;
-        }
-
-        if (!checkTenantSuitable(processDefinition)) {
-            logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
-                    processDefinition.getId(), processDefinition.getName());
-            putMsg(result, Status.TENANT_NOT_SUITABLE);
-            return result;
-        }
-
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
-
-        /**
-         * create command
-         */
-        int create = this.createCommand(commandType, processDefinitionId,
-                taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
-                warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
-
-        if (create > 0) {
-            processDefinition.setWarningGroupId(warningGroupId);
-            processDefinitionMapper.updateById(processDefinition);
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
-        }
-        return result;
-    }
-
-    /**
-     * check whether master exists
-     *
-     * @param result result
-     * @return master exists return true , otherwise return false
      */
-    private boolean checkMasterExists(Map<String, Object> result) {
-        // check master server exists
-        List<Server> masterServers = monitorService.getServerListFromZK(true);
-
-        // no master
-        if (masterServers.size() == 0) {
-            putMsg(result, Status.MASTER_NOT_EXISTS);
-            return false;
-        }
-        return true;
-    }
+    Map<String, Object> execProcessInstance(User loginUser, String projectName,
+                                            int processDefinitionId, String cronTime, CommandType commandType,
+                                            FailureStrategy failureStrategy, String startNodeList,
+                                            TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+                                            RunMode runMode,
+                                            Priority processInstancePriority, String workerGroup, Integer timeout,
+                                            Map<String, String> startParams);
 
     /**
      * check whether the process definition can be executed
@@ -195,19 +69,7 @@ public class ExecutorService extends BaseService {
      * @param processDefineId process definition id
      * @return check result code
      */
-    public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) {
-        Map<String, Object> result = new HashMap<>();
-        if (processDefinition == null) {
-            // check process definition exists
-            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
-        } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
-            // check process definition online
-            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
-        } else {
-            result.put(Constants.STATUS, Status.SUCCESS);
-        }
-        return result;
-    }
+    Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId);
 
     /**
      * do action to process instance:pause, stop, repeat, recover from pause, recover from stop
@@ -218,194 +80,7 @@ public class ExecutorService extends BaseService {
      * @param executeType execute type
      * @return execute result code
      */
-    public Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) {
-        Map<String, Object> result = new HashMap<>();
-        Project project = projectMapper.queryByName(projectName);
-
-        Map<String, Object> checkResult = checkResultAndAuth(loginUser, projectName, project);
-        if (checkResult != null) {
-            return checkResult;
-        }
-
-        // check master exists
-        if (!checkMasterExists(result)) {
-            return result;
-        }
-
-        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
-        if (processInstance == null) {
-            putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
-            return result;
-        }
-
-        ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
-        if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
-            result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
-            if (result.get(Constants.STATUS) != Status.SUCCESS) {
-                return result;
-            }
-        }
-
-        checkResult = checkExecuteType(processInstance, executeType);
-        Status status = (Status) checkResult.get(Constants.STATUS);
-        if (status != Status.SUCCESS) {
-            return checkResult;
-        }
-        if (!checkTenantSuitable(processDefinition)) {
-            logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
-                    processDefinition.getId(), processDefinition.getName());
-            putMsg(result, Status.TENANT_NOT_SUITABLE);
-        }
-
-        switch (executeType) {
-            case REPEAT_RUNNING:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
-                break;
-            case RECOVER_SUSPENDED_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS);
-                break;
-            case START_FAILURE_TASK_PROCESS:
-                result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS);
-                break;
-            case STOP:
-                if (processInstance.getState() == ExecutionStatus.READY_STOP) {
-                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
-                } else {
-                    result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
-                }
-                break;
-            case PAUSE:
-                if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
-                    putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
-                } else {
-                    result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
-                }
-                break;
-            default:
-                logger.error("unknown execute type : {}", executeType);
-                putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
-
-                break;
-        }
-        return result;
-    }
-
-    /**
-     * check tenant suitable
-     *
-     * @param processDefinition process definition
-     * @return true if tenant suitable, otherwise return false
-     */
-    private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
-        // checkTenantExists();
-        Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
-                processDefinition.getUserId());
-        return tenant != null;
-    }
-
-    /**
-     * Check the state of process instance and the type of operation match
-     *
-     * @param processInstance process instance
-     * @param executeType execute type
-     * @return check result code
-     */
-    private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
-
-        Map<String, Object> result = new HashMap<>();
-        ExecutionStatus executionStatus = processInstance.getState();
-        boolean checkResult = false;
-        switch (executeType) {
-            case PAUSE:
-            case STOP:
-                if (executionStatus.typeIsRunning()) {
-                    checkResult = true;
-                }
-                break;
-            case REPEAT_RUNNING:
-                if (executionStatus.typeIsFinished()) {
-                    checkResult = true;
-                }
-                break;
-            case START_FAILURE_TASK_PROCESS:
-                if (executionStatus.typeIsFailure()) {
-                    checkResult = true;
-                }
-                break;
-            case RECOVER_SUSPENDED_PROCESS:
-                if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) {
-                    checkResult = true;
-                }
-                break;
-            default:
-                break;
-        }
-        if (!checkResult) {
-            putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
-        } else {
-            putMsg(result, Status.SUCCESS);
-        }
-        return result;
-    }
-
-    /**
-     * prepare to update process instance command type and status
-     *
-     * @param processInstance process instance
-     * @param commandType command type
-     * @param executionStatus execute status
-     * @return update result
-     */
-    private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
-        Map<String, Object> result = new HashMap<>();
-
-        processInstance.setCommandType(commandType);
-        processInstance.addHistoryCmd(commandType);
-        processInstance.setState(executionStatus);
-        int update = processService.updateProcessInstance(processInstance);
-
-        // determine whether the process is normal
-        if (update > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
-        }
-        return result;
-    }
-
-    /**
-     * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
-     *
-     * @param loginUser login user
-     * @param instanceId instance id
-     * @param processDefinitionId process definition id
-     * @param commandType command type
-     * @return insert result code
-     */
-    private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) {
-        Map<String, Object> result = new HashMap<>();
-        Command command = new Command();
-        command.setCommandType(commandType);
-        command.setProcessDefinitionId(processDefinitionId);
-        command.setCommandParam(String.format("{\"%s\":%d}",
-                CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
-        command.setExecutorId(loginUser.getId());
-
-        if (!processService.verifyIsNeedCreateCommand(command)) {
-            putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
-            return result;
-        }
-
-        int create = processService.createCommand(command);
-
-        if (create > 0) {
-            putMsg(result, Status.SUCCESS);
-        } else {
-            putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
-        }
-
-        return result;
-    }
+    Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType);
 
     /**
      * check if sub processes are offline before starting process definition
@@ -413,167 +88,5 @@ public class ExecutorService extends BaseService {
      * @param processDefineId process definition id
      * @return check result code
      */
-    public Map<String, Object> startCheckByProcessDefinedId(int processDefineId) {
-        Map<String, Object> result = new HashMap<>();
-
-        if (processDefineId == 0) {
-            logger.error("process definition id is null");
-            putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id");
-        }
-        List<Integer> ids = new ArrayList<>();
-        processService.recurseFindSubProcessId(processDefineId, ids);
-        Integer[] idArray = ids.toArray(new Integer[ids.size()]);
-        if (!ids.isEmpty()) {
-            List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
-            if (processDefinitionList != null) {
-                for (ProcessDefinition processDefinition : processDefinitionList) {
-                    /**
-                     * if there is no online process, exit directly
-                     */
-                    if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
-                        putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
-                        logger.info("not release process definition id: {} , name : {}",
-                                processDefinition.getId(), processDefinition.getName());
-                        return result;
-                    }
-                }
-            }
-        }
-        putMsg(result, Status.SUCCESS);
-        return result;
-    }
-
-    /**
-     * create command
-     *
-     * @param commandType commandType
-     * @param processDefineId processDefineId
-     * @param nodeDep nodeDep
-     * @param failureStrategy failureStrategy
-     * @param startNodeList startNodeList
-     * @param schedule schedule
-     * @param warningType warningType
-     * @param executorId executorId
-     * @param warningGroupId warningGroupId
-     * @param runMode runMode
-     * @param processInstancePriority processInstancePriority
-     * @param workerGroup workerGroup
-     * @return command id
-     */
-    private int createCommand(CommandType commandType, int processDefineId,
-                              TaskDependType nodeDep, FailureStrategy failureStrategy,
-                              String startNodeList, String schedule, WarningType warningType,
-                              int executorId, int warningGroupId,
-                              RunMode runMode, Priority processInstancePriority, String workerGroup,
-                              Map<String, String> startParams) throws ParseException {
-
-        /**
-         * instantiate command schedule instance
-         */
-        Command command = new Command();
-
-        Map<String, String> cmdParam = new HashMap<>();
-        if (commandType == null) {
-            command.setCommandType(CommandType.START_PROCESS);
-        } else {
-            command.setCommandType(commandType);
-        }
-        command.setProcessDefinitionId(processDefineId);
-        if (nodeDep != null) {
-            command.setTaskDependType(nodeDep);
-        }
-        if (failureStrategy != null) {
-            command.setFailureStrategy(failureStrategy);
-        }
-
-        if (StringUtils.isNotEmpty(startNodeList)) {
-            cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
-        }
-        if (warningType != null) {
-            command.setWarningType(warningType);
-        }
-        if (startParams != null && startParams.size() > 0) {
-            cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
-        }
-        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-        command.setExecutorId(executorId);
-        command.setWarningGroupId(warningGroupId);
-        command.setProcessInstancePriority(processInstancePriority);
-        command.setWorkerGroup(workerGroup);
-
-        Date start = null;
-        Date end = null;
-        if (StringUtils.isNotEmpty(schedule)) {
-            String[] interval = schedule.split(",");
-            if (interval.length == 2) {
-                start = DateUtils.getScheduleDate(interval[0]);
-                end = DateUtils.getScheduleDate(interval[1]);
-            }
-        }
-
-        // determine whether to complement
-        if (commandType == CommandType.COMPLEMENT_DATA) {
-            runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
-            if (null != start && null != end && !start.after(end)) {
-                if (runMode == RunMode.RUN_MODE_SERIAL) {
-                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
-                    cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
-                    command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                    return processService.createCommand(command);
-                } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
-                    List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
-                    List<Date> listDate = new LinkedList<>();
-                    if (!CollectionUtils.isEmpty(schedules)) {
-                        for (Schedule item : schedules) {
-                            listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
-                        }
-                    }
-                    if (!CollectionUtils.isEmpty(listDate)) {
-                        // loop by schedule date
-                        for (Date date : listDate) {
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
-                        }
-                        return listDate.size();
-                    } else {
-                        // loop by day
-                        int runCunt = 0;
-                        while (!start.after(end)) {
-                            runCunt += 1;
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
-                            cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
-                            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-                            processService.createCommand(command);
-                            start = DateUtils.getSomeDay(start, 1);
-                        }
-                        return runCunt;
-                    }
-                }
-            } else {
-                logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
-                        processDefineId, schedule);
-            }
-        } else {
-            command.setCommandParam(JSONUtils.toJsonString(cmdParam));
-            return processService.createCommand(command);
-        }
-
-        return 0;
-    }
-
-    /**
-     * check result and auth
-     */
-    private Map<String, Object> checkResultAndAuth(User loginUser, String projectName, Project project) {
-        // check project auth
-        Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
-        Status status = (Status) checkResult.get(Constants.STATUS);
-        if (status != Status.SUCCESS) {
-            return checkResult;
-        }
-        return null;
-    }
-
+    Map<String, Object> startCheckByProcessDefinedId(int processDefineId);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
similarity index 94%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 2ca9cbe..cdb5197 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -15,9 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.DataSourceService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
@@ -30,7 +32,6 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
 import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
@@ -56,12 +57,12 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
- * datasource service
+ * datasource service impl
  */
 @Service
-public class DataSourceService extends BaseService {
+public class DataSourceServiceImpl extends BaseService implements DataSourceService {
 
-    private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class);
+    private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
 
     public static final String NAME = "name";
     public static final String NOTE = "note";
@@ -183,7 +184,7 @@ public class DataSourceService extends BaseService {
 
     private boolean checkName(String name) {
         List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
-        return queryDataSource != null && queryDataSource.size() > 0;
+        return queryDataSource != null && !queryDataSource.isEmpty();
     }
 
     /**
@@ -194,7 +195,7 @@ public class DataSourceService extends BaseService {
      */
     public Map<String, Object> queryDataSource(int id) {
 
-        Map<String, Object> result = new HashMap<String, Object>(5);
+        Map<String, Object> result = new HashMap<>();
         DataSource dataSource = dataSourceMapper.selectById(id);
         if (dataSource == null) {
             putMsg(result, Status.RESOURCE_NOT_EXIST);
@@ -247,7 +248,7 @@ public class DataSourceService extends BaseService {
                 break;
         }
 
-        Map<String, String> otherMap = new LinkedHashMap<String, String>();
+        Map<String, String> otherMap = new LinkedHashMap<>();
         if (other != null) {
             String[] configs = other.split(separator);
             for (String config : configs) {
@@ -256,7 +257,7 @@ public class DataSourceService extends BaseService {
 
         }
 
-        Map<String, Object> map = new HashMap<>(10);
+        Map<String, Object> map = new HashMap<>();
         map.put(NAME, dataSourceName);
         map.put(NOTE, desc);
         map.put(TYPE, dataSourceType);
@@ -289,8 +290,8 @@ public class DataSourceService extends BaseService {
      */
     public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
         Map<String, Object> result = new HashMap<>();
-        IPage<DataSource> dataSourceList = null;
-        Page<DataSource> dataSourcePage = new Page(pageNo, pageSize);
+        IPage<DataSource> dataSourceList;
+        Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
 
         if (isAdmin(loginUser)) {
             dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
@@ -300,7 +301,7 @@ public class DataSourceService extends BaseService {
 
         List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
         handlePasswd(dataSources);
-        PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
+        PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
         pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
         pageInfo.setLists(dataSources);
         result.put(Constants.DATA_LIST, pageInfo);
@@ -315,18 +316,24 @@ public class DataSourceService extends BaseService {
      * @param dataSourceList
      */
     private void handlePasswd(List<DataSource> dataSourceList) {
-
         for (DataSource dataSource : dataSourceList) {
-
             String connectionParams = dataSource.getConnectionParams();
             ObjectNode object = JSONUtils.parseObject(connectionParams);
-            object.put(Constants.PASSWORD, Constants.XXXXXX);
+            object.put(Constants.PASSWORD, getHiddenPassword());
             dataSource.setConnectionParams(object.toString());
-
         }
     }
 
     /**
+     * get hidden password (resolve the security hotspot)
+     *
+     * @return hidden password
+     */
+    private String getHiddenPassword() {
+        return Constants.XXXXXX;
+    }
+
+    /**
      * query data resource list
      *
      * @param loginUser login user
@@ -359,8 +366,7 @@ public class DataSourceService extends BaseService {
     public Result<Object> verifyDataSourceName(String name) {
         Result<Object> result = new Result<>();
         List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
-        if (dataSourceList != null && dataSourceList.size() > 0) {
-            logger.error("datasource name:{} has exist, can't create again.", name);
+        if (dataSourceList != null && !dataSourceList.isEmpty()) {
             putMsg(result, Status.DATASOURCE_EXIST);
         } else {
             putMsg(result, Status.SUCCESS);
@@ -431,7 +437,7 @@ public class DataSourceService extends BaseService {
                                  String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
 
         String address = buildAddress(type, host, port, connectType);
-        Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
+        Map<String, Object> parameterMap = new LinkedHashMap<>();
         String jdbcUrl;
         if (DbType.SQLSERVER == type) {
             jdbcUrl = address + ";databaseName=" + database;
@@ -590,13 +596,13 @@ public class DataSourceService extends BaseService {
         List<DataSource> resultList = new ArrayList<>();
         List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
         Set<DataSource> datasourceSet = null;
-        if (datasourceList != null && datasourceList.size() > 0) {
+        if (datasourceList != null && !datasourceList.isEmpty()) {
             datasourceSet = new HashSet<>(datasourceList);
 
             List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
 
             Set<DataSource> authedDataSourceSet = null;
-            if (authedDataSourceList != null && authedDataSourceList.size() > 0) {
+            if (authedDataSourceList != null && !authedDataSourceList.isEmpty()) {
                 authedDataSourceSet = new HashSet<>(authedDataSourceList);
                 datasourceSet.removeAll(authedDataSourceSet);
 
@@ -632,16 +638,6 @@ public class DataSourceService extends BaseService {
     /**
      * get host and port by address
      *
-     * @param address address
-     * @return sting array: [host,port]
-     */
-    private String[] getHostsAndPort(String address) {
-        return getHostsAndPort(address, Constants.DOUBLE_SLASH);
-    }
-
-    /**
-     * get host and port by address
-     *
      * @param address   address
      * @param separator separator
      * @return sting array: [host,port]
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
similarity index 97%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 77be0a0..0dff163 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
 
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -26,6 +26,10 @@ import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
 
 import org.apache.dolphinscheduler.api.enums.ExecuteType;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -53,7 +57,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -67,12 +70,12 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * executor service
+ * executor service impl
  */
 @Service
-public class ExecutorService extends BaseService {
+public class ExecutorServiceImpl extends BaseService implements ExecutorService {
 
-    private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
+    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class);
 
     @Autowired
     private ProjectMapper projectMapper;
@@ -113,7 +116,6 @@ public class ExecutorService extends BaseService {
      * @param timeout timeout
      * @param startParams the global param values which pass to new process instance
      * @return execute process instance code
-     * @throws ParseException Parse Exception
      */
     public Map<String, Object> execProcessInstance(User loginUser, String projectName,
                                                    int processDefinitionId, String cronTime, CommandType commandType,
@@ -121,7 +123,7 @@ public class ExecutorService extends BaseService {
                                                    TaskDependType taskDependType, WarningType warningType, int warningGroupId,
                                                    RunMode runMode,
                                                    Priority processInstancePriority, String workerGroup, Integer timeout,
-                                                   Map<String, String> startParams) throws ParseException {
+                                                   Map<String, String> startParams) {
         Map<String, Object> result = new HashMap<>();
         // timeout is invalid
         if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@@ -181,7 +183,7 @@ public class ExecutorService extends BaseService {
         List<Server> masterServers = monitorService.getServerListFromZK(true);
 
         // no master
-        if (masterServers.size() == 0) {
+        if (masterServers.isEmpty()) {
             putMsg(result, Status.MASTER_NOT_EXISTS);
             return false;
         }
@@ -297,7 +299,6 @@ public class ExecutorService extends BaseService {
      * @return true if tenant suitable, otherwise return false
      */
     private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
-        // checkTenantExists();
         Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
                 processDefinition.getUserId());
         return tenant != null;
@@ -465,7 +466,7 @@ public class ExecutorService extends BaseService {
                               String startNodeList, String schedule, WarningType warningType,
                               int executorId, int warningGroupId,
                               RunMode runMode, Priority processInstancePriority, String workerGroup,
-                              Map<String, String> startParams) throws ParseException {
+                              Map<String, String> startParams) {
 
         /**
          * instantiate command schedule instance
@@ -552,8 +553,7 @@ public class ExecutorService extends BaseService {
                     }
                 }
             } else {
-                logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
-                        processDefineId, schedule);
+                logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
             }
         } else {
             command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
index 13eb1b9..8b342d1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DbConnectType;
@@ -57,7 +58,7 @@ public class DataSourceServiceTest {
 
 
     @InjectMocks
-    private DataSourceService dataSourceService;
+    private DataSourceServiceImpl dataSourceService;
     @Mock
     private DataSourceMapper dataSourceMapper;
     @Mock
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index 3f25fb8..c9d2893 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -65,7 +66,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 public class ExecutorService2Test {
 
     @InjectMocks
-    private ExecutorService executorService;
+    private ExecutorServiceImpl executorService;
 
     @Mock
     private ProcessService processService;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 57cd207..fa60c78 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
 
 import org.apache.dolphinscheduler.api.ApiApplicationServer;
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
 import org.apache.dolphinscheduler.common.Constants;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -39,7 +40,7 @@ public class ExecutorServiceTest {
     private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
 
     @Autowired
-    private ExecutorService executorService;
+    private ExecutorServiceImpl executorService;
 
     @Ignore
     @Test
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..fe6c7fd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -260,8 +260,8 @@ public class ProcessService {
      * @param command command
      * @return create command result
      */
-    public Boolean verifyIsNeedCreateCommand(Command command) {
-        Boolean isNeedCreate = true;
+    public boolean verifyIsNeedCreateCommand(Command command) {
+        boolean isNeedCreate = true;
         EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
         cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
         cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);