You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/02/10 13:00:46 UTC
[incubator-dolphinscheduler] branch refactor-architecture updated:
Refactor architecture (#1926)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-architecture
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-architecture by this push:
new 29f2b41 Refactor architecture (#1926)
29f2b41 is described below
commit 29f2b41ed0444e85a336b5988ab17b8b1f20dd28
Author: Tboy <gu...@immomo.com>
AuthorDate: Mon Feb 10 21:00:39 2020 +0800
Refactor architecture (#1926)
* move version to parent pom
* move version properties to parent pom for easy management
* remove freemarker dependency
* delete CombinedApplicationServer
* #1871 correct spelling
* #1873 some updates for TaskQueueZkImpl
* #1875 remove unused properties in pom
* #1878
1. remove tomcat dependency
2. remove combined_logback.xml in api module
3. reformat pom.xml to fix misaligned entries
* #1885 fix api server startup failure
1. add jsp-2.1 dependency
2. remove jasper-runtime dependency
* add stringutils ut (#1921)
* add stringutils ut
* New feature for #1675. (#1908)
Continue with the remaining work: add the cache feature for the dependent, mr, python, sub_process, procedure and shell tasks.
* Add modify user name for process definition (#1919)
* class overrides equals() and should therefore also override hashCode()
* #1862 add modify user in process definition list
* #1862 add pg-1.2.2 ddl.sql
* modify ScriptRunnerTest
* add updateProcessDefinition UT
* modify updateProcessDefinition UT
* modify updateProcessDefinition UT
* modify mysql 1.2.2 ddl.sql&dml.sql
* add scope test to mysql in pom
* modify pg-1.2.2 ddl.sql
* refactor module
* updates
Co-authored-by: khadgarmage <kh...@outlook.com>
Co-authored-by: zhukai <bo...@qq.com>
Co-authored-by: Yelli <am...@my.com>
---
.../dolphinscheduler/alert/utils/MailUtils.java | 8 +-
dolphinscheduler-api/pom.xml | 6 -
.../controller/ProcessDefinitionController.java | 16 +--
.../api/controller/ProcessInstanceController.java | 14 +--
.../api/service/DataAnalysisService.java | 10 +-
.../api/service/ExecutorService.java | 34 +++---
.../api/service/LoggerService.java | 8 +-
.../api/service/ProcessDefinitionService.java | 10 +-
.../api/service/ProcessInstanceService.java | 44 ++++----
.../api/service/ResourcesService.java | 2 +-
.../api/service/SchedulerService.java | 22 ++--
.../api/service/TaskInstanceService.java | 4 +-
.../api/utils/ZookeeperMonitor.java | 4 +-
.../api/service/ExecutorService2Test.java | 28 ++---
.../api/service/ProcessDefinitionServiceTest.java | 27 +++++
dolphinscheduler-common/pom.xml | 16 +--
.../common/utils/ScriptRunner.java | 4 +-
.../dolphinscheduler/common/utils/StringUtils.java | 125 +--------------------
.../common/utils/ScriptRunnerTest.java | 4 +-
.../common/utils/StringUtilsTest.java | 66 +++++++++++
dolphinscheduler-dao/pom.xml | 15 ---
.../dao/entity/ProcessDefinition.java | 48 +++++---
.../server/master/MasterServer.java | 14 +--
.../master/runner/MasterBaseTaskExecThread.java | 16 +--
.../server/master/runner/MasterExecThread.java | 54 ++++-----
.../master/runner/MasterSchedulerThread.java | 20 ++--
.../server/master/runner/MasterTaskExecThread.java | 10 +-
.../master/runner/SubProcessTaskExecThread.java | 16 +--
.../server/monitor/ZKMonitorImpl.java | 2 +-
.../server/utils/RemoveZKNode.java | 2 +-
.../server/worker/WorkerServer.java | 20 ++--
.../server/worker/runner/FetchTaskThread.java | 28 ++---
.../server/worker/runner/TaskScheduleThread.java | 26 ++---
.../worker/task/AbstractCommandExecutor.java | 20 ++--
.../server/worker/task/AbstractYarnTask.java | 10 +-
.../server/worker/task/datax/DataxTask.java | 16 +--
.../worker/task/dependent/DependentExecute.java | 14 +--
.../worker/task/dependent/DependentTask.java | 10 +-
.../server/worker/task/flink/FlinkTask.java | 2 +-
.../server/worker/task/http/HttpTask.java | 10 +-
.../worker/task/processdure/ProcedureTask.java | 10 +-
.../server/worker/task/python/PythonTask.java | 10 +-
.../server/worker/task/shell/ShellTask.java | 8 +-
.../server/worker/task/sql/SqlTask.java | 24 ++--
.../dolphinscheduler/server/zk/ZKMasterClient.java | 18 +--
.../dolphinscheduler/server/zk/ZKWorkerClient.java | 2 +-
.../server/master/MasterExecThreadTest.java | 16 +--
.../worker/shell/ShellCommandExecutorTest.java | 8 +-
.../server/worker/sql/SqlExecutorTest.java | 8 +-
.../server/worker/task/datax/DataxTaskTest.java | 16 +--
dolphinscheduler-service/pom.xml | 41 +++++--
.../service/MasterResponseCommand.java | 55 ---------
.../service/WorkerRequestCommand.java | 58 ----------
.../service}/permission/PermissionCheck.java | 36 +++---
.../service/process/ProcessService.java | 14 +--
.../service}/quartz/DruidConnectionProvider.java | 3 +-
.../service}/quartz/ProcessScheduleJob.java | 27 +++--
.../service}/quartz/QuartzExecutors.java | 4 +-
.../service/quartz}/cron/AbstractCycle.java | 4 +-
.../service/quartz}/cron/CronUtils.java | 10 +-
.../service/quartz}/cron/CycleFactory.java | 4 +-
.../service/quartz}/cron/CycleLinks.java | 4 +-
.../service}/queue/ITaskQueue.java | 2 +-
.../service}/queue/TaskQueueFactory.java | 4 +-
.../service}/queue/TaskQueueZkImpl.java | 4 +-
.../service/worker/WorkerClientService.java | 107 ------------------
.../service}/zk/AbstractZKClient.java | 5 +-
.../service}/zk/DefaultEnsembleProvider.java | 2 +-
.../service}/zk/ZookeeperCachedOperator.java | 7 +-
.../service}/zk/ZookeeperConfig.java | 2 +-
.../service}/zk/ZookeeperOperator.java | 4 +-
.../src/main/resources/quartz.properties | 2 +-
.../src/test/java}/cron/CronUtilsTest.java | 8 +-
.../src/test/java}/queue/BaseTaskQueueTest.java | 5 +-
.../src/test/java}/queue/TaskQueueZKImplTest.java | 2 +-
.../src/test/java/queue}/ZKServer.java | 2 +-
.../home/pages/dag/_source/formModel/formModel.vue | 20 +++-
.../dag/_source/formModel/tasks/dependent.vue | 17 ++-
.../pages/dag/_source/formModel/tasks/flink.vue | 2 +-
.../home/pages/dag/_source/formModel/tasks/mr.vue | 29 +++++
.../dag/_source/formModel/tasks/procedure.vue | 25 ++++-
.../pages/dag/_source/formModel/tasks/python.vue | 32 +++++-
.../pages/dag/_source/formModel/tasks/shell.vue | 34 +++++-
.../pages/dag/_source/formModel/tasks/spark.vue | 2 +-
.../dag/_source/formModel/tasks/sub_process.vue | 10 +-
.../pages/definition/pages/list/_source/list.vue | 7 ++
.../src/js/module/i18n/locale/en_US.js | 1 +
.../src/js/module/i18n/locale/zh_CN.js | 1 +
pom.xml | 1 +
sql/dolphinscheduler-postgre.sql | 1 +
sql/dolphinscheduler_mysql.sql | 1 +
.../1.2.2_schema/mysql/dolphinscheduler_ddl.sql | 44 ++++----
.../1.2.2_schema/mysql/dolphinscheduler_dml.sql | 29 +----
.../postgresql/dolphinscheduler_ddl.sql | 43 +++----
.../postgresql/dolphinscheduler_dml.sql | 29 +----
95 files changed, 753 insertions(+), 916 deletions(-)
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
index 7ebe6a7..99efdc8 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
@@ -90,14 +90,14 @@ public class MailUtils {
public static Map<String,Object> sendMails(Collection<String> receivers, Collection<String> receiversCc, String title, String content, ShowType showType) {
Map<String,Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
-
+
// if there is no receivers && no receiversCc, no need to process
if (CollectionUtils.isEmpty(receivers) && CollectionUtils.isEmpty(receiversCc)) {
return retMap;
}
receivers.removeIf(StringUtils::isEmpty);
-
+
if (showType == ShowType.TABLE || showType == ShowType.TEXT){
// send email
HtmlEmail email = new HtmlEmail();
@@ -335,7 +335,7 @@ public class MailUtils {
*/
private static void handleException(Collection<String> receivers, Map<String, Object> retMap, Exception e) {
logger.error("Send email to {} failed {}", receivers, e);
- retMap.put(Constants.MESSAGE, "Send email to {" + StringUtils.join(receivers, ",") + "} failed," + e.toString());
+ retMap.put(Constants.MESSAGE, "Send email to {" + String.join(",", receivers) + "} failed," + e.toString());
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-api/pom.xml b/dolphinscheduler-api/pom.xml
index 11b23d9..6440805 100644
--- a/dolphinscheduler-api/pom.xml
+++ b/dolphinscheduler-api/pom.xml
@@ -31,12 +31,6 @@
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.dolphinscheduler</groupId>
- <artifactId>dolphinscheduler-dao</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index de9cc12..c07ecf9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -53,7 +53,7 @@ public class ProcessDefinitionController extends BaseController{
/**
* create process definition
- *
+ *
* @param loginUser login user
* @param projectName project name
* @param name process definition name
@@ -96,7 +96,7 @@ public class ProcessDefinitionController extends BaseController{
/**
* verify process definition name unique
- *
+ *
* @param loginUser login user
* @param projectName project name
* @param name name
@@ -328,9 +328,9 @@ public class ProcessDefinitionController extends BaseController{
/**
- *
+ *
* get tasks list by process definition id
- *
+ *
*
* @param loginUser login user
* @param projectName project name
@@ -442,7 +442,7 @@ public class ProcessDefinitionController extends BaseController{
loginUser.getUserName(), projectName, processDefinitionIds);
Map<String, Object> result = new HashMap<>(5);
- List<Integer> deleteFailedIdList = new ArrayList<Integer>();
+ List<String> deleteFailedIdList = new ArrayList<>();
if(StringUtils.isNotEmpty(processDefinitionIds)){
String[] processDefinitionIdArray = processDefinitionIds.split(",");
@@ -451,17 +451,17 @@ public class ProcessDefinitionController extends BaseController{
try {
Map<String, Object> deleteResult = processDefinitionService.deleteProcessDefinitionById(loginUser, projectName, processDefinitionId);
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
- deleteFailedIdList.add(processDefinitionId);
+ deleteFailedIdList.add(strProcessDefinitionId);
logger.error((String)deleteResult.get(Constants.MSG));
}
} catch (Exception e) {
- deleteFailedIdList.add(processDefinitionId);
+ deleteFailedIdList.add(strProcessDefinitionId);
}
}
}
if(!deleteFailedIdList.isEmpty()){
- putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList,","));
+ putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR, String.join(",", deleteFailedIdList));
}else{
putMsg(result, Status.SUCCESS);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index 542aad5..80db6c8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -22,12 +22,12 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
-import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -58,7 +58,7 @@ public class ProcessInstanceController extends BaseController{
/**
* query process instance list paging
- *
+ *
* @param loginUser login user
* @param projectName project name
* @param pageNo page number
@@ -372,7 +372,7 @@ public class ProcessInstanceController extends BaseController{
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = new HashMap<>(5);
- List<Integer> deleteFailedIdList = new ArrayList<Integer>();
+ List<String> deleteFailedIdList = new ArrayList<>();
if(StringUtils.isNotEmpty(processInstanceIds)){
String[] processInstanceIdArray = processInstanceIds.split(",");
@@ -381,16 +381,16 @@ public class ProcessInstanceController extends BaseController{
try {
Map<String, Object> deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){
- deleteFailedIdList.add(processInstanceId);
+ deleteFailedIdList.add(strProcessInstanceId);
logger.error((String)deleteResult.get(Constants.MSG));
}
} catch (Exception e) {
- deleteFailedIdList.add(processInstanceId);
+ deleteFailedIdList.add(strProcessInstanceId);
}
}
}
if(deleteFailedIdList.size() > 0){
- putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList,","));
+ putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR, String.join(",", deleteFailedIdList));
}else{
putMsg(result, Status.SUCCESS);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
index f4becbe..0743ef9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
@@ -24,13 +24,13 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
-import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -69,7 +69,7 @@ public class DataAnalysisService extends BaseService{
TaskInstanceMapper taskInstanceMapper;
@Autowired
- ProcessDao processDao;
+ ProcessService processService;
/**
* statistical task instance status data
@@ -296,7 +296,7 @@ public class DataAnalysisService extends BaseService{
if(projectId !=0){
projectIds.add(projectId);
}else if(loginUser.getUserType() == UserType.GENERAL_USER){
- projectIds = processDao.getProjectIdListHavePerm(loginUser.getId());
+ projectIds = processService.getProjectIdListHavePerm(loginUser.getId());
if(projectIds.size() ==0 ){
projectIds.add(0);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 257f15d..6edd48d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -25,12 +25,12 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -67,7 +67,7 @@ public class ExecutorService extends BaseService{
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* execute process instance
@@ -186,13 +186,13 @@ public class ExecutorService extends BaseService{
return checkResult;
}
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if(executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE){
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
@@ -227,7 +227,7 @@ public class ExecutorService extends BaseService{
} else {
processInstance.setCommandType(CommandType.STOP);
processInstance.addHistoryCmd(CommandType.STOP);
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP);
}
break;
@@ -237,7 +237,7 @@ public class ExecutorService extends BaseService{
} else {
processInstance.setCommandType(CommandType.PAUSE);
processInstance.addHistoryCmd(CommandType.PAUSE);
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE);
}
break;
@@ -257,7 +257,7 @@ public class ExecutorService extends BaseService{
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
// checkTenantExists();
- Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(),
+ Tenant tenant = processService.getTenantForProcess(processDefinition.getTenantId(),
processDefinition.getUserId());
if(tenant == null){
return false;
@@ -319,7 +319,7 @@ public class ExecutorService extends BaseService{
private Map<String, Object> updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>(5);
- int update = processDao.updateProcessInstanceState(processInstanceId, executionStatus);
+ int update = processService.updateProcessInstanceState(processInstanceId, executionStatus);
if (update > 0) {
putMsg(result, Status.SUCCESS);
} else {
@@ -347,12 +347,12 @@ public class ExecutorService extends BaseService{
CMDPARAM_RECOVER_PROCESS_ID_STRING, instanceId));
command.setExecutorId(loginUser.getId());
- if(!processDao.verifyIsNeedCreateCommand(command)){
+ if(!processService.verifyIsNeedCreateCommand(command)){
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND,processDefinitionId);
return result;
}
- int create = processDao.createCommand(command);
+ int create = processService.createCommand(command);
if (create > 0) {
putMsg(result, Status.SUCCESS);
@@ -376,7 +376,7 @@ public class ExecutorService extends BaseService{
putMsg(result,Status.REQUEST_PARAMS_NOT_VALID_ERROR,"process definition id");
}
List<Integer> ids = new ArrayList<>();
- processDao.recurseFindSubProcessId(processDefineId, ids);
+ processService.recurseFindSubProcessId(processDefineId, ids);
Integer[] idArray = ids.toArray(new Integer[ids.size()]);
if (ids.size() > 0){
List<ProcessDefinition> processDefinitionList;
@@ -506,9 +506,9 @@ public class ExecutorService extends BaseService{
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJson(cmdParam));
- return processDao.createCommand(command);
+ return processService.createCommand(command);
}else if (runMode == RunMode.RUN_MODE_PARALLEL){
- List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule item : schedules) {
@@ -521,7 +521,7 @@ public class ExecutorService extends BaseService{
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
command.setCommandParam(JSONUtils.toJson(cmdParam));
- processDao.createCommand(command);
+ processService.createCommand(command);
}
return listDate.size();
}else{
@@ -532,7 +532,7 @@ public class ExecutorService extends BaseService{
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
command.setCommandParam(JSONUtils.toJson(cmdParam));
- processDao.createCommand(command);
+ processService.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
}
return runCunt;
@@ -544,7 +544,7 @@ public class ExecutorService extends BaseService{
}
}else{
command.setCommandParam(JSONUtils.toJson(cmdParam));
- return processDao.createCommand(command);
+ return processService.createCommand(command);
}
return 0;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index 108d5d4..3a9ff11 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -20,9 +20,9 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.log.LogClientService;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +37,7 @@ public class LoggerService {
private static final Logger logger = LoggerFactory.getLogger(LoggerService.class);
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* view log
@@ -49,7 +49,7 @@ public class LoggerService {
*/
public Result queryLog(int taskInstId, int skipLineNum, int limit) {
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null){
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
@@ -80,7 +80,7 @@ public class LoggerService {
* @return log byte array
*/
public byte[] getLogBytes(int taskInstId) {
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 5fe708c..844902b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -43,9 +43,9 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -94,7 +94,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private ScheduleMapper scheduleMapper;
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
@Autowired
private WorkerGroupMapper workerGroupMapper;
@@ -143,6 +143,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
+ processDefine.setModifyBy(loginUser.getUserName());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
@@ -282,7 +283,7 @@ public class ProcessDefinitionService extends BaseDAGService {
if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) {
return checkProcessJson;
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(id);
+ ProcessDefinition processDefinition = processService.findProcessDefineById(id);
if (processDefinition == null) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id);
@@ -295,7 +296,7 @@ public class ProcessDefinitionService extends BaseDAGService {
putMsg(result, Status.SUCCESS);
}
- ProcessDefinition processDefine = processDao.findProcessDefineById(id);
+ ProcessDefinition processDefine = processService.findProcessDefineById(id);
Date now = new Date();
processDefine.setId(id);
@@ -308,6 +309,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
+ processDefine.setModifyBy(loginUser.getUserName());
//custom global params
List<Property> globalParamsList = new ArrayList<>();
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 87e1a0e..2b1f04e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -30,15 +30,15 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -72,7 +72,7 @@ public class ProcessInstanceService extends BaseDAGService {
ProjectService projectService;
@Autowired
- ProcessDao processDao;
+ ProcessService processService;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@@ -112,7 +112,7 @@ public class ProcessInstanceService extends BaseDAGService {
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
String workerGroupName = "";
if(processInstance.getWorkerGroupId() == -1){
workerGroupName = DEFAULT;
@@ -125,7 +125,7 @@ public class ProcessInstanceService extends BaseDAGService {
}
}
processInstance.setWorkerGroupName(workerGroupName);
- ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setReceivers(processDefinition.getReceivers());
processInstance.setReceiversCc(processDefinition.getReceiversCc());
result.put(Constants.DATA_LIST, processInstance);
@@ -228,8 +228,8 @@ public class ProcessInstanceService extends BaseDAGService {
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId);
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
AddDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@@ -304,7 +304,7 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
if (taskInstance == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
@@ -314,7 +314,7 @@ public class ProcessInstanceService extends BaseDAGService {
return result;
}
- ProcessInstance subWorkflowInstance = processDao.findSubProcessInstance(
+ ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
taskInstance.getProcessInstanceId(), taskInstance.getId());
if (subWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
@@ -356,7 +356,7 @@ public class ProcessInstanceService extends BaseDAGService {
}
//check process instance exists
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
@@ -380,7 +380,7 @@ public class ProcessInstanceService extends BaseDAGService {
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
- ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
@@ -396,7 +396,7 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance.getCmdTypeIfComplement(), schedule);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
- Tenant tenant = processDao.getTenantForProcess(processData.getTenantId(),
+ Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
@@ -406,7 +406,7 @@ public class ProcessInstanceService extends BaseDAGService {
}
// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
// globalParams, schedule, flag, locations, connects);
- int update = processDao.updateProcessInstance(processInstance);
+ int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
processDefinition.setProcessDefinitionJson(processInstanceJson);
@@ -445,7 +445,7 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
- ProcessInstance subInstance = processDao.findProcessInstanceDetailById(subId);
+ ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
if (subInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
return result;
@@ -455,7 +455,7 @@ public class ProcessInstanceService extends BaseDAGService {
return result;
}
- ProcessInstance parentWorkflowInstance = processDao.findParentProcessInstance(subId);
+ ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId);
if (parentWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST);
return result;
@@ -476,7 +476,7 @@ public class ProcessInstanceService extends BaseDAGService {
* @return delete result code
*/
@Transactional(rollbackFor = Exception.class)
- public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) {
+ public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId, ITaskQueue tasksQueue) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
@@ -486,8 +486,8 @@ public class ProcessInstanceService extends BaseDAGService {
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId);
+ ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
@@ -512,7 +512,7 @@ public class ProcessInstanceService extends BaseDAGService {
.append(taskInstance.getId())
.append(UNDERLINE);
- int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
+ int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance);
WorkerGroup workerGroup = workerGroupMapper.selectById(taskWorkerGroupId);
if(workerGroup == null){
@@ -541,9 +541,9 @@ public class ProcessInstanceService extends BaseDAGService {
}
// delete database cascade
- int delete = processDao.deleteWorkProcessInstanceById(processInstanceId);
- processDao.deleteAllSubWorkProcessByParentId(processInstanceId);
- processDao.deleteWorkProcessMapByParentId(processInstanceId);
+ int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
+ processService.deleteAllSubWorkProcessByParentId(processInstanceId);
+ processService.deleteWorkProcessMapByParentId(processInstanceId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
index 09b1d31..29a1644 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
@@ -539,7 +539,7 @@ public class ResourcesService extends BaseService {
putMsg(result, Status.SUCCESS);
Map<String, Object> map = new HashMap<>();
map.put(ALIAS, resource.getAlias());
- map.put(CONTENT, StringUtils.join(content, "\n"));
+ map.put(CONTENT, String.join("\n", content));
result.setData(map);
}else{
logger.error("read file {} not exist in hdfs", hdfsFileName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
index bdce947..7212210 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -34,11 +33,12 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
-import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
-import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob;
-import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
+import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +68,7 @@ public class SchedulerService extends BaseService {
private MonitorService monitorService;
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
@@ -119,7 +119,7 @@ public class SchedulerService extends BaseService {
}
// check work flow define release state
- ProcessDefinition processDefinition = processDao.findProcessDefineById(processDefineId);
+ ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
@@ -221,7 +221,7 @@ public class SchedulerService extends BaseService {
return result;
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(schedule.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
return result;
@@ -321,7 +321,7 @@ public class SchedulerService extends BaseService {
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(scheduleObj.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
return result;
@@ -338,7 +338,7 @@ public class SchedulerService extends BaseService {
}
// check sub process definition release state
List<Integer> subProcessDefineIds = new ArrayList<>();
- processDao.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
+ processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
if (subProcessDefineIds.size() > 0){
List<ProcessDefinition> subProcessDefinitionList =
@@ -423,7 +423,7 @@ public class SchedulerService extends BaseService {
return result;
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(processDefineId);
+ ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
return result;
@@ -472,7 +472,7 @@ public class SchedulerService extends BaseService {
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
- Schedule schedule = processDao.querySchedule(scheduleId);
+ Schedule schedule = processService.querySchedule(scheduleId);
if (schedule == null) {
logger.warn("process schedule info not exists");
return;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index 74afa2a..9690f5c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -33,6 +32,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -56,7 +56,7 @@ public class TaskInstanceService extends BaseService {
ProjectService projectService;
@Autowired
- ProcessDao processDao;
+ ProcessService processService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java
index 66f57f6..f91d3bc 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java
@@ -18,9 +18,9 @@ package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -34,7 +34,7 @@ import java.util.List;
* monitor zookeeper info
*/
@Component
-public class ZookeeperMonitor extends AbstractZKClient{
+public class ZookeeperMonitor extends AbstractZKClient {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
index 66c7a3e..07d7477 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
@@ -22,10 +22,10 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,7 +52,7 @@ public class ExecutorService2Test {
private ExecutorService executorService;
@Mock
- private ProcessDao processDao;
+ private ProcessService processService;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@@ -100,8 +100,8 @@ public class ExecutorService2Test {
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
- Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
- Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1);
+ Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
+ Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
}
/**
@@ -111,7 +111,7 @@ public class ExecutorService2Test {
@Test
public void testNoComplement() throws ParseException {
try {
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.START_PROCESS,
null, null,
@@ -119,7 +119,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processDao, times(1)).createCommand(any(Command.class));
+ verify(processService, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
@@ -132,7 +132,7 @@ public class ExecutorService2Test {
@Test
public void testDateError() throws ParseException {
try {
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
@@ -140,7 +140,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
- verify(processDao, times(0)).createCommand(any(Command.class));
+ verify(processService, times(0)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
@@ -153,7 +153,7 @@ public class ExecutorService2Test {
@Test
public void testSerial() throws ParseException {
try {
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
@@ -161,7 +161,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processDao, times(1)).createCommand(any(Command.class));
+ verify(processService, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
@@ -174,7 +174,7 @@ public class ExecutorService2Test {
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
@@ -182,7 +182,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processDao, times(31)).createCommand(any(Command.class));
+ verify(processService, times(31)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
@@ -195,7 +195,7 @@ public class ExecutorService2Test {
@Test
public void testParallelWithSchedule() throws ParseException {
try{
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
@@ -203,7 +203,7 @@ public class ExecutorService2Test {
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(processDao, times(15)).createCommand(any(Command.class));
+ verify(processService, times(15)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index b30a919..ccbbc36 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.http.entity.ContentType;
import org.json.JSONException;
import org.junit.Assert;
@@ -77,6 +78,9 @@ public class ProcessDefinitionServiceTest {
@Mock
private WorkerGroupMapper workerGroupMapper;
+ @Mock
+ private ProcessService processService;
+
private String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
@@ -422,6 +426,27 @@ public class ProcessDefinitionServiceTest {
Assert.assertTrue(deleteFlag);
}
+ @Test
+ public void testUpdateProcessDefinition() {
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.ADMIN_USER);
+
+ Map<String, Object> result = new HashMap<>(5);
+ putMsg(result, Status.SUCCESS);
+
+ String projectName = "project_test1";
+ Project project = getProject(projectName);
+ // Return the same Project instance that the checkProjectAndAuth stub is matched against.
+ Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
+ Mockito.when(processService.findProcessDefineById(1)).thenReturn(getProcessDefinition());
+
+ Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test",
+ sqlDependentJson, "", "", "");
+
+ Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, updateResult.get(Constants.STATUS));
+ }
/**
* get mock datasource
@@ -443,6 +468,8 @@ public class ProcessDefinitionServiceTest {
processDefinition.setId(46);
processDefinition.setName("testProject");
processDefinition.setProjectId(2);
+ processDefinition.setTenantId(1);
+ processDefinition.setDescription("");
return processDefinition;
}
diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml
index bd2448e..9eb4bf7 100644
--- a/dolphinscheduler-common/pom.xml
+++ b/dolphinscheduler-common/pom.xml
@@ -85,21 +85,7 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>${curator.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j-1.2-api</groupId>
- <artifactId>org.apache.logging.log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
index bbc937c..f92839b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java
@@ -169,13 +169,14 @@ public class ScriptRunner {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
int cols = md.getColumnCount();
- for (int i = 0; i < cols; i++) {
+ // JDBC column indexes run from 1 to getColumnCount() inclusive, so the upper bound must be <=
+ for (int i = 1; i <= cols; i++) {
String name = md.getColumnLabel(i);
logger.info("{} \t", name);
}
logger.info("");
while (rs.next()) {
- for (int i = 0; i < cols; i++) {
+ for (int i = 1; i <= cols; i++) {
String value = rs.getString(i);
logger.info("{} \t", value);
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
index 12b75fb..af2817a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
@@ -16,17 +16,7 @@
*/
package org.apache.dolphinscheduler.common.utils;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.regex.Pattern;
-
-
public class StringUtils {
-
- public static final int INDEX_NOT_FOUND = -1;
-
public static final String EMPTY = "";
public static boolean isEmpty(final CharSequence cs) {
@@ -37,119 +27,14 @@ public class StringUtils {
return !isEmpty(cs);
}
- public static boolean isBlank(CharSequence cs){
- int strLen;
- if (cs == null || (strLen = cs.length()) == 0) {
+ public static boolean isBlank(String s){
+ if (isEmpty(s)) {
return true;
}
- for (int i = 0; i < strLen; i++) {
- if (Character.isWhitespace(cs.charAt(i)) == false) {
- return false;
- }
- }
- return true;
- }
-
- public static boolean isNotBlank(CharSequence str){
- return !isBlank(str);
- }
-
- public static String substringBefore(final String str, final String separator) {
- if (isBlank(str) || separator == null) {
- return str;
- }
- if (separator.isEmpty()) {
- return EMPTY;
- }
- final int pos = str.indexOf(separator);
- if (pos == INDEX_NOT_FOUND) {
- return str;
- }
- return str.substring(0, pos);
- }
-
- public static String substringAfter(final String str, final String separator) {
- if (isBlank(str)) {
- return str;
- }
- if (separator == null) {
- return EMPTY;
- }
- final int pos = str.indexOf(separator);
- if (pos == INDEX_NOT_FOUND) {
- return EMPTY;
- }
- return str.substring(pos + separator.length());
+ return s.trim().length() == 0;
}
- public static String substringAfterLast(final String str, final String separator) {
- if (isEmpty(str)) {
- return str;
- }
- if (isEmpty(separator)) {
- return EMPTY;
- }
- final int pos = str.lastIndexOf(separator);
- if (pos == INDEX_NOT_FOUND || pos == str.length() - separator.length()) {
- return EMPTY;
- }
- return str.substring(pos + separator.length());
- }
-
- public static String getUtf8String(byte[] bytes){
- return new String(bytes, StandardCharsets.UTF_8);
- }
-
- public static byte[] getUtf8Bytes(String str){
- return str.getBytes(StandardCharsets.UTF_8);
- }
-
- public static boolean hasChinese(String str) {
- if (str == null) {
- return false;
- }
- Pattern pattern = Pattern.compile("[\\u4E00-\\u9FBF]+");
- return pattern.matcher(str).find();
- }
-
- public static boolean hasSpace(String str) {
- if (str == null) {
- return false;
- }
- int len = str.length();
- for (int i = 0; i < len; i++) {
- if (str.charAt(i) == ' ') {
- return true;
- }
- }
- return false;
- }
-
- public static String join(final Iterable<?> iterable, final String separator){
- Iterator<?> iterator = iterable.iterator();
- if (iterator == null) {
- return null;
- }
- if (!iterator.hasNext()) {
- return EMPTY;
- }
- final Object first = iterator.next();
- if (!iterable.iterator().hasNext()) {
- return Objects.toString(first, "");
- }
- final StringBuilder buf = new StringBuilder(64);
- if (first != null) {
- buf.append(first);
- }
- while (iterator.hasNext()) {
- if (separator != null) {
- buf.append(separator);
- }
- final Object obj = iterator.next();
- if (obj != null) {
- buf.append(obj);
- }
- }
- return buf.toString();
+ public static boolean isNotBlank(String s){
+ return !isBlank(s);
}
}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ScriptRunnerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ScriptRunnerTest.java
index 0eb1cce..155d52a 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ScriptRunnerTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ScriptRunnerTest.java
@@ -48,7 +48,7 @@ public class ScriptRunnerTest {
Mockito.when(st.getResultSet()).thenReturn(rs);
ResultSetMetaData md = Mockito.mock(ResultSetMetaData.class);
Mockito.when(rs.getMetaData()).thenReturn(md);
- Mockito.when(md.getColumnCount()).thenReturn(1);
+ Mockito.when(md.getColumnCount()).thenReturn(2);
Mockito.when(rs.next()).thenReturn(true, false);
ScriptRunner s = new ScriptRunner(conn, true, true);
if (dbName.isEmpty()) {
@@ -56,7 +56,7 @@ public class ScriptRunnerTest {
} else {
s.runScript(new StringReader("select 1;"), dbName);
}
- Mockito.verify(md).getColumnLabel(0);
+ Mockito.verify(md).getColumnLabel(1);
} catch(Exception e) {
Assert.assertNotNull(e);
}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java
new file mode 100644
index 0000000..947e731
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for the isNotEmpty and isNotBlank predicates of {@link StringUtils}.
+ */
+public class StringUtilsTest {
+ @Test
+ public void testIsNotEmpty() {
+ //null string
+ boolean b = StringUtils.isNotEmpty(null);
+ Assert.assertFalse(b);
+
+ //"" string
+ b = StringUtils.isNotEmpty("");
+ Assert.assertFalse(b);
+
+ //" " string
+ b = StringUtils.isNotEmpty(" ");
+ Assert.assertTrue(b);
+
+ //"test" string
+ b = StringUtils.isNotEmpty("test");
+ Assert.assertTrue(b);
+ }
+
+ @Test
+ public void testIsNotBlank() {
+ //null string
+ boolean b = StringUtils.isNotBlank(null);
+ Assert.assertFalse(b);
+
+ //"" string
+ b = StringUtils.isNotBlank("");
+ Assert.assertFalse(b);
+
+ //" " string
+ b = StringUtils.isNotBlank(" ");
+ Assert.assertFalse(b);
+
+ //" test " string
+ b = StringUtils.isNotBlank(" test ");
+ Assert.assertTrue(b);
+
+ //"test" string
+ b = StringUtils.isNotBlank("test");
+ Assert.assertTrue(b);
+ }
+}
diff --git a/dolphinscheduler-dao/pom.xml b/dolphinscheduler-dao/pom.xml
index b3b22c1..20d1941 100644
--- a/dolphinscheduler-dao/pom.xml
+++ b/dolphinscheduler-dao/pom.xml
@@ -116,21 +116,6 @@
<artifactId>cron-utils</artifactId>
</dependency>
- <dependency>
- <groupId>org.quartz-scheduler</groupId>
- <artifactId>quartz</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>c3p0</artifactId>
- <groupId>c3p0</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.quartz-scheduler</groupId>
- <artifactId>quartz-jobs</artifactId>
- </dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
index cd0494e..6e7ea7d 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
@@ -29,6 +29,7 @@ import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
@@ -158,6 +159,11 @@ public class ProcessDefinition {
*/
private int tenantId;
+ /**
+ * modify user name
+ */
+ private String modifyBy;
+
public String getName() {
return name;
@@ -337,6 +343,30 @@ public class ProcessDefinition {
this.timeout = timeout;
}
+ public int getTenantId() {
+ return tenantId;
+ }
+
+ public void setTenantId(int tenantId) {
+ this.tenantId = tenantId;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getModifyBy() {
+ return modifyBy;
+ }
+
+ public void setModifyBy(String modifyBy) {
+ this.modifyBy = modifyBy;
+ }
+
@Override
public String toString() {
return "ProcessDefinition{" +
@@ -346,6 +376,7 @@ public class ProcessDefinition {
", releaseState=" + releaseState +
", projectId=" + projectId +
", processDefinitionJson='" + processDefinitionJson + '\'' +
+ ", description='" + description + '\'' +
", globalParams='" + globalParams + '\'' +
", globalParamList=" + globalParamList +
", globalParamMap=" + globalParamMap +
@@ -362,22 +393,7 @@ public class ProcessDefinition {
", scheduleReleaseState=" + scheduleReleaseState +
", timeout=" + timeout +
", tenantId=" + tenantId +
+ ", modifyBy='" + modifyBy + '\'' +
'}';
}
-
- public int getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(int tenantId) {
- this.tenantId = tenantId;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 0647b94..9ce59a5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -24,12 +24,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
-import org.apache.dolphinscheduler.dao.quartz.ProcessScheduleJob;
-import org.apache.dolphinscheduler.dao.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
+import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,10 +66,10 @@ public class MasterServer implements IStoppable {
private ScheduledExecutorService heartbeatMasterService;
/**
- * dolphinscheduler database interface
+ * process service
*/
@Autowired
- protected ProcessDao processDao;
+ protected ProcessService processService;
/**
* master exec thread pool
@@ -126,7 +126,7 @@ public class MasterServer implements IStoppable {
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient,
- processDao,
+ processService,
masterConfig.getMasterExecThreads());
// submit master scheduler thread
@@ -136,7 +136,7 @@ public class MasterServer implements IStoppable {
// what system should do if exception
try {
logger.info("start Quartz server...");
- ProcessScheduleJob.init(processDao);
+ ProcessScheduleJob.init(processService);
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index c1552c4..46541f6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -16,15 +16,15 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
-import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +41,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(MasterBaseTaskExecThread.class);
/**
- * process dao
+ * process service
*/
- protected ProcessDao processDao;
+ protected ProcessService processService;
/**
* alert database access
@@ -81,7 +81,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
- this.processDao = BeanContext.getBean(ProcessDao.class);
+ this.processService = BeanContext.getBean(ProcessService.class);
this.alertDao = BeanContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
@@ -121,14 +121,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
try {
if(!submitDB){
// submit task to db
- task = processDao.submitTask(taskInstance, processInstance);
+ task = processService.submitTask(taskInstance, processInstance);
if(task != null && task.getId() != 0){
submitDB = true;
}
}
if(submitDB && !submitQueue){
// submit task to queue
- submitQueue = processDao.submitTaskToQueue(task);
+ submitQueue = processService.submitTaskToQueue(task);
}
if(submitDB && submitQueue){
return task;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 2b1ff4d..b9d21f6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -28,14 +28,14 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
-import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,9 +124,9 @@ public class MasterExecThread implements Runnable {
private DAG<String,TaskNode,TaskNodeRelation> dag;
/**
- * process dao
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* master config
@@ -136,10 +136,10 @@ public class MasterExecThread implements Runnable {
/**
* constructor of MasterExecThread
* @param processInstance process instance
- * @param processDao process dao
+ * @param processService process service
*/
- public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){
- this.processDao = processDao;
+ public MasterExecThread(ProcessInstance processInstance, ProcessService processService){
+ this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@@ -177,7 +177,7 @@ public class MasterExecThread implements Runnable {
logger.error("process execute failed, process id:{}", processInstance.getId());
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setEndTime(new Date());
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
}finally {
taskExecService.shutdown();
// post handle
@@ -205,11 +205,11 @@ public class MasterExecThread implements Runnable {
Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
- processDao.saveProcessInstance(processInstance);
+ processService.saveProcessInstance(processInstance);
// get schedules
int processDefinitionId = processInstance.getProcessDefinitionId();
- List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
+ List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
List<Date> listDate = Lists.newLinkedList();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule schedule : schedules) {
@@ -223,7 +223,7 @@ public class MasterExecThread implements Runnable {
iterator = listDate.iterator();
scheduleDate = iterator.next();
processInstance.setScheduleTime(scheduleDate);
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
}else{
scheduleDate = processInstance.getScheduleTime();
if(scheduleDate == null){
@@ -239,7 +239,7 @@ public class MasterExecThread implements Runnable {
logger.error("process {} dag is null, please check out parameters",
processInstance.getId());
processInstance.setState(ExecutionStatus.SUCCESS);
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
return;
}
@@ -281,10 +281,10 @@ public class MasterExecThread implements Runnable {
processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
}
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance taskInstance : taskInstanceList){
taskInstance.setFlag(Flag.NO);
- processDao.updateTaskInstance(taskInstance);
+ processService.updateTaskInstance(taskInstance);
}
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
@@ -292,7 +292,7 @@ public class MasterExecThread implements Runnable {
processInstance.getProcessDefinition().getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
- processDao.saveProcessInstance(processInstance);
+ processService.saveProcessInstance(processInstance);
}
// flow end
@@ -320,11 +320,11 @@ public class MasterExecThread implements Runnable {
*/
private void endProcess() {
processInstance.setEndTime(new Date());
- processDao.updateProcessInstance(processInstance);
+ processService.updateProcessInstance(processInstance);
if(processInstance.getState().typeIsWaittingThread()){
- processDao.createRecoveryWaitingThreadCommand(null, processInstance);
+ processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
- List<TaskInstance> taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
alertManager.sendAlertProcessInstance(processInstance, taskInstances);
}
@@ -361,7 +361,7 @@ public class MasterExecThread implements Runnable {
dependFailedTask.clear();
completeTaskList.clear();
errorTaskList.clear();
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.isTaskComplete()){
completeTaskList.put(task.getName(), task);
@@ -417,7 +417,7 @@ public class MasterExecThread implements Runnable {
* @return TaskInstance
*/
private TaskInstance findTaskIfExists(String taskName){
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(this.processInstance.getId());
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId());
for(TaskInstance taskInstance : taskInstanceList){
if(taskInstance.getName().equals(taskName)){
return taskInstance;
@@ -706,7 +706,7 @@ public class MasterExecThread implements Runnable {
* @return process instance execution status
*/
private ExecutionStatus getProcessInstanceState(){
- ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId());
+ ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
ExecutionStatus state = instance.getState();
if(activeTaskNode.size() > 0){
@@ -784,10 +784,10 @@ public class MasterExecThread implements Runnable {
processInstance.getState().toString(), state.toString(),
processInstance.getCommandType().toString());
processInstance.setState(state);
- ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId());
+ ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
instance.setState(state);
instance.setProcessDefinition(processInstance.getProcessDefinition());
- processDao.updateProcessInstance(instance);
+ processService.updateProcessInstance(instance);
processInstance = instance;
}
}
@@ -845,7 +845,7 @@ public class MasterExecThread implements Runnable {
// send warning email if process time out.
if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
alertManager.sendProcessTimeoutAlert(processInstance,
- processDao.findProcessDefineById(processInstance.getProcessDefinitionId()));
+ processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
sendTimeWarning = true;
}
for(Map.Entry<MasterBaseTaskExecThread,Future<Boolean>> entry: activeTaskNode.entrySet()) {
@@ -903,7 +903,7 @@ public class MasterExecThread implements Runnable {
if(completeTask.getState()== ExecutionStatus.PAUSE){
completeTask.setState(ExecutionStatus.KILL);
completeTaskList.put(entry.getKey(), completeTask);
- processDao.updateTaskInstance(completeTask);
+ processService.updateTaskInstance(completeTask);
}
}
}
@@ -961,7 +961,7 @@ public class MasterExecThread implements Runnable {
Future<Boolean> future = entry.getValue();
TaskInstance taskInstance = taskExecThread.getTaskInstance();
- taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
+ taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if(taskInstance.getState().typeIsFinished()){
continue;
}
@@ -1031,7 +1031,7 @@ public class MasterExecThread implements Runnable {
}
try {
Integer intId = Integer.valueOf(taskId);
- TaskInstance task = processDao.findTaskInstanceById(intId);
+ TaskInstance task = processService.findTaskInstanceById(intId);
if(task == null){
logger.error("start node id cannot be found: {}", taskId);
}else {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
index a873fb7..84b1432 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
@@ -23,12 +23,12 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public class MasterSchedulerThread implements Runnable {
/**
* dolphinscheduler database interface
*/
- private final ProcessDao processDao;
+ private final ProcessService processService;
/**
* zookeeper master client
@@ -74,11 +74,11 @@ public class MasterSchedulerThread implements Runnable {
/**
* constructor of MasterSchedulerThread
* @param zkClient zookeeper master client
- * @param processDao process dao
+ * @param processService process service
* @param masterExecThreadNum master exec thread num
*/
- public MasterSchedulerThread(ZKMasterClient zkClient, ProcessDao processDao, int masterExecThreadNum){
- this.processDao = processDao;
+ public MasterSchedulerThread(ZKMasterClient zkClient, ProcessService processService, int masterExecThreadNum){
+ this.processService = processService;
this.zkMasterClient = zkClient;
this.masterExecThreadNum = masterExecThreadNum;
this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
@@ -115,19 +115,19 @@ public class MasterSchedulerThread implements Runnable {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) masterExecService;
int activeCount = poolExecutor.getActiveCount();
// make sure to scan and delete command table in one transaction
- Command command = processDao.findOneCommand();
+ Command command = processService.findOneCommand();
if (command != null) {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
- processInstance = processDao.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
+ processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
- masterExecService.execute(new MasterExecThread(processInstance,processDao));
+ masterExecService.execute(new MasterExecThread(processInstance, processService));
}
}catch (Exception e){
logger.error("scan command error ", e);
- processDao.moveToErrorCommand(command, e.toString());
+ processService.moveToErrorCommand(command, e.toString());
}
} else{
//indicate that no command ,sleep for 1s
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
index f2ee66b..66d1a3f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
@@ -82,7 +82,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
- processDao.updateTaskInstance(taskInstance);
+ processService.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
return result;
@@ -94,7 +94,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
*/
public Boolean waitTaskQuit(){
// query new state
- taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
+ taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
@@ -126,15 +126,15 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
if (remainTime < 0) {
logger.warn("task id: {} execution time out",taskInstance.getId());
// process define
- ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+ ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
// send warn mail
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(),processDefine.getReceiversCc(),taskInstance.getId(),taskInstance.getName());
checkTimeout = false;
}
}
// updateProcessInstance task instance
- taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
- processInstance = processDao.findProcessInstanceById(processInstance.getId());
+ taskInstance = processService.findTaskInstanceById(taskInstance.getId());
+ processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("exception",e);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
index 0026de7..fc16b51 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/SubProcessTaskExecThread.java
@@ -64,7 +64,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
}
setTaskInstanceState();
waitTaskQuit();
- subProcessInstance = processDao.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
+ subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
// at the end of the subflow , the task state is changed to the subflow state
if(subProcessInstance != null){
@@ -75,7 +75,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
}
}
taskInstance.setEndTime(new Date());
- processDao.updateTaskInstance(taskInstance);
+ processService.updateTaskInstance(taskInstance);
logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
result = true;
@@ -96,14 +96,14 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
* @return
*/
private Boolean setTaskInstanceState(){
- subProcessInstance = processDao.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
+ subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
if(subProcessInstance == null || taskInstance.getState().typeIsFinished()){
return false;
}
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
- processDao.updateTaskInstance(taskInstance);
+ processService.updateTaskInstance(taskInstance);
return true;
}
@@ -111,7 +111,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
* updateProcessInstance parent state
*/
private void updateParentProcessState(){
- ProcessInstance parentProcessInstance = processDao.findProcessInstanceById(this.processInstance.getId());
+ ProcessInstance parentProcessInstance = processService.findProcessInstanceById(this.processInstance.getId());
if(parentProcessInstance == null){
logger.error("parent work flow instance is null , please check it! work flow id {}", processInstance.getId());
@@ -145,7 +145,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
continue;
}
}
- subProcessInstance = processDao.findProcessInstanceById(subProcessInstance.getId());
+ subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
updateParentProcessState();
if (subProcessInstance.getState().typeIsFinished()){
break;
@@ -171,7 +171,7 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
return;
}
subProcessInstance.setState(ExecutionStatus.READY_STOP);
- processDao.updateProcessInstance(subProcessInstance);
+ processService.updateProcessInstance(subProcessInstance);
}
/**
@@ -183,6 +183,6 @@ public class SubProcessTaskExecThread extends MasterBaseTaskExecThread {
return;
}
subProcessInstance.setState(ExecutionStatus.READY_PAUSE);
- processDao.updateProcessInstance(subProcessInstance);
+ processService.updateProcessInstance(subProcessInstance);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java
index 9270740..5acc8fd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/ZKMonitorImpl.java
@@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.server.monitor;
-import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
index 7264c2f..5550e75 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
@@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.server.utils;
-import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index d270880..92ce585 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -22,22 +22,22 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
-import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -73,10 +73,10 @@ public class WorkerServer implements IStoppable {
/**
- * process database access
+ * process service
*/
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* alert database access
@@ -167,7 +167,7 @@ public class WorkerServer implements IStoppable {
killExecutorService.execute(killProcessThread);
// new fetch task thread
- FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processDao, taskQueue);
+ FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
@@ -297,7 +297,7 @@ public class WorkerServer implements IStoppable {
Set<String> taskInfoSet = taskQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_KILL);
if (CollectionUtils.isNotEmpty(taskInfoSet)){
for (String taskInfo : taskInfoSet){
- killTask(taskInfo, processDao);
+ killTask(taskInfo, processService);
removeKillInfoFromQueue(taskInfo);
}
}
@@ -319,7 +319,7 @@ public class WorkerServer implements IStoppable {
* @param taskInfo task info
* @param pd process dao
*/
- private void killTask(String taskInfo, ProcessDao pd) {
+ private void killTask(String taskInfo, ProcessService pd) {
logger.info("get one kill command from tasks kill queue: " + taskInfo);
String[] taskInfoArray = taskInfo.split("-");
if(taskInfoArray.length != 2){
@@ -357,7 +357,7 @@ public class WorkerServer implements IStoppable {
* @param taskInstance
* @param pd process dao
*/
- private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessDao pd){
+ private void deleteTaskFromQueue(TaskInstance taskInstance, ProcessService pd){
// creating distributed locks, lock path /dolphinscheduler/lock/worker
InterProcessMutex mutex = null;
logger.info("delete task from tasks queue: " + taskInstance.getId());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
index 221ad06..357ac9d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
@@ -19,17 +19,17 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +63,7 @@ public class FetchTaskThread implements Runnable{
/**
* process database access
*/
- private final ProcessDao processDao;
+ private final ProcessService processService;
/**
* worker thread pool executor
@@ -91,10 +91,10 @@ public class FetchTaskThread implements Runnable{
private WorkerConfig workerConfig;
public FetchTaskThread(ZKWorkerClient zkWorkerClient,
- ProcessDao processDao,
+ ProcessService processService,
ITaskQueue taskQueue){
this.zkWorkerClient = zkWorkerClient;
- this.processDao = processDao;
+ this.processService = processService;
this.taskQueue = taskQueue;
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskNum = workerConfig.getWorkerFetchTaskNum();
@@ -112,12 +112,12 @@ public class FetchTaskThread implements Runnable{
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
- int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
+ int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance);
if(taskWorkerGroupId <= 0){
return true;
}
- WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskWorkerGroupId);
+ WorkerGroup workerGroup = processService.queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
return true;
@@ -184,7 +184,7 @@ public class FetchTaskThread implements Runnable{
// mainly to wait for the master insert task to succeed
waitForTaskInstance();
- taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);
+ taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstId);
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
@@ -200,7 +200,7 @@ public class FetchTaskThread implements Runnable{
// if process definition is null ,process definition already deleted
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
- Tenant tenant = processDao.getTenantForProcess(
+ Tenant tenant = processService.getTenantForProcess(
taskInstance.getProcessInstance().getTenantId(),
userId);
@@ -212,7 +212,7 @@ public class FetchTaskThread implements Runnable{
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
- String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
+ String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
@@ -234,7 +234,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
- workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+ workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);
@@ -259,7 +259,7 @@ public class FetchTaskThread implements Runnable{
removeNodeFromTaskQueue(taskQueueStr);
if (taskInstance != null){
- processDao.changeTaskState(ExecutionStatus.FAILURE,
+ processService.changeTaskState(ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
@@ -347,7 +347,7 @@ public class FetchTaskThread implements Runnable{
int retryTimes = 30;
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- taskInstance = processDao.findTaskInstanceById(taskInstId);
+ taskInstance = processService.findTaskInstanceById(taskInstId);
retryTimes--;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index f179d63..a69cffd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -31,15 +31,15 @@ import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.permission.PermissionCheck;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +64,9 @@ public class TaskScheduleThread implements Runnable {
private TaskInstance taskInstance;
/**
- * process database access
+ * process service
*/
- private final ProcessDao processDao;
+ private final ProcessService processService;
/**
* abstract task
@@ -77,10 +77,10 @@ public class TaskScheduleThread implements Runnable {
* constructor
*
* @param taskInstance task instance
- * @param processDao process dao
+ * @param processService process service
*/
- public TaskScheduleThread(TaskInstance taskInstance, ProcessDao processDao){
- this.processDao = processDao;
+ public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService){
+ this.processService = processService;
this.taskInstance = taskInstance;
}
@@ -152,7 +152,7 @@ public class TaskScheduleThread implements Runnable {
logger.error("task scheduler failure", e);
kill();
// update task instance state
- processDao.changeTaskState(ExecutionStatus.FAILURE,
+ processService.changeTaskState(ExecutionStatus.FAILURE,
new Date(),
taskInstance.getId());
}
@@ -161,7 +161,7 @@ public class TaskScheduleThread implements Runnable {
taskInstance.getId(),
task.getExitStatus());
// update task instance state
- processDao.changeTaskState(task.getExitStatus(),
+ processService.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
}
@@ -191,14 +191,14 @@ public class TaskScheduleThread implements Runnable {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
- processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+ processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
}else{
- processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+ processService.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
@@ -311,7 +311,7 @@ public class TaskScheduleThread implements Runnable {
if (!resFile.exists()) {
try {
// query the tenant code of the resource according to the name of the resource
- String tentnCode = processDao.queryTenantCodeByResName(res);
+ String tentnCode = processService.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode, res);
logger.info("get resource file from hdfs :{}", resHdfsPath);
@@ -334,7 +334,7 @@ public class TaskScheduleThread implements Runnable {
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
- PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger);
+ PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission();
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 0409821..459007e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.*;
@@ -121,10 +121,10 @@ public abstract class AbstractCommandExecutor {
* task specific execution logic
*
* @param execCommand exec command
- * @param processDao process dao
+ * @param processService process service
* @return exit status code
*/
- public int run(String execCommand, ProcessDao processDao) {
+ public int run(String execCommand, ProcessService processService) {
int exitStatusCode;
try {
@@ -147,7 +147,7 @@ public abstract class AbstractCommandExecutor {
// get process id
int pid = getProcessId(process);
- processDao.updatePidByTaskInstId(taskInstId, pid, "");
+ processService.updatePidByTaskInstId(taskInstId, pid, "");
logger.info("process start, process id is: {}", pid);
@@ -161,10 +161,10 @@ public abstract class AbstractCommandExecutor {
exitStatusCode = process.exitValue();
logger.info("process has exited, work dir:{}, pid:{} ,exitStatusCode:{}", taskDir, pid,exitStatusCode);
//update process state to db
- exitStatusCode = updateState(processDao, exitStatusCode, pid, taskInstId);
+ exitStatusCode = updateState(processService, exitStatusCode, pid, taskInstId);
} else {
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
logger.error("task instance id:{} not exist", taskInstId);
} else {
@@ -219,23 +219,23 @@ public abstract class AbstractCommandExecutor {
/**
* update process state to db
*
- * @param processDao process dao
+ * @param processService process service
* @param exitStatusCode exit status code
* @param pid process id
* @param taskInstId task instance id
* @return exit status code
*/
- private int updateState(ProcessDao processDao, int exitStatusCode, int pid, int taskInstId) {
+ private int updateState(ProcessService processService, int exitStatusCode, int pid, int taskInstId) {
//get yarn state by log
if (exitStatusCode == 0) {
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
logger.info("process id is {}", pid);
List<String> appIds = getAppLinks(taskInstance.getLogPath());
if (appIds.size() > 0) {
String appUrl = String.join(Constants.COMMA, appIds);
logger.info("yarn log url:{}",appUrl);
- processDao.updatePidByTaskInstId(taskInstId, pid, appUrl);
+ processService.updatePidByTaskInstId(taskInstId, pid, appUrl);
}
// check if all operations are completed
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
index 6846617..3d5f0b8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
@@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
/**
@@ -39,7 +39,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
/**
* process database access
*/
- protected ProcessDao processDao;
+ protected ProcessService processService;
/**
* Abstract Yarn Task
@@ -48,7 +48,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
*/
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),
@@ -64,7 +64,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
+ exitStatusCode = shellCommandExecutor.run(buildCommand(), processService);
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
@@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
cancel = true;
// cancel process
shellCommandExecutor.cancelApplication();
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskProps.getTaskInstId());
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstId());
if (status && taskInstance != null){
ProcessUtils.killYarnJob(taskInstance);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 0de2bbc..4270ef9 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -47,7 +47,6 @@ import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
@@ -56,6 +55,7 @@ import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import com.alibaba.druid.sql.ast.SQLStatement;
@@ -106,9 +106,9 @@ public class DataxTask extends AbstractTask {
private ShellCommandExecutor shellCommandExecutor;
/**
- * process database access
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* constructor
@@ -128,7 +128,7 @@ public class DataxTask extends AbstractTask {
props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
props.getTaskTimeout(), logger);
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
@@ -160,7 +160,7 @@ public class DataxTask extends AbstractTask {
// run datax process
String jsonFilePath = buildDataxJsonFile();
String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
- exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao);
+ exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processService);
}
catch (Exception e) {
exitStatusCode = -1;
@@ -220,11 +220,11 @@ public class DataxTask extends AbstractTask {
*/
private List<JSONObject> buildDataxJobContentJson()
throws SQLException {
- DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource());
+ DataSource dataSource = processService.findDataSourceById(dataXParameters.getDataSource());
BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
- DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget());
+ DataSource dataTarget = processService.findDataSourceById(dataXParameters.getDataTarget());
BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
dataTarget.getConnectionParams());
@@ -355,7 +355,7 @@ public class DataxTask extends AbstractTask {
String dataxCommand = sbr.toString();
// find process instance by task id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
index 4be65ed..7625c85 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
@@ -24,9 +24,9 @@ import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +37,9 @@ import java.util.*;
*/
public class DependentExecute {
/**
- * process dao
+ * process service
*/
- private final ProcessDao processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
/**
* depend item list
@@ -108,7 +108,7 @@ public class DependentExecute {
result = getDependResultByState(processInstance.getState());
}else{
TaskInstance taskInstance = null;
- List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
+ List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.getName().equals(dependentItem.getDepTasks())){
@@ -141,16 +141,16 @@ public class DependentExecute {
*/
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
- ProcessInstance runningProcess = processDao.findLastRunningProcess(definitionId, dateInterval);
+ ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval);
if(runningProcess != null){
return runningProcess;
}
- ProcessInstance lastSchedulerProcess = processDao.findLastSchedulerProcessInterval(
+ ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(
definitionId, dateInterval
);
- ProcessInstance lastManualProcess = processDao.findLastManualProcessInterval(
+ ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(
definitionId, dateInterval
);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
index 9af29e0..ba0727d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
@@ -26,10 +26,10 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.*;
@@ -63,9 +63,9 @@ public class DependentTask extends AbstractTask {
private Date dependentDate;
/**
- * process dao
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* constructor
@@ -88,7 +88,7 @@ public class DependentTask extends AbstractTask {
taskModel.getDependItemList(), taskModel.getRelation()));
}
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
@@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask {
try{
TaskInstance taskInstance = null;
while(Stopper.isRunning()){
- taskInstance = processDao.findTaskInstanceById(this.taskProps.getTaskInstId());
+ taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId());
if(taskInstance == null){
exitStatusCode = -1;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 0fa9e11..c562fbe 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs();
// get process instance by task instance id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
/**
* combining local and global parameters
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 97e6cb7..b1dbd54 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -30,11 +30,11 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
@@ -66,9 +66,9 @@ public class HttpTask extends AbstractTask {
private HttpParameters httpParameters;
/**
- * process database access
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* Convert mill seconds to second unit
@@ -92,7 +92,7 @@ public class HttpTask extends AbstractTask {
*/
public HttpTask(TaskProps props, Logger logger) {
super(props, logger);
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override
@@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask {
*/
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder();
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
index 9b4952b..745468d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
@@ -30,11 +30,11 @@ import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.sql.*;
@@ -56,9 +56,9 @@ public class ProcedureTask extends AbstractTask {
private ProcedureParameters procedureParameters;
/**
- * process database access
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* base datasource
@@ -82,7 +82,7 @@ public class ProcedureTask extends AbstractTask {
throw new RuntimeException("procedure task params is not valid");
}
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override
@@ -97,7 +97,7 @@ public class ProcedureTask extends AbstractTask {
procedureParameters.getMethod(),
procedureParameters.getLocalParams());
- DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
+ DataSource dataSource = processService.findDataSourceById(procedureParameters.getDatasource());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 585d62f..74dfe3c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.Map;
@@ -53,9 +53,9 @@ public class PythonTask extends AbstractTask {
private PythonCommandExecutor pythonCommandExecutor;
/**
- * process database access
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* constructor
@@ -76,7 +76,7 @@ public class PythonTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override
@@ -94,7 +94,7 @@ public class PythonTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = pythonCommandExecutor.run(buildCommand(), processDao);
+ exitStatusCode = pythonCommandExecutor.run(buildCommand(), processService);
} catch (Exception e) {
logger.error("python task failure", e);
exitStatusCode = -1;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 789a0c5..32f5833 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -24,11 +24,11 @@ import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.File;
@@ -64,7 +64,7 @@ public class ShellTask extends AbstractTask {
/**
* process database access
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* constructor
@@ -84,7 +84,7 @@ public class ShellTask extends AbstractTask {
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Override
@@ -102,7 +102,7 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
+ exitStatusCode = shellCommandExecutor.run(buildCommand(), processService);
} catch (Exception e) {
logger.error("shell task failure", e);
exitStatusCode = -1;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index eba05a0..bc306b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -36,16 +36,16 @@ import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlType;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AlertDao;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.permission.PermissionCheck;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.sql.*;
@@ -67,9 +67,9 @@ public class SqlTask extends AbstractTask {
private SqlParameters sqlParameters;
/**
- * process database access
+ * process service
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* alert dao
@@ -96,7 +96,7 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
- this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
}
@@ -122,7 +122,7 @@ public class SqlTask extends AbstractTask {
return;
}
- dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
+ dataSource= processService.findDataSourceById(sqlParameters.getDatasource());
// data source is null
if (dataSource == null){
@@ -171,7 +171,7 @@ public class SqlTask extends AbstractTask {
}
// check udf permission
checkUdfPermission(ArrayUtils.toObject(idsArray));
- List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(idsArray);
+ List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
@@ -383,7 +383,7 @@ public class SqlTask extends AbstractTask {
public void sendAttachment(String title,String content){
// process instance
- ProcessInstance instance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance instance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
@@ -470,10 +470,10 @@ public class SqlTask extends AbstractTask {
*/
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
- PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF,processDao,udfFunIds,userId,logger);
+ PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF, processService,udfFunIds,userId,logger);
permissionCheckUdf.checkPermission();
}
@@ -484,10 +484,10 @@ public class SqlTask extends AbstractTask {
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
- PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger);
+ PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new Integer[]{dataSourceId},userId,logger);
permissionCheckDataSource.checkPermission();
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index c6a71ed..fe4ec91 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -21,10 +21,8 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
@@ -32,6 +30,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -70,10 +70,10 @@ public class ZKMasterClient extends AbstractZKClient {
*/
private AlertDao alertDao = null;
/**
- * flow database access
+ * process service
*/
@Autowired
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* default constructor
@@ -374,7 +374,7 @@ public class ZKMasterClient extends AbstractZKClient {
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
logger.info("start worker[{}] failover ...", workerHost);
- List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
+ List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
if(needCheckWorkerAlive){
if(!checkTaskInstanceNeedFailover(taskInstance)){
@@ -382,7 +382,7 @@ public class ZKMasterClient extends AbstractZKClient {
}
}
- ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+ ProcessInstance instance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
@@ -390,7 +390,7 @@ public class ZKMasterClient extends AbstractZKClient {
ProcessUtils.killYarnJob(taskInstance);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
- processDao.saveTaskInstance(taskInstance);
+ processService.saveTaskInstance(taskInstance);
}
logger.info("end worker[{}] failover ...", workerHost);
}
@@ -403,11 +403,11 @@ public class ZKMasterClient extends AbstractZKClient {
private void failoverMaster(String masterHost) {
logger.info("start master failover ...");
- List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
+ List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
- processDao.processNeedFailoverProcessInstances(processInstance);
+ processService.processNeedFailoverProcessInstances(processInstance);
}
logger.info("master failover end");
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
index 88abfa3..7ddee3b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
@@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
-import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index d7c3de1..ec66478 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class MasterExecThreadTest {
private ProcessInstance processInstance;
- private ProcessDao processDao;
+ private ProcessService processService;
private int processDefinitionId = 1;
@@ -67,7 +67,7 @@ public class MasterExecThreadTest {
@Before
public void init() throws Exception{
- processDao = mock(ProcessDao.class);
+ processService = mock(ProcessService.class);
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
@@ -91,7 +91,7 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
- masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao));
+ masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
@@ -110,12 +110,12 @@ public class MasterExecThreadTest {
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 1-30 for next save, and last day 31 no save
- verify(processDao, times(31)).saveProcessInstance(processInstance);
+ verify(processService, times(31)).saveProcessInstance(processInstance);
}catch (Exception e){
e.printStackTrace();
Assert.assertTrue(false);
@@ -129,12 +129,12 @@ public class MasterExecThreadTest {
@Test
public void testParallelWithSchedule() throws ParseException {
try{
- Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
+ Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save
- verify(processDao, times(15)).saveProcessInstance(processInstance);
+ verify(processService, times(15)).saveProcessInstance(processInstance);
}catch (Exception e){
Assert.assertTrue(false);
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index 04c8448..0ab0b4b 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -43,11 +43,11 @@ public class ShellCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class);
- private ProcessDao processDao = null;
+ private ProcessService processService = null;
@Before
public void before(){
- processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Test
@@ -65,7 +65,7 @@ public class ShellCommandExecutorTest {
- TaskInstance taskInstance = processDao.findTaskInstanceById(7657);
+ TaskInstance taskInstance = processService.findTaskInstanceById(7657);
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
index 7da3f71..c8e92da 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
@@ -22,12 +22,12 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -44,11 +44,11 @@ public class SqlExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(SqlExecutorTest.class);
- private ProcessDao processDao = null;
+ private ProcessService processService = null;
@Before
public void before(){
- processDao = SpringApplicationContext.getBean(ProcessDao.class);
+ processService = SpringApplicationContext.getBean(ProcessService.class);
}
@Test
@@ -109,7 +109,7 @@ public class SqlExecutorTest {
taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index 7a6073e..6b71d6d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -28,12 +28,12 @@ import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -53,7 +53,7 @@ public class DataxTaskTest {
private DataxTask dataxTask;
- private ProcessDao processDao;
+ private ProcessService processService;
private ShellCommandExecutor shellCommandExecutor;
@@ -62,13 +62,13 @@ public class DataxTaskTest {
@Before
public void before()
throws Exception {
- processDao = Mockito.mock(ProcessDao.class);
+ processService = Mockito.mock(ProcessService.class);
shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
- Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao);
+ Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
@@ -83,12 +83,12 @@ public class DataxTaskTest {
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
- Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource());
- Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource());
- Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
+ Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
+ Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
+ Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId());
- Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0);
+ Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0);
}
private DataSource getDataSource() {
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
index 31a2837..7d775d5 100644
--- a/dolphinscheduler-service/pom.xml
+++ b/dolphinscheduler-service/pom.xml
@@ -12,18 +12,45 @@
<artifactId>dolphinscheduler-service</artifactId>
<name>dolphinscheduler-service</name>
- <url>http://www.example.com</url>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.7</maven.compiler.source>
- <maven.compiler.target>1.7</maven.compiler.target>
- </properties>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-remote</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-dao</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>${curator.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>c3p0</artifactId>
+ <groupId>c3p0</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz-jobs</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
deleted file mode 100644
index 7607159..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service;
-
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-
-import java.io.Serializable;
-
-/**
- * view log response command
- */
-public class MasterResponseCommand implements Serializable {
-
- private String msg;
-
- public MasterResponseCommand() {
- }
-
- public MasterResponseCommand(String msg) {
- this.msg = msg;
- }
-
- public String getMsg() {
- return msg;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public Command convert2Command(long opaque){
- Command command = new Command(opaque);
- command.setType(CommandType.MASTER_RESPONSE);
- byte[] body = FastJsonSerializer.serialize(this);
- command.setBody(body);
- return command;
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
deleted file mode 100644
index 419add4..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service;
-
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * view log request command
- */
-public class WorkerRequestCommand implements Serializable {
-
- private static final AtomicLong REQUEST = new AtomicLong(1);
-
- private String path;
-
- public WorkerRequestCommand() {
- }
-
- public WorkerRequestCommand(String path) {
- this.path = path;
- }
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-
- public Command convert2Command(){
- Command command = new Command(REQUEST.getAndIncrement());
- command.setType(CommandType.WORKER_REQUEST);
- byte[] body = FastJsonSerializer.serialize(this);
- command.setBody(body);
- return command;
- }
-}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
similarity index 80%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
index 63d4c1c..027666f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.permission;
+package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.List;
@@ -38,7 +38,7 @@ public class PermissionCheck<T> {
/**
* Authorization Type
*/
- private ProcessDao processDao;
+ private ProcessService processService;
/**
* need check array
@@ -53,23 +53,23 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType authorization type
- * @param processDao process dao
+ * @param processService process service
*/
- public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) {
+ public PermissionCheck(AuthorizationType authorizationType, ProcessService processService) {
this.authorizationType = authorizationType;
- this.processDao = processDao;
+ this.processService = processService;
}
/**
* permission check
* @param authorizationType
- * @param processDao
+ * @param processService
* @param needChecks
* @param userId
*/
- public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) {
+ public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
- this.processDao = processDao;
+ this.processService = processService;
this.needChecks = needChecks;
this.userId = userId;
}
@@ -77,14 +77,14 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType
- * @param processDao
+ * @param processService
* @param needChecks
* @param userId
* @param logger
*/
- public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) {
+ public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) {
this.authorizationType = authorizationType;
- this.processDao = processDao;
+ this.processService = processService;
this.needChecks = needChecks;
this.userId = userId;
this.logger = logger;
@@ -98,12 +98,12 @@ public class PermissionCheck<T> {
this.authorizationType = authorizationType;
}
- public ProcessDao getProcessDao() {
- return processDao;
+ public ProcessService getProcessService() {
+ return processService;
}
- public void setProcessDao(ProcessDao processDao) {
- this.processDao = processDao;
+ public void setProcessService(ProcessService processService) {
+ this.processService = processService;
}
public T[] getNeedChecks() {
@@ -142,9 +142,9 @@ public class PermissionCheck<T> {
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
- User user = processDao.getUserById(userId);
+ User user = processService.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){
- List<T> unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType);
+ List<T> unauthorizedList = processService.listUnauthorized(userId,needChecks,authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString());
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
similarity index 99%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 820b2fd..a26044e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao;
+package org.apache.dolphinscheduler.service.process;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
@@ -24,16 +24,12 @@ import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.IpUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
-import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +47,7 @@ import static org.apache.dolphinscheduler.common.Constants.*;
* process relative dao that some mappers in this.
*/
@Component
-public class ProcessDao {
+public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/DruidConnectionProvider.java
similarity index 99%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/DruidConnectionProvider.java
index 8a4ceba..d51e8e8 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/DruidConnectionProvider.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/DruidConnectionProvider.java
@@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.quartz;
+package org.apache.dolphinscheduler.service.quartz;
import com.alibaba.druid.pool.DruidDataSource;
import org.quartz.SchedulerException;
import org.quartz.utils.ConnectionProvider;
+
import java.sql.Connection;
import java.sql.SQLException;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
similarity index 83%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
index ac46129..69a80e6 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/ProcessScheduleJob.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
@@ -14,17 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.quartz;
+package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
-import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.quartz.*;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -42,17 +45,17 @@ public class ProcessScheduleJob implements Job {
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
/**
- * process dao
+ * process service
*/
- private static ProcessDao processDao;
+ private static ProcessService processService;
/**
* init
- * @param processDao process dao
+ * @param processService process service
*/
- public static void init(ProcessDao processDao) {
- ProcessScheduleJob.processDao = processDao;
+ public static void init(ProcessService processService) {
+ ProcessScheduleJob.processService = processService;
}
/**
@@ -64,7 +67,7 @@ public class ProcessScheduleJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
- Assert.notNull(processDao, "please call init() method first");
+ Assert.notNull(processService, "please call init() method first");
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@@ -80,7 +83,7 @@ public class ProcessScheduleJob implements Job {
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
// query schedule
- Schedule schedule = processDao.querySchedule(scheduleId);
+ Schedule schedule = processService.querySchedule(scheduleId);
if (schedule == null) {
logger.warn("process schedule does not exist in db,delete schedule job in quartz, projectId:{}, scheduleId:{}", projectId, scheduleId);
deleteJob(projectId, scheduleId);
@@ -88,7 +91,7 @@ public class ProcessScheduleJob implements Job {
}
- ProcessDefinition processDefinition = processDao.findProcessDefineById(schedule.getProcessDefinitionId());
+ ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
@@ -108,7 +111,7 @@ public class ProcessScheduleJob implements Job {
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());
- processDao.createCommand(command);
+ processService.createCommand(command);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
similarity index 99%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 054d790..9d96264 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/quartz/QuartzExecutors.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.quartz;
+package org.apache.dolphinscheduler.service.quartz;
+import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.commons.lang.StringUtils;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/AbstractCycle.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
similarity index 99%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/AbstractCycle.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
index 0cda336..0a2e31b 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/AbstractCycle.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.utils.cron;
+package org.apache.dolphinscheduler.service.quartz.cron;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.*;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
/**
* Cycle
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
similarity index 98%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
index 8a9087a..2d1d938 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CronUtils.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CronUtils.java
@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.utils.cron;
+package org.apache.dolphinscheduler.service.quartz.cron;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
-import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import com.cronutils.model.Cron;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +31,7 @@ import java.text.ParseException;
import java.util.*;
import static com.cronutils.model.CronType.QUARTZ;
-import static org.apache.dolphinscheduler.dao.utils.cron.CycleFactory.*;
+import static org.apache.dolphinscheduler.service.quartz.cron.CycleFactory.*;
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java
similarity index 99%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java
index 10906b4..a956d6c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleFactory.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleFactory.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.utils.cron;
+package org.apache.dolphinscheduler.service.quartz.cron;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron;
import com.cronutils.model.field.expression.Always;
import com.cronutils.model.field.expression.QuestionMark;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
/**
* Crontab Cycle Tool Factory
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleLinks.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleLinks.java
similarity index 97%
rename from dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleLinks.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleLinks.java
index 63824bd..9f01b18 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/cron/CycleLinks.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/CycleLinks.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.utils.cron;
+package org.apache.dolphinscheduler.service.quartz.cron;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
import java.util.ArrayList;
import java.util.List;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
similarity index 97%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
index 5beb811..bed8a11 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/ITaskQueue.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.queue;
+package org.apache.dolphinscheduler.service.queue;
import java.util.List;
import java.util.Set;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
similarity index 97%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
index 0a2d943..ec0f157 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.queue;
+package org.apache.dolphinscheduler.service.queue;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
similarity index 99%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
index d442c13..874512c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueZkImpl.java
@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.queue;
+package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
deleted file mode 100644
index c107122..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.worker;
-
-import io.netty.channel.Channel;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.log.*;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import org.apache.dolphinscheduler.remote.utils.Address;
-import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.service.MasterResponseCommand;
-import org.apache.dolphinscheduler.service.WorkerRequestCommand;
-import org.apache.dolphinscheduler.service.log.LogPromise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * log client
- */
-public class WorkerClientService implements NettyRequestProcessor {
-
- private static final Logger logger = LoggerFactory.getLogger(WorkerClientService.class);
-
- private final NettyClientConfig clientConfig;
-
- private final NettyRemotingClient client;
-
- private final Address address;
-
- /**
- * request time out
- */
- private final long logRequestTimeout = 10 * 1000;
-
- /**
- * construct client
- * @param host host
- * @param port port
- */
- public WorkerClientService(String host, int port) {
- this.address = new Address(host, port);
- this.clientConfig = new NettyClientConfig();
- this.clientConfig.setWorkerThreads(1);
- this.client = new NettyRemotingClient(clientConfig);
- this.client.registerProcessor(CommandType.MASTER_RESPONSE, this);
-
- }
-
- /**
- * shutdown
- */
- public void shutdown() {
- this.client.close();
- logger.info("logger client shutdown");
- }
-
-
- public String reportResult() {
- WorkerRequestCommand request = new WorkerRequestCommand();
- String result = "";
- try {
- Command command = request.convert2Command();
- this.client.send(address, command);
- LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
- result = ((String)promise.getResult());
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("roll view log error", e);
- }
- return result;
- }
-
-
- @Override
- public void process(Channel channel, Command command) {
- logger.info("received log response : {}", command);
- MasterResponseCommand masterResponseCommand = FastJsonSerializer.deserialize(
- command.getBody(), MasterResponseCommand.class);
- LogPromise.notify(command.getOpaque(), masterResponseCommand.getMsg());
- }
-
- public static void main(String[] args) throws Exception{
- WorkerClientService workerClientService = new WorkerClientService("192.168.220.247", 1128);
- String result = workerClientService.reportResult();
- System.out.println(result);
-
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
similarity index 99%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index f62e106..135bfda 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
@@ -31,12 +31,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
+
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* abstract zookeeper client
*/
-public abstract class AbstractZKClient extends ZookeeperCachedOperator{
+public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/DefaultEnsembleProvider.java
similarity index 96%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/DefaultEnsembleProvider.java
index 0cf06c0..9eedf7a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/DefaultEnsembleProvider.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.ensemble.EnsembleProvider;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
similarity index 90%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 5aa2555..dccb768 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -14,22 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
-import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
similarity index 98%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
index 75a9f6c..c6bdfc3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package org.apache.dolphinscheduler.service.zk;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
similarity index 98%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
index 9442afd..a2cabce 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -33,12 +33,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
diff --git a/dolphinscheduler-common/src/main/resources/quartz.properties b/dolphinscheduler-service/src/main/resources/quartz.properties
similarity index 96%
rename from dolphinscheduler-common/src/main/resources/quartz.properties
rename to dolphinscheduler-service/src/main/resources/quartz.properties
index 2e3a2a0..9c8930b 100644
--- a/dolphinscheduler-common/src/main/resources/quartz.properties
+++ b/dolphinscheduler-service/src/main/resources/quartz.properties
@@ -59,6 +59,6 @@ org.quartz.jobStore.dataSource = myDs
#============================================================================
# Configure Datasources
#============================================================================
-org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.dao.quartz.DruidConnectionProvider
+org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
org.quartz.dataSource.myDs.maxConnections = 10
org.quartz.dataSource.myDs.validationQuery = select 1
\ No newline at end of file
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java b/dolphinscheduler-service/src/test/java/cron/CronUtilsTest.java
similarity index 98%
rename from dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java
rename to dolphinscheduler-service/src/test/java/cron/CronUtilsTest.java
index 1135cf2..4ddd5fc 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/cron/CronUtilsTest.java
+++ b/dolphinscheduler-service/src/test/java/cron/CronUtilsTest.java
@@ -14,11 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.dao.cron;
+package cron;
-import org.apache.dolphinscheduler.common.enums.CycleEnum;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
import com.cronutils.builder.CronBuilder;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
@@ -26,6 +23,9 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.*;
+import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java b/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
similarity index 90%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java
rename to dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
index 433e4fa..a0cc457 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/BaseTaskQueueTest.java
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.queue;
+package queue;
-import org.apache.dolphinscheduler.common.zk.ZKServer;
+import org.apache.dolphinscheduler.service.queue.ITaskQueue;
+import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.junit.*;
/**
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java b/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
similarity index 99%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
rename to dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
index b34a7d6..d29c5aa 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/TaskQueueZKImplTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.queue;
+package queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java b/dolphinscheduler-service/src/test/java/queue/ZKServer.java
similarity index 99%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
rename to dolphinscheduler-service/src/test/java/queue/ZKServer.java
index fc39e62..65fb95c 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
+++ b/dolphinscheduler-service/src/test/java/queue/ZKServer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.zk;
+package queue;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
index 682dd5b..e05de8e 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
@@ -121,6 +121,7 @@
<m-shell
v-if="taskType === 'SHELL'"
@on-params="_onParams"
+ @on-cache-params="_onCacheParams"
ref="SHELL"
:backfill-item="backfillItem">
</m-shell>
@@ -128,6 +129,7 @@
<m-sub-process
v-if="taskType === 'SUB_PROCESS'"
@on-params="_onParams"
+ @on-cache-params="_onCacheParams"
@on-set-process-name="_onSetProcessName"
ref="SUB_PROCESS"
:backfill-item="backfillItem">
@@ -136,6 +138,7 @@
<m-procedure
v-if="taskType === 'PROCEDURE'"
@on-params="_onParams"
+ @on-cache-params="_onCacheParams"
ref="PROCEDURE"
:backfill-item="backfillItem">
</m-procedure>
@@ -167,6 +170,7 @@
<m-mr
v-if="taskType === 'MR'"
@on-params="_onParams"
+ @on-cache-params="_onCacheParams"
ref="MR"
:backfill-item="backfillItem">
</m-mr>
@@ -174,6 +178,7 @@
<m-python
v-if="taskType === 'PYTHON'"
@on-params="_onParams"
+ @on-cache-params="_onCacheParams"
ref="PYTHON"
:backfill-item="backfillItem">
</m-python>
@@ -181,6 +186,7 @@
<m-dependent
v-if="taskType === 'DEPENDENT'"
@on-dependent="_onDependent"
+ @on-cache-dependent="_onCacheDependent"
ref="DEPENDENT"
:backfill-item="backfillItem">
</m-dependent>
@@ -248,6 +254,8 @@
resourcesList: [],
// dependence
dependence: {},
+ // cache dependence
+ cacheDependence: {},
// Current node params data
params: {},
// Running sign
@@ -284,6 +292,12 @@
this.dependence = Object.assign(this.dependence, {}, o)
},
/**
+ * cache dependent
+ */
+ _onCacheDependent (o) {
+ this.cacheDependence = Object.assign(this.cacheDependence, {}, o)
+ },
+ /**
* Task timeout alarm
*/
_onTimeout (o) {
@@ -356,9 +370,10 @@
type: this.taskType,
id: this.id,
name: this.name,
+ params: this.params,
description: this.description,
runFlag: this.runFlag,
- dependence: this.dependence,
+ dependence: this.cacheDependence,
maxRetryTimes: this.maxRetryTimes,
retryInterval: this.retryInterval,
timeout: this.timeout,
@@ -522,6 +537,7 @@
this.params = o.params || {}
this.dependence = o.dependence || {}
+ this.cacheDependence = o.dependence || {}
}
this.isContentBox = true
@@ -551,7 +567,7 @@
name: this.name,
description: this.description,
runFlag: this.runFlag,
- dependence: this.dependence,
+ dependence: this.cacheDependence,
maxRetryTimes: this.maxRetryTimes,
retryInterval: this.retryInterval,
timeout: this.timeout,
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
index cca9ec7..79d127a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/dependent.vue
@@ -131,6 +131,9 @@
setTimeout(() => {
this.isLoading = false
}, 600)
+ },
+ cacheDependence (val) {
+ this.$emit('on-cache-dependent', val)
}
},
beforeCreate () {
@@ -151,7 +154,19 @@
},
destroyed () {
},
- computed: {},
+ computed: {
+ cacheDependence () {
+ return {
+ relation: this.relation,
+ dependTaskList: _.map(this.dependTaskList, v => {
+ return {
+ relation: v.relation,
+ dependItemList: _.map(v.dependItemList, v1 => _.omit(v1, ['depTasksList', 'state', 'dateValueList']))
+ }
+ })
+ }
+ }
+ },
components: { mListBox, mDependItemList }
}
</script>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
index f0d69ec..03e53fe 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
@@ -379,7 +379,7 @@
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
this.mainClass = o.params.mainClass || ''
- this.mainJar = o.params.mainJar.res || ''
+ this.mainJar = o.params.mainJar && o.params.mainJar.res ? o.params.mainJar.res : ''
this.deployMode = o.params.deployMode || ''
this.slot = o.params.slot || 1
this.taskManager = o.params.taskManager || '2'
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
index fa73e9b..706a35f 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
@@ -91,6 +91,7 @@
<m-resources
ref="refResources"
@on-resourcesData="_onResourcesData"
+ @on-cache-resourcesData="_onCacheResourcesData"
:resource-list="resourceList">
</m-resources>
</div>
@@ -127,6 +128,8 @@
mainJarList: [],
// Resource(list)
resourceList: [],
+ // Cache ResourceList
+ cacheResourceList: [],
// Custom parameter
localParams: [],
// Command line argument
@@ -157,6 +160,12 @@
this.resourceList = a
},
/**
+ * cache resourceList
+ */
+ _onCacheResourcesData (a) {
+ this.cacheResourceList = a
+ },
+ /**
* verification
*/
_verification () {
@@ -220,6 +229,25 @@
if (type === 'PYTHON') {
this.mainClass = ''
}
+ },
+ //Watch the cacheParams
+ cacheParams (val) {
+ this.$emit('on-cache-params', val);
+ }
+ },
+ computed: {
+ cacheParams () {
+ return {
+ mainClass: this.mainClass,
+ mainJar: {
+ res: this.mainJar
+ },
+ resourceList: this.cacheResourceList,
+ localParams: this.localParams,
+ mainArgs: this.mainArgs,
+ others: this.others,
+ programType: this.programType
+ }
}
},
created () {
@@ -238,6 +266,7 @@
let resourceList = o.params.resourceList || []
if (resourceList.length) {
this.resourceList = resourceList
+ this.cacheResourceList = resourceList
}
// backfill localParams
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue
index d55f18a..e84f37d 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/procedure.vue
@@ -70,7 +70,9 @@
// Data source type
type: '',
// data source
- datasource: ''
+ datasource: '',
+ // Data source returned by the data source selector (set in _onDsData)
+ rtDatasource: ''
}
},
mixins: [disabledState],
@@ -83,7 +85,7 @@
*/
_onDsData (o) {
this.type = o.type
- this.datasource = o.datasource
+ this.rtDatasource = o.datasource
},
/**
* return udp
@@ -112,14 +114,29 @@
// storage
this.$emit('on-params', {
type: this.type,
- datasource: this.datasource,
+ datasource: this.rtDatasource,
method: this.method,
localParams: this.localParams
})
return true
}
},
- watch: {},
+ watch: {
+ //Watch the cacheParams
+ cacheParams (val) {
+ this.$emit('on-cache-params', val);
+ }
+ },
+ computed: {
+ cacheParams () {
+ return {
+ type: this.type,
+ datasource: this.rtDatasource,
+ method: this.method,
+ localParams: this.localParams
+ }
+ }
+ },
created () {
let o = this.backfillItem
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
index 39a7cd8..e565b4a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
@@ -31,6 +31,7 @@
<m-resources
ref="refResources"
@on-resourcesData="_onResourcesData"
+ @on-cache-resourcesData="_onCacheResourcesData"
:resource-list="resourceList">
</m-resources>
</div>
@@ -69,7 +70,9 @@
// Custom parameter
localParams: [],
// resource(list)
- resourceList: []
+ resourceList: [],
+ // Cache ResourceList
+ cacheResourceList: []
}
},
mixins: [disabledState],
@@ -90,6 +93,12 @@
this.resourceList = a
},
/**
+ * cache resourceList
+ */
+ _onCacheResourcesData (a) {
+ this.cacheResourceList = a
+ },
+ /**
* verification
*/
_verification () {
@@ -142,18 +151,33 @@
return editor
}
},
- watch: {},
+ watch: {
+ //Watch the cacheParams
+ cacheParams (val) {
+ this.$emit('on-cache-params', val);
+ }
+ },
+ computed: {
+ cacheParams () {
+ return {
+ resourceList: this.cacheResourceList,
+ localParams: this.localParams,
+ rawScript: editor ? editor.getValue() : ''
+ }
+ }
+ },
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
- this.rawScript = o.params.rawScript
+ this.rawScript = o.params.rawScript || ''
// backfill resourceList
let resourceList = o.params.resourceList || []
if (resourceList.length) {
this.resourceList = resourceList
+ this.cacheResourceList = resourceList
}
// backfill localParams
@@ -174,4 +198,4 @@
},
components: { mLocalParams, mListBox, mResources }
}
-</script>
\ No newline at end of file
+</script>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
index 17184ee..ad40c58 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
@@ -37,6 +37,7 @@
<m-resources
ref="refResources"
@on-resourcesData="_onResourcesData"
+ @on-cache-resourcesData="_onCacheResourcesData"
:resource-list="resourceList">
</m-resources>
</div>
@@ -75,7 +76,9 @@
// Custom parameter
localParams: [],
// resource(list)
- resourceList: []
+ resourceList: [],
+ // Cache ResourceList
+ cacheResourceList: []
}
},
mixins: [disabledState],
@@ -119,12 +122,18 @@
},
/**
* return resourceList
- *
+ *
*/
_onResourcesData (a) {
this.resourceList = a
},
/**
+ * cache resourceList
+ */
+ _onCacheResourcesData (a) {
+ this.cacheResourceList = a
+ },
+ /**
* verification
*/
_verification () {
@@ -175,18 +184,33 @@
return editor
}
},
- watch: {},
+ watch: {
+ //Watch the cacheParams
+ cacheParams (val) {
+ this.$emit('on-cache-params', val);
+ }
+ },
+ computed: {
+ cacheParams () {
+ return {
+ resourceList: this.cacheResourceList,
+ localParams: this.localParams,
+ rawScript: editor ? editor.getValue() : ''
+ }
+ }
+ },
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
- this.rawScript = o.params.rawScript
+ this.rawScript = o.params.rawScript || ''
// backfill resourceList
let resourceList = o.params.resourceList || []
if (resourceList.length) {
this.resourceList = resourceList
+ this.cacheResourceList = resourceList
}
// backfill localParams
@@ -229,5 +253,5 @@
right: -12px;
top: -16px;
}
-
+
</style>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
index 66a89a4..feef198 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
@@ -412,7 +412,7 @@
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
this.mainClass = o.params.mainClass || ''
- this.mainJar = o.params.mainJar.res || ''
+ this.mainJar = o.params.mainJar && o.params.mainJar.res ? o.params.mainJar.res : ''
this.deployMode = o.params.deployMode || ''
this.driverCores = o.params.driverCores || 1
this.driverMemory = o.params.driverMemory || '512M'
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue
index ee03513..477038f 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue
@@ -86,7 +86,13 @@
return _.filter(this.processDefinitionList, v => id === v.id)[0].code
}
},
- watch: {},
+ watch: {
+ wdiCurr (val) {
+ this.$emit('on-cache-params', {
+ processDefinitionId: this.wdiCurr
+ })
+ }
+ },
created () {
let processListS = _.cloneDeep(this.store.state.dag.processListS)
let id = this.router.history.current.params.id || null
@@ -115,4 +121,4 @@
mounted () {
}
}
-</script>
\ No newline at end of file
+</script>
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
index 5550864..53939f3 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
@@ -40,6 +40,9 @@
<th scope="col">
<span>{{$t('Description')}}</span>
</th>
+ <th scope="col" width="130">
+ <span>{{$t('Modify User')}}</span>
+ </th>
<th scope="col" width="90">
<span>{{$t('Timing state')}}</span>
</th>
@@ -73,6 +76,10 @@
<span v-else>-</span>
</td>
<td>
+ <span v-if="item.modifyBy">{{item.modifyBy}}</span>
+ <span v-else>-</span>
+ </td>
+ <td>
<span v-if="item.scheduleReleaseState === 'OFFLINE'">{{$t('offline')}}</span>
<span v-if="item.scheduleReleaseState === 'ONLINE'">{{$t('online')}}</span>
<span v-if="!item.scheduleReleaseState">-</span>
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 6e8c113..0402d7e 100644
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -518,4 +518,5 @@ export default {
'SpeedRecord': 'speed(record count)',
'0 means unlimited by byte': '0 means unlimited',
'0 means unlimited by count': '0 means unlimited',
+ 'Modify User': 'Modify User'
}
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 52a9877..95eb4a1 100644
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -518,4 +518,5 @@ export default {
'SpeedRecord': '限流(记录数)',
'0 means unlimited by byte': 'KB,0代表不限制',
'0 means unlimited by count': '0代表不限制',
+ 'Modify User': '修改用户'
}
diff --git a/pom.xml b/pom.xml
index f6009dc..23f4a24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -348,6 +348,7 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index b3c61eb..c68fd17 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -319,6 +319,7 @@ CREATE TABLE t_ds_process_definition (
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
update_time timestamp DEFAULT NULL ,
+ modify_by varchar(36) DEFAULT '' ,
PRIMARY KEY (id)
) ;
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index fec2771..ea0f9cb 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -366,6 +366,7 @@ CREATE TABLE `t_ds_process_definition` (
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
+ `modify_by` varchar(36) DEFAULT '' COMMENT 'modify user',
PRIMARY KEY (`id`),
KEY `process_definition_index` (`project_id`,`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
similarity index 51%
copy from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
copy to sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
index 5c3db2d..9fe246a 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
+++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -13,31 +13,25 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-package org.apache.dolphinscheduler.common.zk;
+*/
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
+-- uc_dolphin_T_t_ds_process_definition_A_modify_by
+-- Adds the modify_by column to t_ds_process_definition in the current database
+-- (DATABASE()) unless the column already exists there.
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_definition_A_modify_by;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_process_definition_A_modify_by()
+ BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_process_definition'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME ='modify_by')
+ THEN
+ ALTER TABLE t_ds_process_definition ADD `modify_by` varchar(36) DEFAULT '' COMMENT 'modify user';
+ END IF;
+ END;
-/**
- * demo for using zkServer
- */
-public class TestZk {
+d//
- @Before
- public void before(){
- ZKServer.start();
- }
-
- @Test
- public void test(){
- Assert.assertTrue(ZKServer.isStarted());
- }
-
- @After
- public void after(){
- ZKServer.stop();
- }
-}
+delimiter ;
+CALL uc_dolphin_T_t_ds_process_definition_A_modify_by;
+DROP PROCEDURE uc_dolphin_T_t_ds_process_definition_A_modify_by;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_dml.sql
similarity index 64%
copy from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
copy to sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_dml.sql
index 5c3db2d..38964cc 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
+++ b/sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_dml.sql
@@ -13,31 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-package org.apache.dolphinscheduler.common.zk;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * demo for using zkServer
- */
-public class TestZk {
-
- @Before
- public void before(){
- ZKServer.start();
- }
-
- @Test
- public void test(){
- Assert.assertTrue(ZKServer.isStarted());
- }
-
- @After
- public void after(){
- ZKServer.stop();
- }
-}
+*/
\ No newline at end of file
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
similarity index 55%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
rename to sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
index 5c3db2d..7fc1290 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
+++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -13,31 +13,22 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-package org.apache.dolphinscheduler.common.zk;
+*/
+-- uc_dolphin_T_t_ds_process_definition_A_modify_by
+-- Adds the modify_by column to t_ds_process_definition unless it already exists.
+-- The existence check is limited to the current database and schema so that a
+-- same-named table in another schema cannot cause the column to be skipped.
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_definition_A_modify_by() RETURNS void AS $$
+BEGIN
+ IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_CATALOG=current_database()
+ AND TABLE_SCHEMA=current_schema()
+ AND TABLE_NAME='t_ds_process_definition'
+ AND COLUMN_NAME ='modify_by')
+ THEN
+ ALTER TABLE t_ds_process_definition ADD COLUMN modify_by varchar(36) DEFAULT '';
+ END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+delimiter ;
+SELECT uc_dolphin_T_t_ds_process_definition_A_modify_by();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_definition_A_modify_by();
-/**
- * demo for using zkServer
- */
-public class TestZk {
-
- @Before
- public void before(){
- ZKServer.start();
- }
-
- @Test
- public void test(){
- Assert.assertTrue(ZKServer.isStarted());
- }
-
- @After
- public void after(){
- ZKServer.stop();
- }
-}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
similarity index 64%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java
rename to sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
index d1a0526..38964cc 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java
+++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_dml.sql
@@ -13,31 +13,4 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-package org.apache.dolphinscheduler.common.zk;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * demo for using zkServer
- */
-public class TestZkServer {
-
- @Before
- public void before(){
- ZKServer.start();
- }
-
- @Test
- public void test(){
- Assert.assertTrue(ZKServer.isStarted());
- }
-
- @After
- public void after(){
- ZKServer.stop();
- }
-}
+*/
\ No newline at end of file