You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/18 15:27:46 UTC
[incubator-dolphinscheduler] branch dev updated:
[Improvement-3369][api] Introduce executor and datasource service interface
for clear code (#4759)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 15a5b05 [Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)
15a5b05 is described below
commit 15a5b0588399bbafd201405af02d611d548fac12
Author: Shiwen Cheng <ch...@gmail.com>
AuthorDate: Thu Feb 18 23:27:37 2021 +0800
[Improvement-3369][api] Introduce executor and datasource service interface for clear code (#4759)
---
.../api/controller/ExecutorController.java | 3 +-
.../api/controller/ResourcesController.java | 8 +-
.../api/service/DataSourceService.java | 542 +--------------------
.../api/service/ExecutorService.java | 509 +------------------
.../DataSourceServiceImpl.java} | 58 +--
.../ExecutorServiceImpl.java} | 24 +-
.../api/service/DataSourceServiceTest.java | 3 +-
.../api/service/ExecutorService2Test.java | 3 +-
.../api/service/ExecutorServiceTest.java | 3 +-
.../service/process/ProcessService.java | 4 +-
10 files changed, 77 insertions(+), 1080 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index c1e75a7..abca855 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
-import java.text.ParseException;
import java.util.Map;
import org.slf4j.Logger;
@@ -121,7 +120,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "timeout", required = false) Integer timeout,
- @RequestParam(value = "startParams", required = false) String startParams) throws ParseException {
+ @RequestParam(value = "startParams", required = false) String startParams) {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, startParams: {} ",
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
index 52fd023..c631c8e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
@@ -322,9 +322,7 @@ public class ResourcesController extends BaseController {
@RequestParam(value = "programType",required = false) ProgramType programType
) {
String programTypeName = programType == null ? "" : programType.name();
- String userName = loginUser.getUserName();
- userName = userName.replaceAll("[\n|\r|\t]", "_");
- logger.info("query resource list, login user:{}, resource type:{}, program type:{}", userName,programTypeName);
+ logger.info("query resource list, resource type:{}, program type:{}", type, programTypeName);
Map<String, Object> result = resourceService.queryResourceByProgramType(loginUser, type,programType);
return returnDataList(result);
}
@@ -641,9 +639,7 @@ public class ResourcesController extends BaseController {
@ApiException(QUERY_DATASOURCE_BY_TYPE_ERROR)
public Result<Object> queryUdfFuncList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("type") UdfType type) {
- String userName = loginUser.getUserName();
- userName = userName.replaceAll("[\n|\r|\t]", "_");
- logger.info("query udf func list, user:{}, type:{}", userName, type);
+ logger.info("query udf func list, type:{}", type);
Map<String, Object> result = udfFuncService.queryUdfFuncList(loginUser, type.ordinal());
return returnDataList(result);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
index 2ca9cbe..8d2a023 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
@@ -17,67 +17,17 @@
package org.apache.dolphinscheduler.api.service;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType;
import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
-import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* datasource service
*/
-@Service
-public class DataSourceService extends BaseService {
-
- private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class);
-
- public static final String NAME = "name";
- public static final String NOTE = "note";
- public static final String TYPE = "type";
- public static final String HOST = "host";
- public static final String PORT = "port";
- public static final String PRINCIPAL = "principal";
- public static final String DATABASE = "database";
- public static final String USER_NAME = "userName";
- public static final String OTHER = "other";
-
- @Autowired
- private DataSourceMapper dataSourceMapper;
-
- @Autowired
- private DataSourceUserMapper datasourceUserMapper;
+public interface DataSourceService {
/**
* create data source
@@ -89,37 +39,7 @@ public class DataSourceService extends BaseService {
* @param parameter datasource parameters
* @return create result code
*/
- public Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) {
-
- Result<Object> result = new Result<>();
- // check name can use or not
- if (checkName(name)) {
- putMsg(result, Status.DATASOURCE_EXIST);
- return result;
- }
- Result<Object> isConnection = checkConnection(type, parameter);
- if (Status.SUCCESS.getCode() != isConnection.getCode()) {
- return result;
- }
-
- // build datasource
- DataSource dataSource = new DataSource();
- Date now = new Date();
-
- dataSource.setName(name.trim());
- dataSource.setNote(desc);
- dataSource.setUserId(loginUser.getId());
- dataSource.setUserName(loginUser.getUserName());
- dataSource.setType(type);
- dataSource.setConnectionParams(parameter);
- dataSource.setCreateTime(now);
- dataSource.setUpdateTime(now);
- dataSourceMapper.insert(dataSource);
-
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
+ Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter);
/**
* update datasource
@@ -132,59 +52,7 @@ public class DataSourceService extends BaseService {
* @param id data source id
* @return update result code
*/
- public Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) {
-
- Result<Object> result = new Result<>();
- // determine whether the data source exists
- DataSource dataSource = dataSourceMapper.selectById(id);
- if (dataSource == null) {
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
-
- if (!hasPerm(loginUser, dataSource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- //check name can use or not
- if (!name.trim().equals(dataSource.getName()) && checkName(name)) {
- putMsg(result, Status.DATASOURCE_EXIST);
- return result;
- }
- //check password,if the password is not updated, set to the old password.
- ObjectNode paramObject = JSONUtils.parseObject(parameter);
- String password = paramObject.path(Constants.PASSWORD).asText();
- if (StringUtils.isBlank(password)) {
- String oldConnectionParams = dataSource.getConnectionParams();
- ObjectNode oldParams = JSONUtils.parseObject(oldConnectionParams);
- paramObject.put(Constants.PASSWORD, oldParams.path(Constants.PASSWORD).asText());
- }
- // connectionParams json
- String connectionParams = paramObject.toString();
-
- Result<Object> isConnection = checkConnection(type, parameter);
- if (Status.SUCCESS.getCode() != isConnection.getCode()) {
- return result;
- }
-
- Date now = new Date();
-
- dataSource.setName(name.trim());
- dataSource.setNote(desc);
- dataSource.setUserName(loginUser.getUserName());
- dataSource.setType(type);
- dataSource.setConnectionParams(connectionParams);
- dataSource.setUpdateTime(now);
- dataSourceMapper.updateById(dataSource);
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
- private boolean checkName(String name) {
- List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
- return queryDataSource != null && queryDataSource.size() > 0;
- }
+ Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter);
/**
* query datasource detail by id
@@ -192,91 +60,7 @@ public class DataSourceService extends BaseService {
* @param id datasource id
* @return data source detail
*/
- public Map<String, Object> queryDataSource(int id) {
-
- Map<String, Object> result = new HashMap<String, Object>(5);
- DataSource dataSource = dataSourceMapper.selectById(id);
- if (dataSource == null) {
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- // type
- String dataSourceType = dataSource.getType().toString();
- // name
- String dataSourceName = dataSource.getName();
- // desc
- String desc = dataSource.getNote();
- // parameter
- String parameter = dataSource.getConnectionParams();
-
- BaseDataSource datasourceForm = DataSourceFactory.getDatasource(dataSource.getType(), parameter);
- DbConnectType connectType = null;
- String hostSeperator = Constants.DOUBLE_SLASH;
- if (DbType.ORACLE.equals(dataSource.getType())) {
- connectType = ((OracleDataSource) datasourceForm).getConnectType();
- if (DbConnectType.ORACLE_SID.equals(connectType)) {
- hostSeperator = Constants.AT_SIGN;
- }
- }
- String database = datasourceForm.getDatabase();
- // jdbc connection params
- String other = datasourceForm.getOther();
- String address = datasourceForm.getAddress();
-
- String[] hostsPorts = getHostsAndPort(address, hostSeperator);
- // ip host
- String host = hostsPorts[0];
- // prot
- String port = hostsPorts[1];
- String separator = "";
-
- switch (dataSource.getType()) {
- case HIVE:
- case SQLSERVER:
- separator = ";";
- break;
- case MYSQL:
- case POSTGRESQL:
- case CLICKHOUSE:
- case ORACLE:
- case PRESTO:
- separator = "&";
- break;
- default:
- separator = "&";
- break;
- }
-
- Map<String, String> otherMap = new LinkedHashMap<String, String>();
- if (other != null) {
- String[] configs = other.split(separator);
- for (String config : configs) {
- otherMap.put(config.split("=")[0], config.split("=")[1]);
- }
-
- }
-
- Map<String, Object> map = new HashMap<>(10);
- map.put(NAME, dataSourceName);
- map.put(NOTE, desc);
- map.put(TYPE, dataSourceType);
- if (connectType != null) {
- map.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
- }
-
- map.put(HOST, host);
- map.put(PORT, port);
- map.put(PRINCIPAL, datasourceForm.getPrincipal());
- map.put(Constants.KERBEROS_KRB5_CONF_PATH, datasourceForm.getJavaSecurityKrb5Conf());
- map.put(Constants.KERBEROS_KEY_TAB_USERNAME, datasourceForm.getLoginUserKeytabUsername());
- map.put(Constants.KERBEROS_KEY_TAB_PATH, datasourceForm.getLoginUserKeytabPath());
- map.put(DATABASE, database);
- map.put(USER_NAME, datasourceForm.getUser());
- map.put(OTHER, otherMap);
- result.put(Constants.DATA_LIST, map);
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> queryDataSource(int id);
/**
* query datasource list by keyword
@@ -287,44 +71,7 @@ public class DataSourceService extends BaseService {
* @param pageSize page size
* @return data source list page
*/
- public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
- Map<String, Object> result = new HashMap<>();
- IPage<DataSource> dataSourceList = null;
- Page<DataSource> dataSourcePage = new Page(pageNo, pageSize);
-
- if (isAdmin(loginUser)) {
- dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
- } else {
- dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, loginUser.getId(), searchVal);
- }
-
- List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
- handlePasswd(dataSources);
- PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
- pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
- pageInfo.setLists(dataSources);
- result.put(Constants.DATA_LIST, pageInfo);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
-
- /**
- * handle datasource connection password for safety
- *
- * @param dataSourceList
- */
- private void handlePasswd(List<DataSource> dataSourceList) {
-
- for (DataSource dataSource : dataSourceList) {
-
- String connectionParams = dataSource.getConnectionParams();
- ObjectNode object = JSONUtils.parseObject(connectionParams);
- object.put(Constants.PASSWORD, Constants.XXXXXX);
- dataSource.setConnectionParams(object.toString());
-
- }
- }
+ Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
/**
* query data resource list
@@ -333,22 +80,7 @@ public class DataSourceService extends BaseService {
* @param type data source type
* @return data source list page
*/
- public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
- Map<String, Object> result = new HashMap<>();
-
- List<DataSource> datasourceList;
-
- if (isAdmin(loginUser)) {
- datasourceList = dataSourceMapper.listAllDataSourceByType(type);
- } else {
- datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type);
- }
-
- result.put(Constants.DATA_LIST, datasourceList);
- putMsg(result, Status.SUCCESS);
-
- return result;
- }
+ Map<String, Object> queryDataSourceList(User loginUser, Integer type);
/**
* verify datasource exists
@@ -356,18 +88,7 @@ public class DataSourceService extends BaseService {
* @param name datasource name
* @return true if the datasource does not exist, otherwise false
*/
- public Result<Object> verifyDataSourceName(String name) {
- Result<Object> result = new Result<>();
- List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
- if (dataSourceList != null && dataSourceList.size() > 0) {
- logger.error("datasource name:{} has exist, can't create again.", name);
- putMsg(result, Status.DATASOURCE_EXIST);
- } else {
- putMsg(result, Status.SUCCESS);
- }
-
- return result;
- }
+ Result<Object> verifyDataSourceName(String name);
/**
* check connection
@@ -376,25 +97,7 @@ public class DataSourceService extends BaseService {
* @param parameter data source parameters
* @return true if connect successfully, otherwise false
*/
- public Result<Object> checkConnection(DbType type, String parameter) {
- Result<Object> result = new Result<>();
- BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
- if (datasource == null) {
- putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type);
- return result;
- }
- try (Connection connection = datasource.getConnection()) {
- if (connection == null) {
- putMsg(result, Status.CONNECTION_TEST_FAILURE);
- return result;
- }
- putMsg(result, Status.SUCCESS);
- return result;
- } catch (Exception e) {
- logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
- return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage());
- }
- }
+ Result<Object> checkConnection(DbType type, String parameter);
/**
* test connection
@@ -402,15 +105,7 @@ public class DataSourceService extends BaseService {
* @param id datasource id
* @return connect result code
*/
- public Result<Object> connectionTest(int id) {
- DataSource dataSource = dataSourceMapper.selectById(id);
- if (dataSource == null) {
- Result<Object> result = new Result<>();
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- return checkConnection(dataSource.getType(), dataSource.getConnectionParams());
- }
+ Result<Object> connectionTest(int id);
/**
* build parameters
@@ -425,116 +120,10 @@ public class DataSourceService extends BaseService {
* @param principal principal
* @return datasource parameter
*/
- public String buildParameter(DbType type, String host,
- String port, String database, String principal, String userName,
- String password, DbConnectType connectType, String other,
- String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
-
- String address = buildAddress(type, host, port, connectType);
- Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
- String jdbcUrl;
- if (DbType.SQLSERVER == type) {
- jdbcUrl = address + ";databaseName=" + database;
- } else {
- jdbcUrl = address + "/" + database;
- }
-
- if (Constants.ORACLE.equals(type.name())) {
- parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
- }
-
- if (CommonUtils.getKerberosStartupState()
- && (type == DbType.HIVE || type == DbType.SPARK)) {
- jdbcUrl += ";principal=" + principal;
- }
-
- String separator = "";
- if (Constants.MYSQL.equals(type.name())
- || Constants.POSTGRESQL.equals(type.name())
- || Constants.CLICKHOUSE.equals(type.name())
- || Constants.ORACLE.equals(type.name())
- || Constants.PRESTO.equals(type.name())) {
- separator = "&";
- } else if (Constants.HIVE.equals(type.name())
- || Constants.SPARK.equals(type.name())
- || Constants.DB2.equals(type.name())
- || Constants.SQLSERVER.equals(type.name())) {
- separator = ";";
- }
-
- parameterMap.put(TYPE, connectType);
- parameterMap.put(Constants.ADDRESS, address);
- parameterMap.put(Constants.DATABASE, database);
- parameterMap.put(Constants.JDBC_URL, jdbcUrl);
- parameterMap.put(Constants.USER, userName);
- parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password));
- if (CommonUtils.getKerberosStartupState()
- && (type == DbType.HIVE || type == DbType.SPARK)) {
- parameterMap.put(Constants.PRINCIPAL, principal);
- parameterMap.put(Constants.KERBEROS_KRB5_CONF_PATH, javaSecurityKrb5Conf);
- parameterMap.put(Constants.KERBEROS_KEY_TAB_USERNAME, loginUserKeytabUsername);
- parameterMap.put(Constants.KERBEROS_KEY_TAB_PATH, loginUserKeytabPath);
- }
-
- Map<String, String> map = JSONUtils.toMap(other);
- if (map != null) {
- StringBuilder otherSb = new StringBuilder();
- for (Map.Entry<String, String> entry: map.entrySet()) {
- otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
- }
- if (!Constants.DB2.equals(type.name())) {
- otherSb.deleteCharAt(otherSb.length() - 1);
- }
- parameterMap.put(Constants.OTHER, otherSb);
- }
-
- if (logger.isDebugEnabled()) {
- logger.info("parameters map:{}", JSONUtils.toJsonString(parameterMap));
- }
- return JSONUtils.toJsonString(parameterMap);
-
- }
-
- private String buildAddress(DbType type, String host, String port, DbConnectType connectType) {
- StringBuilder sb = new StringBuilder();
- if (Constants.MYSQL.equals(type.name())) {
- sb.append(Constants.JDBC_MYSQL);
- sb.append(host).append(":").append(port);
- } else if (Constants.POSTGRESQL.equals(type.name())) {
- sb.append(Constants.JDBC_POSTGRESQL);
- sb.append(host).append(":").append(port);
- } else if (Constants.HIVE.equals(type.name()) || Constants.SPARK.equals(type.name())) {
- sb.append(Constants.JDBC_HIVE_2);
- String[] hostArray = host.split(",");
- if (hostArray.length > 0) {
- for (String zkHost : hostArray) {
- sb.append(String.format("%s:%s,", zkHost, port));
- }
- sb.deleteCharAt(sb.length() - 1);
- }
- } else if (Constants.CLICKHOUSE.equals(type.name())) {
- sb.append(Constants.JDBC_CLICKHOUSE);
- sb.append(host).append(":").append(port);
- } else if (Constants.ORACLE.equals(type.name())) {
- if (connectType == DbConnectType.ORACLE_SID) {
- sb.append(Constants.JDBC_ORACLE_SID);
- } else {
- sb.append(Constants.JDBC_ORACLE_SERVICE_NAME);
- }
- sb.append(host).append(":").append(port);
- } else if (Constants.SQLSERVER.equals(type.name())) {
- sb.append(Constants.JDBC_SQLSERVER);
- sb.append(host).append(":").append(port);
- } else if (Constants.DB2.equals(type.name())) {
- sb.append(Constants.JDBC_DB2);
- sb.append(host).append(":").append(port);
- } else if (Constants.PRESTO.equals(type.name())) {
- sb.append(Constants.JDBC_PRESTO);
- sb.append(host).append(":").append(port);
- }
-
- return sb.toString();
- }
+ String buildParameter(DbType type, String host,
+ String port, String database, String principal, String userName,
+ String password, DbConnectType connectType, String other,
+ String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath);
/**
* delete datasource
@@ -543,30 +132,7 @@ public class DataSourceService extends BaseService {
* @param datasourceId data source id
* @return delete result code
*/
- @Transactional(rollbackFor = RuntimeException.class)
- public Result<Object> delete(User loginUser, int datasourceId) {
- Result<Object> result = new Result<>();
- try {
- //query datasource by id
- DataSource dataSource = dataSourceMapper.selectById(datasourceId);
- if (dataSource == null) {
- logger.error("resource id {} not exist", datasourceId);
- putMsg(result, Status.RESOURCE_NOT_EXIST);
- return result;
- }
- if (!hasPerm(loginUser, dataSource.getUserId())) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
- dataSourceMapper.deleteById(datasourceId);
- datasourceUserMapper.deleteByDatasourceId(datasourceId);
- putMsg(result, Status.SUCCESS);
- } catch (Exception e) {
- logger.error("delete datasource error", e);
- throw new RuntimeException("delete datasource error");
- }
- return result;
- }
+ Result<Object> delete(User loginUser, int datasourceId);
/**
* unauthorized datasource
@@ -575,38 +141,7 @@ public class DataSourceService extends BaseService {
* @param userId user id
* @return unauthed data source result code
*/
- public Map<String, Object> unauthDatasource(User loginUser, Integer userId) {
-
- Map<String, Object> result = new HashMap<>();
- //only admin operate
- if (!isAdmin(loginUser)) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- /**
- * query all data sources except userId
- */
- List<DataSource> resultList = new ArrayList<>();
- List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
- Set<DataSource> datasourceSet = null;
- if (datasourceList != null && datasourceList.size() > 0) {
- datasourceSet = new HashSet<>(datasourceList);
-
- List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
-
- Set<DataSource> authedDataSourceSet = null;
- if (authedDataSourceList != null && authedDataSourceList.size() > 0) {
- authedDataSourceSet = new HashSet<>(authedDataSourceList);
- datasourceSet.removeAll(authedDataSourceSet);
-
- }
- resultList = new ArrayList<>(datasourceSet);
- }
- result.put(Constants.DATA_LIST, resultList);
- putMsg(result, Status.SUCCESS);
- return result;
- }
+ Map<String, Object> unauthDatasource(User loginUser, Integer userId);
/**
* authorized datasource
@@ -615,50 +150,5 @@ public class DataSourceService extends BaseService {
* @param userId user id
* @return authorized result code
*/
- public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
- Map<String, Object> result = new HashMap<>();
-
- if (!isAdmin(loginUser)) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
- }
-
- List<DataSource> authedDatasourceList = dataSourceMapper.queryAuthedDatasource(userId);
- result.put(Constants.DATA_LIST, authedDatasourceList);
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
- /**
- * get host and port by address
- *
- * @param address address
- * @return sting array: [host,port]
- */
- private String[] getHostsAndPort(String address) {
- return getHostsAndPort(address, Constants.DOUBLE_SLASH);
- }
-
- /**
- * get host and port by address
- *
- * @param address address
- * @param separator separator
- * @return sting array: [host,port]
- */
- private String[] getHostsAndPort(String address, String separator) {
- String[] result = new String[2];
- String[] tmpArray = address.split(separator);
- String hostsAndPorts = tmpArray[tmpArray.length - 1];
- StringBuilder hosts = new StringBuilder();
- String[] hostPortArray = hostsAndPorts.split(Constants.COMMA);
- String port = hostPortArray[0].split(Constants.COLON)[1];
- for (String hostPort : hostPortArray) {
- hosts.append(hostPort.split(Constants.COLON)[0]).append(Constants.COMMA);
- }
- hosts.deleteCharAt(hosts.length() - 1);
- result[0] = hosts.toString();
- result[1] = port;
- return result;
- }
+ Map<String, Object> authedDatasource(User loginUser, Integer userId);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 77be0a0..6bed979 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -17,82 +17,22 @@
package org.apache.dolphinscheduler.api.service;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
-import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
-import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
-
import org.apache.dolphinscheduler.api.enums.ExecuteType;
-import org.apache.dolphinscheduler.api.enums.Status;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
-import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
/**
* executor service
*/
-@Service
-public class ExecutorService extends BaseService {
-
- private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
-
- @Autowired
- private ProjectMapper projectMapper;
-
- @Autowired
- private ProjectService projectService;
-
- @Autowired
- private ProcessDefinitionMapper processDefinitionMapper;
-
- @Autowired
- private MonitorService monitorService;
-
-
- @Autowired
- private ProcessInstanceMapper processInstanceMapper;
-
-
- @Autowired
- private ProcessService processService;
+public interface ExecutorService {
/**
* execute process instance
@@ -113,80 +53,14 @@ public class ExecutorService extends BaseService {
* @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @return execute process instance code
- * @throws ParseException Parse Exception
- */
- public Map<String, Object> execProcessInstance(User loginUser, String projectName,
- int processDefinitionId, String cronTime, CommandType commandType,
- FailureStrategy failureStrategy, String startNodeList,
- TaskDependType taskDependType, WarningType warningType, int warningGroupId,
- RunMode runMode,
- Priority processInstancePriority, String workerGroup, Integer timeout,
- Map<String, String> startParams) throws ParseException {
- Map<String, Object> result = new HashMap<>();
- // timeout is invalid
- if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
- putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
- return result;
- }
- Project project = projectMapper.queryByName(projectName);
- Map<String, Object> checkResultAndAuth = checkResultAndAuth(loginUser, projectName, project);
- if (checkResultAndAuth != null) {
- return checkResultAndAuth;
- }
-
- // check process define release state
- ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
- result = checkProcessDefinitionValid(processDefinition, processDefinitionId);
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
-
- if (!checkTenantSuitable(processDefinition)) {
- logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
- processDefinition.getId(), processDefinition.getName());
- putMsg(result, Status.TENANT_NOT_SUITABLE);
- return result;
- }
-
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
-
- /**
- * create command
- */
- int create = this.createCommand(commandType, processDefinitionId,
- taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
- warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
-
- if (create > 0) {
- processDefinition.setWarningGroupId(warningGroupId);
- processDefinitionMapper.updateById(processDefinition);
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
- }
- return result;
- }
-
- /**
- * check whether master exists
- *
- * @param result result
- * @return master exists return true , otherwise return false
*/
- private boolean checkMasterExists(Map<String, Object> result) {
- // check master server exists
- List<Server> masterServers = monitorService.getServerListFromZK(true);
-
- // no master
- if (masterServers.size() == 0) {
- putMsg(result, Status.MASTER_NOT_EXISTS);
- return false;
- }
- return true;
- }
+ Map<String, Object> execProcessInstance(User loginUser, String projectName,
+ int processDefinitionId, String cronTime, CommandType commandType,
+ FailureStrategy failureStrategy, String startNodeList,
+ TaskDependType taskDependType, WarningType warningType, int warningGroupId,
+ RunMode runMode,
+ Priority processInstancePriority, String workerGroup, Integer timeout,
+ Map<String, String> startParams);
/**
* check whether the process definition can be executed
@@ -195,19 +69,7 @@ public class ExecutorService extends BaseService {
* @param processDefineId process definition id
* @return check result code
*/
- public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) {
- Map<String, Object> result = new HashMap<>();
- if (processDefinition == null) {
- // check process definition exists
- putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
- } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- // check process definition online
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
- } else {
- result.put(Constants.STATUS, Status.SUCCESS);
- }
- return result;
- }
+ Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId);
/**
* do action to process instance: pause, stop, repeat, recover from pause, recover from stop
@@ -218,194 +80,7 @@ public class ExecutorService extends BaseService {
* @param executeType execute type
* @return execute result code
*/
- public Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType) {
- Map<String, Object> result = new HashMap<>();
- Project project = projectMapper.queryByName(projectName);
-
- Map<String, Object> checkResult = checkResultAndAuth(loginUser, projectName, project);
- if (checkResult != null) {
- return checkResult;
- }
-
- // check master exists
- if (!checkMasterExists(result)) {
- return result;
- }
-
- ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
- if (processInstance == null) {
- putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
- return result;
- }
-
- ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
- if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
- result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
- if (result.get(Constants.STATUS) != Status.SUCCESS) {
- return result;
- }
- }
-
- checkResult = checkExecuteType(processInstance, executeType);
- Status status = (Status) checkResult.get(Constants.STATUS);
- if (status != Status.SUCCESS) {
- return checkResult;
- }
- if (!checkTenantSuitable(processDefinition)) {
- logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
- processDefinition.getId(), processDefinition.getName());
- putMsg(result, Status.TENANT_NOT_SUITABLE);
- }
-
- switch (executeType) {
- case REPEAT_RUNNING:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING);
- break;
- case RECOVER_SUSPENDED_PROCESS:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RECOVER_SUSPENDED_PROCESS);
- break;
- case START_FAILURE_TASK_PROCESS:
- result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.START_FAILURE_TASK_PROCESS);
- break;
- case STOP:
- if (processInstance.getState() == ExecutionStatus.READY_STOP) {
- putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
- } else {
- result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
- }
- break;
- case PAUSE:
- if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
- putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
- } else {
- result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
- }
- break;
- default:
- logger.error("unknown execute type : {}", executeType);
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
-
- break;
- }
- return result;
- }
-
- /**
- * check tenant suitable
- *
- * @param processDefinition process definition
- * @return true if tenant suitable, otherwise return false
- */
- private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
- // checkTenantExists();
- Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
- processDefinition.getUserId());
- return tenant != null;
- }
-
- /**
- * Check the state of process instance and the type of operation match
- *
- * @param processInstance process instance
- * @param executeType execute type
- * @return check result code
- */
- private Map<String, Object> checkExecuteType(ProcessInstance processInstance, ExecuteType executeType) {
-
- Map<String, Object> result = new HashMap<>();
- ExecutionStatus executionStatus = processInstance.getState();
- boolean checkResult = false;
- switch (executeType) {
- case PAUSE:
- case STOP:
- if (executionStatus.typeIsRunning()) {
- checkResult = true;
- }
- break;
- case REPEAT_RUNNING:
- if (executionStatus.typeIsFinished()) {
- checkResult = true;
- }
- break;
- case START_FAILURE_TASK_PROCESS:
- if (executionStatus.typeIsFailure()) {
- checkResult = true;
- }
- break;
- case RECOVER_SUSPENDED_PROCESS:
- if (executionStatus.typeIsPause() || executionStatus.typeIsCancel()) {
- checkResult = true;
- }
- break;
- default:
- break;
- }
- if (!checkResult) {
- putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), executionStatus.toString(), executeType.toString());
- } else {
- putMsg(result, Status.SUCCESS);
- }
- return result;
- }
-
- /**
- * prepare to update process instance command type and status
- *
- * @param processInstance process instance
- * @param commandType command type
- * @param executionStatus execute status
- * @return update result
- */
- private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
- Map<String, Object> result = new HashMap<>();
-
- processInstance.setCommandType(commandType);
- processInstance.addHistoryCmd(commandType);
- processInstance.setState(executionStatus);
- int update = processService.updateProcessInstance(processInstance);
-
- // determine whether the process is normal
- if (update > 0) {
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
- }
- return result;
- }
-
- /**
- * insert command, used in the implementation of the page, re run, recovery (pause / failure) execution
- *
- * @param loginUser login user
- * @param instanceId instance id
- * @param processDefinitionId process definition id
- * @param commandType command type
- * @return insert result code
- */
- private Map<String, Object> insertCommand(User loginUser, Integer instanceId, Integer processDefinitionId, CommandType commandType) {
- Map<String, Object> result = new HashMap<>();
- Command command = new Command();
- command.setCommandType(commandType);
- command.setProcessDefinitionId(processDefinitionId);
- command.setCommandParam(String.format("{\"%s\":%d}",
- CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId));
- command.setExecutorId(loginUser.getId());
-
- if (!processService.verifyIsNeedCreateCommand(command)) {
- putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, processDefinitionId);
- return result;
- }
-
- int create = processService.createCommand(command);
-
- if (create > 0) {
- putMsg(result, Status.SUCCESS);
- } else {
- putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
- }
-
- return result;
- }
+ Map<String, Object> execute(User loginUser, String projectName, Integer processInstanceId, ExecuteType executeType);
/**
* check if sub processes are offline before starting process definition
@@ -413,167 +88,5 @@ public class ExecutorService extends BaseService {
* @param processDefineId process definition id
* @return check result code
*/
- public Map<String, Object> startCheckByProcessDefinedId(int processDefineId) {
- Map<String, Object> result = new HashMap<>();
-
- if (processDefineId == 0) {
- logger.error("process definition id is null");
- putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "process definition id");
- }
- List<Integer> ids = new ArrayList<>();
- processService.recurseFindSubProcessId(processDefineId, ids);
- Integer[] idArray = ids.toArray(new Integer[ids.size()]);
- if (!ids.isEmpty()) {
- List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray);
- if (processDefinitionList != null) {
- for (ProcessDefinition processDefinition : processDefinitionList) {
- /**
- * if there is no online process, exit directly
- */
- if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
- logger.info("not release process definition id: {} , name : {}",
- processDefinition.getId(), processDefinition.getName());
- return result;
- }
- }
- }
- }
- putMsg(result, Status.SUCCESS);
- return result;
- }
-
- /**
- * create command
- *
- * @param commandType commandType
- * @param processDefineId processDefineId
- * @param nodeDep nodeDep
- * @param failureStrategy failureStrategy
- * @param startNodeList startNodeList
- * @param schedule schedule
- * @param warningType warningType
- * @param executorId executorId
- * @param warningGroupId warningGroupId
- * @param runMode runMode
- * @param processInstancePriority processInstancePriority
- * @param workerGroup workerGroup
- * @return command id
- */
- private int createCommand(CommandType commandType, int processDefineId,
- TaskDependType nodeDep, FailureStrategy failureStrategy,
- String startNodeList, String schedule, WarningType warningType,
- int executorId, int warningGroupId,
- RunMode runMode, Priority processInstancePriority, String workerGroup,
- Map<String, String> startParams) throws ParseException {
-
- /**
- * instantiate command schedule instance
- */
- Command command = new Command();
-
- Map<String, String> cmdParam = new HashMap<>();
- if (commandType == null) {
- command.setCommandType(CommandType.START_PROCESS);
- } else {
- command.setCommandType(commandType);
- }
- command.setProcessDefinitionId(processDefineId);
- if (nodeDep != null) {
- command.setTaskDependType(nodeDep);
- }
- if (failureStrategy != null) {
- command.setFailureStrategy(failureStrategy);
- }
-
- if (StringUtils.isNotEmpty(startNodeList)) {
- cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
- }
- if (warningType != null) {
- command.setWarningType(warningType);
- }
- if (startParams != null && startParams.size() > 0) {
- cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
- }
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- command.setExecutorId(executorId);
- command.setWarningGroupId(warningGroupId);
- command.setProcessInstancePriority(processInstancePriority);
- command.setWorkerGroup(workerGroup);
-
- Date start = null;
- Date end = null;
- if (StringUtils.isNotEmpty(schedule)) {
- String[] interval = schedule.split(",");
- if (interval.length == 2) {
- start = DateUtils.getScheduleDate(interval[0]);
- end = DateUtils.getScheduleDate(interval[1]);
- }
- }
-
- // determine whether to complement
- if (commandType == CommandType.COMPLEMENT_DATA) {
- runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
- if (null != start && null != end && !start.after(end)) {
- if (runMode == RunMode.RUN_MODE_SERIAL) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- return processService.createCommand(command);
- } else if (runMode == RunMode.RUN_MODE_PARALLEL) {
- List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
- List<Date> listDate = new LinkedList<>();
- if (!CollectionUtils.isEmpty(schedules)) {
- for (Schedule item : schedules) {
- listDate.addAll(CronUtils.getSelfFireDateList(start, end, item.getCrontab()));
- }
- }
- if (!CollectionUtils.isEmpty(listDate)) {
- // loop by schedule date
- for (Date date : listDate) {
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
- }
- return listDate.size();
- } else {
- // loop by day
- int runCunt = 0;
- while (!start.after(end)) {
- runCunt += 1;
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
- cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
- start = DateUtils.getSomeDay(start, 1);
- }
- return runCunt;
- }
- }
- } else {
- logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
- processDefineId, schedule);
- }
- } else {
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- return processService.createCommand(command);
- }
-
- return 0;
- }
-
- /**
- * check result and auth
- */
- private Map<String, Object> checkResultAndAuth(User loginUser, String projectName, Project project) {
- // check project auth
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
- Status status = (Status) checkResult.get(Constants.STATUS);
- if (status != Status.SUCCESS) {
- return checkResult;
- }
- return null;
- }
-
+ Map<String, Object> startCheckByProcessDefinedId(int processDefineId);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
similarity index 94%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 2ca9cbe..cdb5197 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -15,9 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.DataSourceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@@ -30,7 +32,6 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
@@ -56,12 +57,12 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
- * datasource service
+ * datasource service impl
*/
@Service
-public class DataSourceService extends BaseService {
+public class DataSourceServiceImpl extends BaseService implements DataSourceService {
- private static final Logger logger = LoggerFactory.getLogger(DataSourceService.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
public static final String NAME = "name";
public static final String NOTE = "note";
@@ -183,7 +184,7 @@ public class DataSourceService extends BaseService {
private boolean checkName(String name) {
List<DataSource> queryDataSource = dataSourceMapper.queryDataSourceByName(name.trim());
- return queryDataSource != null && queryDataSource.size() > 0;
+ return queryDataSource != null && !queryDataSource.isEmpty();
}
/**
@@ -194,7 +195,7 @@ public class DataSourceService extends BaseService {
*/
public Map<String, Object> queryDataSource(int id) {
- Map<String, Object> result = new HashMap<String, Object>(5);
+ Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
@@ -247,7 +248,7 @@ public class DataSourceService extends BaseService {
break;
}
- Map<String, String> otherMap = new LinkedHashMap<String, String>();
+ Map<String, String> otherMap = new LinkedHashMap<>();
if (other != null) {
String[] configs = other.split(separator);
for (String config : configs) {
@@ -256,7 +257,7 @@ public class DataSourceService extends BaseService {
}
- Map<String, Object> map = new HashMap<>(10);
+ Map<String, Object> map = new HashMap<>();
map.put(NAME, dataSourceName);
map.put(NOTE, desc);
map.put(TYPE, dataSourceType);
@@ -289,8 +290,8 @@ public class DataSourceService extends BaseService {
*/
public Map<String, Object> queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
- IPage<DataSource> dataSourceList = null;
- Page<DataSource> dataSourcePage = new Page(pageNo, pageSize);
+ IPage<DataSource> dataSourceList;
+ Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
if (isAdmin(loginUser)) {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, 0, searchVal);
@@ -300,7 +301,7 @@ public class DataSourceService extends BaseService {
List<DataSource> dataSources = dataSourceList != null ? dataSourceList.getRecords() : new ArrayList<>();
handlePasswd(dataSources);
- PageInfo pageInfo = new PageInfo<Resource>(pageNo, pageSize);
+ PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int) (dataSourceList != null ? dataSourceList.getTotal() : 0L));
pageInfo.setLists(dataSources);
result.put(Constants.DATA_LIST, pageInfo);
@@ -315,18 +316,24 @@ public class DataSourceService extends BaseService {
* @param dataSourceList
*/
private void handlePasswd(List<DataSource> dataSourceList) {
-
for (DataSource dataSource : dataSourceList) {
-
String connectionParams = dataSource.getConnectionParams();
ObjectNode object = JSONUtils.parseObject(connectionParams);
- object.put(Constants.PASSWORD, Constants.XXXXXX);
+ object.put(Constants.PASSWORD, getHiddenPassword());
dataSource.setConnectionParams(object.toString());
-
}
}
/**
+ * get hidden password (resolve the security hotspot)
+ *
+ * @return hidden password
+ */
+ private String getHiddenPassword() {
+ return Constants.XXXXXX;
+ }
+
+ /**
* query data resource list
*
* @param loginUser login user
@@ -359,8 +366,7 @@ public class DataSourceService extends BaseService {
public Result<Object> verifyDataSourceName(String name) {
Result<Object> result = new Result<>();
List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
- if (dataSourceList != null && dataSourceList.size() > 0) {
- logger.error("datasource name:{} has exist, can't create again.", name);
+ if (dataSourceList != null && !dataSourceList.isEmpty()) {
putMsg(result, Status.DATASOURCE_EXIST);
} else {
putMsg(result, Status.SUCCESS);
@@ -431,7 +437,7 @@ public class DataSourceService extends BaseService {
String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
String address = buildAddress(type, host, port, connectType);
- Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
+ Map<String, Object> parameterMap = new LinkedHashMap<>();
String jdbcUrl;
if (DbType.SQLSERVER == type) {
jdbcUrl = address + ";databaseName=" + database;
@@ -590,13 +596,13 @@ public class DataSourceService extends BaseService {
List<DataSource> resultList = new ArrayList<>();
List<DataSource> datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
Set<DataSource> datasourceSet = null;
- if (datasourceList != null && datasourceList.size() > 0) {
+ if (datasourceList != null && !datasourceList.isEmpty()) {
datasourceSet = new HashSet<>(datasourceList);
List<DataSource> authedDataSourceList = dataSourceMapper.queryAuthedDatasource(userId);
Set<DataSource> authedDataSourceSet = null;
- if (authedDataSourceList != null && authedDataSourceList.size() > 0) {
+ if (authedDataSourceList != null && !authedDataSourceList.isEmpty()) {
authedDataSourceSet = new HashSet<>(authedDataSourceList);
datasourceSet.removeAll(authedDataSourceSet);
@@ -632,16 +638,6 @@ public class DataSourceService extends BaseService {
/**
* get host and port by address
*
- * @param address address
- * @return string array: [host,port]
- */
- private String[] getHostsAndPort(String address) {
- return getHostsAndPort(address, Constants.DOUBLE_SLASH);
- }
-
- /**
- * get host and port by address
- *
* @param address address
* @param separator separator
* @return string array: [host,port]
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
similarity index 97%
copy from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 77be0a0..0dff163 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.api.service;
+package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -26,6 +26,10 @@ import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.BaseService;
+import org.apache.dolphinscheduler.api.service.ExecutorService;
+import org.apache.dolphinscheduler.api.service.MonitorService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@@ -53,7 +57,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
-import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -67,12 +70,12 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * executor service
+ * executor service impl
*/
@Service
-public class ExecutorService extends BaseService {
+public class ExecutorServiceImpl extends BaseService implements ExecutorService {
- private static final Logger logger = LoggerFactory.getLogger(ExecutorService.class);
+ private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceImpl.class);
@Autowired
private ProjectMapper projectMapper;
@@ -113,7 +116,6 @@ public class ExecutorService extends BaseService {
* @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @return execute process instance code
- * @throws ParseException Parse Exception
*/
public Map<String, Object> execProcessInstance(User loginUser, String projectName,
int processDefinitionId, String cronTime, CommandType commandType,
@@ -121,7 +123,7 @@ public class ExecutorService extends BaseService {
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
RunMode runMode,
Priority processInstancePriority, String workerGroup, Integer timeout,
- Map<String, String> startParams) throws ParseException {
+ Map<String, String> startParams) {
Map<String, Object> result = new HashMap<>();
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@@ -181,7 +183,7 @@ public class ExecutorService extends BaseService {
List<Server> masterServers = monitorService.getServerListFromZK(true);
// no master
- if (masterServers.size() == 0) {
+ if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return false;
}
@@ -297,7 +299,6 @@ public class ExecutorService extends BaseService {
* @return true if tenant suitable, otherwise return false
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
- // checkTenantExists();
Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
return tenant != null;
@@ -465,7 +466,7 @@ public class ExecutorService extends BaseService {
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup,
- Map<String, String> startParams) throws ParseException {
+ Map<String, String> startParams) {
/**
* instantiate command schedule instance
@@ -552,8 +553,7 @@ public class ExecutorService extends BaseService {
}
}
} else {
- logger.error("there is not valid schedule date for the process definition: id:{},date:{}",
- processDefineId, schedule);
+ logger.error("there is not valid schedule date for the process definition: id:{}", processDefineId);
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
index 13eb1b9..8b342d1 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType;
@@ -57,7 +58,7 @@ public class DataSourceServiceTest {
@InjectMocks
- private DataSourceService dataSourceService;
+ private DataSourceServiceImpl dataSourceService;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index 3f25fb8..c9d2893 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -65,7 +66,7 @@ import org.mockito.junit.MockitoJUnitRunner;
public class ExecutorService2Test {
@InjectMocks
- private ExecutorService executorService;
+ private ExecutorServiceImpl executorService;
@Mock
private ProcessService processService;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index 57cd207..fa60c78 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Ignore;
@@ -39,7 +40,7 @@ public class ExecutorServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
@Autowired
- private ExecutorService executorService;
+ private ExecutorServiceImpl executorService;
@Ignore
@Test
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index b9065ec..fe6c7fd 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -260,8 +260,8 @@ public class ProcessService {
* @param command command
* @return create command result
*/
- public Boolean verifyIsNeedCreateCommand(Command command) {
- Boolean isNeedCreate = true;
+ public boolean verifyIsNeedCreateCommand(Command command) {
+ boolean isNeedCreate = true;
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);