You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/01/11 07:05:38 UTC
[incubator-dolphinscheduler] branch dev updated: refactor import
process (#1804)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1c5c8b7 refactor import process (#1804)
1c5c8b7 is described below
commit 1c5c8b75b94237b0b304a0fa72d417cac18661f1
Author: Yelli <51...@users.noreply.github.com>
AuthorDate: Sat Jan 11 15:05:30 2020 +0800
refactor import process (#1804)
* refactor import process
* add refactor import process UT
* add import process UT
* add null check UT for import process metadata
* add UT for import process
* modify dependentparam UT
* modify testAddImportDependentSpecialParam
* modify DependentParamTest
* modify processDefinitionService UT
---
.../dolphinscheduler/api/dto/ProcessMeta.java | 8 +-
.../api/service/ProcessDefinitionService.java | 370 +++++++++------------
.../dolphinscheduler/api/utils/FileUtils.java | 28 +-
.../api/utils/exportprocess/DataSourceParam.java | 23 +-
.../api/utils/exportprocess/DependentParam.java | 39 ++-
...sAddTaskParam.java => ProcessAddTaskParam.java} | 13 +-
.../utils/exportprocess/TaskNodeParamFactory.java | 6 +-
.../api/service/ProcessDefinitionServiceTest.java | 121 ++++++-
.../dolphinscheduler/api/utils/FileUtilsTest.java | 31 +-
.../utils/exportprocess/DataSourceParamTest.java | 47 ++-
.../utils/exportprocess/DependentParamTest.java | 77 ++++-
11 files changed, 500 insertions(+), 263 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
index 7c4a5cf..f14d8df 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
@@ -59,7 +59,7 @@ public class ProcessMeta {
/**
* warning group id
*/
- private int scheduleWarningGroupId;
+ private Integer scheduleWarningGroupId;
/**
* warning group name
@@ -99,7 +99,7 @@ public class ProcessMeta {
/**
* worker group id
*/
- private int scheduleWorkerGroupId;
+ private Integer scheduleWorkerGroupId;
/**
* worker group name
@@ -165,7 +165,7 @@ public class ProcessMeta {
this.scheduleWarningType = scheduleWarningType;
}
- public int getScheduleWarningGroupId() {
+ public Integer getScheduleWarningGroupId() {
return scheduleWarningGroupId;
}
@@ -229,7 +229,7 @@ public class ProcessMeta {
this.scheduleProcessInstancePriority = scheduleProcessInstancePriority;
}
- public int getScheduleWorkerGroupId() {
+ public Integer getScheduleWorkerGroupId() {
return scheduleWorkerGroupId;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 80967ac..2d462b3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -27,9 +27,10 @@ import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
+import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
+import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam;
import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
-import org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
@@ -56,9 +57,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -98,9 +97,6 @@ public class ProcessDefinitionService extends BaseDAGService {
private ProcessDao processDao;
@Autowired
- private DataSourceMapper dataSourceMapper;
-
- @Autowired
private WorkerGroupMapper workerGroupMapper;
/**
@@ -540,7 +536,7 @@ public class ProcessDefinitionService extends BaseDAGService {
*/
public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) {
//correct task param which has data source or dependent param
- String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
+ String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
//export process metadata
@@ -586,7 +582,7 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
*/
- public String addTaskNodeSpecialParam(String processDefinitionJson) {
+ public String addExportTaskNodeSpecialParam(String processDefinitionJson) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
@@ -595,9 +591,9 @@ public class ProcessDefinitionService extends BaseDAGService {
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
- exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
if (null != addTaskParam) {
- addTaskParam.addSpecialParam(taskNode);
+ addTaskParam.addExportSpecialParam(taskNode);
}
}
}
@@ -606,24 +602,6 @@ public class ProcessDefinitionService extends BaseDAGService {
}
/**
- * check task if has dependent
- * @param taskType task type
- * @return if task has dependent return true else false
- */
- private boolean checkTaskHasDependent(String taskType) {
- return taskType.equals(TaskType.DEPENDENT.name());
- }
-
- /**
- * check task if has data source info
- * @param taskType task type
- * @return if task has data source return true else false
- */
- private boolean checkTaskHasDataSource(String taskType) {
- return taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name());
- }
-
- /**
* check task if has sub process
* @param taskType task type
* @return if task has sub process return true else false
@@ -642,206 +620,168 @@ public class ProcessDefinitionService extends BaseDAGService {
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5);
+ String processMetaJson = FileUtils.file2String(file);
+ ProcessMeta processMeta = JSONUtils.parseObject(processMetaJson, ProcessMeta.class);
- JSONObject json;
+ //check file content
+ if (null == processMeta) {
+ putMsg(result, Status.DATA_IS_NULL, "fileContent");
+ return result;
+ }
+ if (StringUtils.isEmpty(processMeta.getProjectName())) {
+ putMsg(result, Status.DATA_IS_NULL, "projectName");
+ return result;
+ }
+ if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) {
+ putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
+ return result;
+ }
+ if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) {
+ putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
+ return result;
+ }
- //read workflow json
- try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), StandardCharsets.UTF_8)) {
- BufferedReader streamReader = new BufferedReader(inputStreamReader);
- StringBuilder respomseStrBuilder = new StringBuilder();
- String inputStr;
+ //deal with process name
+ String processDefinitionName = processMeta.getProcessDefinitionName();
+ //use currentProjectName to query
+ Project targetProject = projectMapper.queryByName(currentProjectName);
+ if(null != targetProject){
+ processDefinitionName = recursionProcessDefinitionName(targetProject.getId(),
+ processDefinitionName, 1);
+ }
- while ((inputStr = streamReader.readLine())!= null){
- respomseStrBuilder.append( inputStr );
- }
+ //add special task param
+ String importProcessParam = addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject);
- json = JSONObject.parseObject( respomseStrBuilder.toString() );
-
- if(null != json){
- String originProjectName = null;
- String processDefinitionName = null;
- String processDefinitionJson = null;
- String processDefinitionDesc = null;
- String processDefinitionLocations = null;
- String processDefinitionConnects = null;
-
- String scheduleWarningType = null;
- String scheduleWarningGroupId = null;
- String scheduleStartTime = null;
- String scheduleEndTime = null;
- String scheduleCrontab = null;
- String scheduleFailureStrategy = null;
- String scheduleReleaseState = null;
- String scheduleProcessInstancePriority = null;
- String scheduleWorkerGroupId = null;
- String scheduleWorkerGroupName = null;
-
- if (Objects.nonNull(json.get("projectName"))) {
- originProjectName = json.get("projectName").toString();
- } else {
- putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
- return result;
- }
- if (Objects.nonNull(json.get("processDefinitionName"))) {
- processDefinitionName = json.get("processDefinitionName").toString();
- } else {
- putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
- return result;
- }
- if (Objects.nonNull(json.get("processDefinitionJson"))) {
- processDefinitionJson = json.get("processDefinitionJson").toString();
- } else {
- putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
- return result;
- }
- if (Objects.nonNull(json.get("processDefinitionDescription"))) {
- processDefinitionDesc = json.get("processDefinitionDescription").toString();
- }
- if (Objects.nonNull(json.get("processDefinitionLocations"))) {
- processDefinitionLocations = json.get("processDefinitionLocations").toString();
- }
- if (Objects.nonNull(json.get("processDefinitionConnects"))) {
- processDefinitionConnects = json.get("processDefinitionConnects").toString();
- }
-
- //check user access for org project
- Project originProject = projectMapper.queryByName(originProjectName);
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, originProject, originProjectName);
- Status resultStatus = (Status) checkResult.get(Constants.STATUS);
+ Map<String, Object> createProcessResult;
+ try {
+ createProcessResult = createProcessDefinition(loginUser
+ ,currentProjectName,
+ processDefinitionName,
+ importProcessParam,
+ processMeta.getProcessDefinitionDescription(),
+ processMeta.getProcessDefinitionLocations(),
+ processMeta.getProcessDefinitionConnects());
+ } catch (JsonProcessingException e) {
+ logger.error("import process meta json data: {}", e.getMessage(), e);
+ putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
+ return result;
+ }
- if (resultStatus == Status.SUCCESS) {
- //use currentProjectName to query
- Project targetProject = projectMapper.queryByName(currentProjectName);
- if(null != targetProject){
- processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1);
- }
+ putMsg(result, Status.SUCCESS);
+ //create process definition
+ Integer processDefinitionId = null;
+ if (null != createProcessResult && Objects.nonNull(createProcessResult.get("processDefinitionId"))) {
+ processDefinitionId = Integer.parseInt(createProcessResult.get("processDefinitionId").toString());
+ }
+ //scheduler param
+ if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) {
+ int scheduleInsert = importProcessSchedule(loginUser,
+ currentProjectName,
+ processMeta,
+ processDefinitionName,
+ processDefinitionId);
+
+ if (0 == scheduleInsert) {
+ putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
+ return result;
+ }
+ }
- JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
- JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
-
- for (int j = 0; j < jsonArray.size(); j++) {
- JSONObject taskNode = jsonArray.getJSONObject(j);
- String taskType = taskNode.getString("type");
- if(checkTaskHasDataSource(taskType)) {
- JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
- List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
- if (!dataSources.isEmpty()) {
- DataSource dataSource = dataSources.get(0);
- sqlParameters.put("datasource", dataSource.getId());
- }
- taskNode.put("params", sqlParameters);
- }else if(checkTaskHasDependent(taskType)){
- JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
- if(dependentParameters != null){
- JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
- for (int h = 0; h < dependTaskList.size(); h++) {
- JSONObject dependentTaskModel = dependTaskList.getJSONObject(h);
- JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size(); k++) {
- JSONObject dependentItem = dependItemList.getJSONObject(k);
- Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName"));
- if(dependentItemProject != null){
- ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
- if(definition != null){
- dependentItem.put("projectId",dependentItemProject.getId());
- dependentItem.put("definitionId",definition.getId());
- }
- }
- }
- }
- taskNode.put("dependence", dependentParameters);
- }
- }
- }
+ return result;
+ }
- //recursive sub-process parameter correction map key for old process id value for new process id
- Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
+ /**
+ * import process add special task param
+ * @param loginUser login user
+ * @param processDefinitionJson process definition json
+ * @param targetProject target project
+ * @return import process param
+ */
+ private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) {
+ JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
+ JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
+ //add sql and dependent param
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JSONObject taskNode = jsonArray.getJSONObject(i);
+ String taskType = taskNode.getString("type");
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+ if (null != addTaskParam) {
+ addTaskParam.addImportSpecialParam(taskNode);
+ }
+ }
- List<Object> subProcessList = jsonArray.stream()
- .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type")))
- .collect(Collectors.toList());
+ //recursive sub-process parameter correction map key for old process id value for new process id
+ Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
- if (!subProcessList.isEmpty()) {
- importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap);
- }
+ List<Object> subProcessList = jsonArray.stream()
+ .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type")))
+ .collect(Collectors.toList());
- jsonObject.put("tasks", jsonArray);
+ if (CollectionUtils.isNotEmpty(subProcessList)) {
+ importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap);
+ }
- Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,currentProjectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
- Integer processDefinitionId = null;
- if (Objects.nonNull(createProcessDefinitionResult.get("processDefinitionId"))) {
- processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
- }
- if (Objects.nonNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
- Date now = new Date();
- Schedule scheduleObj = new Schedule();
- scheduleObj.setProjectName(currentProjectName);
- scheduleObj.setProcessDefinitionId(processDefinitionId);
- scheduleObj.setProcessDefinitionName(processDefinitionName);
- scheduleObj.setCreateTime(now);
- scheduleObj.setUpdateTime(now);
- scheduleObj.setUserId(loginUser.getId());
- scheduleObj.setUserName(loginUser.getUserName());
-
-
- scheduleCrontab = json.get("scheduleCrontab").toString();
- scheduleObj.setCrontab(scheduleCrontab);
- if (Objects.nonNull(json.get("scheduleStartTime"))) {
- scheduleStartTime = json.get("scheduleStartTime").toString();
- scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
- }
- if (Objects.nonNull(json.get("scheduleEndTime"))) {
- scheduleEndTime = json.get("scheduleEndTime").toString();
- scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
- }
- if (Objects.nonNull(json.get("scheduleWarningType"))) {
- scheduleWarningType = json.get("scheduleWarningType").toString();
- scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
- }
- if (Objects.nonNull(json.get("scheduleWarningGroupId"))) {
- scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString();
- scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
- }
- if (Objects.nonNull(json.get("scheduleFailureStrategy"))) {
- scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString();
- scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
- }
- if (Objects.nonNull(json.get("scheduleReleaseState"))) {
- scheduleReleaseState = json.get("scheduleReleaseState").toString();
- scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
- }
- if (Objects.nonNull(json.get("scheduleProcessInstancePriority"))) {
- scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString();
- scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
- }
- if (Objects.nonNull(json.get("scheduleWorkerGroupId"))) {
- scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString();
- if(scheduleWorkerGroupId != null){
- scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
- }else{
- if (Objects.nonNull(json.get("scheduleWorkerGroupName"))) {
- scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
- List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
- if(!workerGroups.isEmpty()){
- scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
- }
- }
- }
- }
- scheduleMapper.insert(scheduleObj);
- }
+ jsonObject.put("tasks", jsonArray);
+ return jsonObject.toString();
+ }
- putMsg(result, Status.SUCCESS);
- return result;
+ /**
+ * import process schedule
+ * @param loginUser login user
+ * @param currentProjectName current project name
+ * @param processMeta process meta data
+ * @param processDefinitionName process definition name
+ * @param processDefinitionId process definition id
+ * @return insert schedule flag
+ */
+ public int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta,
+ String processDefinitionName, Integer processDefinitionId) {
+ Date now = new Date();
+ Schedule scheduleObj = new Schedule();
+ scheduleObj.setProjectName(currentProjectName);
+ scheduleObj.setProcessDefinitionId(processDefinitionId);
+ scheduleObj.setProcessDefinitionName(processDefinitionName);
+ scheduleObj.setCreateTime(now);
+ scheduleObj.setUpdateTime(now);
+ scheduleObj.setUserId(loginUser.getId());
+ scheduleObj.setUserName(loginUser.getUserName());
+
+ scheduleObj.setCrontab(processMeta.getScheduleCrontab());
+
+ if (null != processMeta.getScheduleStartTime()) {
+ scheduleObj.setStartTime(DateUtils.stringToDate(processMeta.getScheduleStartTime()));
+ }
+ if (null != processMeta.getScheduleEndTime()) {
+ scheduleObj.setEndTime(DateUtils.stringToDate(processMeta.getScheduleEndTime()));
+ }
+ if (null != processMeta.getScheduleWarningType()) {
+ scheduleObj.setWarningType(WarningType.valueOf(processMeta.getScheduleWarningType()));
+ }
+ if (null != processMeta.getScheduleWarningGroupId()) {
+ scheduleObj.setWarningGroupId(processMeta.getScheduleWarningGroupId());
+ }
+ if (null != processMeta.getScheduleFailureStrategy()) {
+ scheduleObj.setFailureStrategy(FailureStrategy.valueOf(processMeta.getScheduleFailureStrategy()));
+ }
+ if (null != processMeta.getScheduleReleaseState()) {
+ scheduleObj.setReleaseState(ReleaseState.valueOf(processMeta.getScheduleReleaseState()));
+ }
+ if (null != processMeta.getScheduleProcessInstancePriority()) {
+ scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority()));
+ }
+ if (null != processMeta.getScheduleWorkerGroupId()) {
+ scheduleObj.setWorkerGroupId(processMeta.getScheduleWorkerGroupId());
+ } else {
+ if (null != processMeta.getScheduleWorkerGroupName()) {
+ List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(processMeta.getScheduleWorkerGroupName());
+ if(CollectionUtils.isNotEmpty(workerGroups)){
+ scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
}
- }else{
- putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
- return result;
}
- } catch (IOException e) {
- throw new RuntimeException(e.getMessage(), e);
}
- return result;
+
+ return scheduleMapper.insert(scheduleObj);
}
/**
@@ -873,7 +813,7 @@ public class ProcessDefinitionService extends BaseDAGService {
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
.collect(Collectors.toList());
- if (!subProcessList.isEmpty()) {
+ if (CollectionUtils.isNotEmpty(subProcessList)) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
//sub process processId correct
if (!subProcessIdMap.isEmpty()) {
@@ -1307,7 +1247,7 @@ public class ProcessDefinitionService extends BaseDAGService {
private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName);
if (processDefinition != null) {
- if(num>1){
+ if(num > 1){
String str = processDefinitionName.substring(0,processDefinitionName.length() - 3);
processDefinitionName = str + "("+num+")";
}else{
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
index 9c0e339..f88d261 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
@@ -22,9 +22,12 @@ import org.springframework.core.io.Resource;
import org.springframework.core.io.UrlResource;
import org.springframework.web.multipart.MultipartFile;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -38,7 +41,7 @@ public class FileUtils {
/**
* copy source file to target file
*
- * @param file file
+ * @param file file
* @param destFilename destination file name
*/
@@ -77,4 +80,27 @@ public class FileUtils {
}
return null;
}
+
+ /**
+ * convert the content of a MultipartFile to a String
+ * @param file MultipartFile file
+ * @return file content string
+ */
+ public static String file2String(MultipartFile file) {
+ StringBuilder strBuilder = new StringBuilder();
+
+ try (InputStreamReader inputStreamReader = new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8)) {
+ BufferedReader streamReader = new BufferedReader(inputStreamReader);
+ String inputStr;
+
+ while ((inputStr = streamReader.readLine()) != null) {
+ strBuilder.append(inputStr);
+ }
+
+ } catch (IOException e) {
+ logger.error("file convert to string failed: {}", file.getName());
+ }
+
+ return strBuilder.toString();
+ }
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
index c013aac..00d9577 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
@@ -25,11 +25,13 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+
/**
* task node add datasource param strategy
*/
@Service
-public class DataSourceParam implements exportProcessAddTaskParam, InitializingBean {
+public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
@Autowired
private DataSourceMapper dataSourceMapper;
@@ -40,7 +42,7 @@ public class DataSourceParam implements exportProcessAddTaskParam, InitializingB
* @return task node json object
*/
@Override
- public JSONObject addSpecialParam(JSONObject taskNode) {
+ public JSONObject addExportSpecialParam(JSONObject taskNode) {
// add sqlParameters
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
@@ -52,6 +54,23 @@ public class DataSourceParam implements exportProcessAddTaskParam, InitializingB
return taskNode;
}
+ /**
+ * import process add datasource params
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ @Override
+ public JSONObject addImportSpecialParam(JSONObject taskNode) {
+ JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
+ List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
+ if (!dataSources.isEmpty()) {
+ DataSource dataSource = dataSources.get(0);
+ sqlParameters.put("datasource", dataSource.getId());
+ }
+ taskNode.put("params", sqlParameters);
+ return taskNode;
+ }
+
/**
* put datasource strategy
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
index bdf202c..b42b3b5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
@@ -21,7 +21,9 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -30,19 +32,22 @@ import org.springframework.stereotype.Service;
* task node add dependent param strategy
*/
@Service
-public class DependentParam implements exportProcessAddTaskParam, InitializingBean {
+public class DependentParam implements ProcessAddTaskParam, InitializingBean {
@Autowired
ProcessDefinitionMapper processDefineMapper;
+ @Autowired
+ ProjectMapper projectMapper;
+
/**
* add dependent param
* @param taskNode task node json object
* @return task node json object
*/
@Override
- public JSONObject addSpecialParam(JSONObject taskNode) {
+ public JSONObject addExportSpecialParam(JSONObject taskNode) {
// add dependent param
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
@@ -68,6 +73,36 @@ public class DependentParam implements exportProcessAddTaskParam, InitializingBe
}
/**
+ * import process add dependent param
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ @Override
+ public JSONObject addImportSpecialParam(JSONObject taskNode) {
+ JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
+ if(dependentParameters != null){
+ JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
+ for (int h = 0; h < dependTaskList.size(); h++) {
+ JSONObject dependentTaskModel = dependTaskList.getJSONObject(h);
+ JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
+ for (int k = 0; k < dependItemList.size(); k++) {
+ JSONObject dependentItem = dependItemList.getJSONObject(k);
+ Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName"));
+ if(dependentItemProject != null){
+ ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
+ if(definition != null){
+ dependentItem.put("projectId",dependentItemProject.getId());
+ dependentItem.put("definitionId",definition.getId());
+ }
+ }
+ }
+ }
+ taskNode.put("dependence", dependentParameters);
+ }
+ return taskNode;
+ }
+
+ /**
* put dependent strategy
*/
@Override
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
similarity index 75%
rename from dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java
rename to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
index 5ae1667..b30b777 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
@@ -19,14 +19,21 @@ package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.alibaba.fastjson.JSONObject;
/**
- * exportProcessAddTaskParam
+ * ProcessAddTaskParam
*/
-public interface exportProcessAddTaskParam {
+public interface ProcessAddTaskParam {
+
+ /**
+ * add export task special param: sql task dependent task
+ * @param taskNode task node json object
+ * @return task node json object
+ */
+ JSONObject addExportSpecialParam(JSONObject taskNode);
/**
* add task special param: sql task dependent task
* @param taskNode task node json object
* @return task node json object
*/
- JSONObject addSpecialParam(JSONObject taskNode);
+ JSONObject addImportSpecialParam(JSONObject taskNode);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
index f4faa15..b8f7b03 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
@@ -24,13 +24,13 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class TaskNodeParamFactory {
- private static Map<String, exportProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
+ private static Map<String, ProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
+ /**
+ * look up the handler registered for a task type
+ * @param taskType task type; the backing map is a ConcurrentHashMap, which rejects a null key
+ * @return the registered handler, or null when nothing was registered for the type
+ */
- public static exportProcessAddTaskParam getByTaskType(String taskType){
+ public static ProcessAddTaskParam getByTaskType(String taskType){
return taskServices.get(taskType);
}
- static void register(String taskType, exportProcessAddTaskParam addSpecialTaskParam){
+ static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){
if (null != taskType) {
taskServices.put(taskType, addSpecialTaskParam);
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 82ba43d..b30a919 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -20,9 +20,11 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
@@ -164,7 +166,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource());
Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition());
- String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
+ String corSqlDependentJson = processDefinitionService.addExportTaskNodeSpecialParam(sqlDependentJson);
JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
@@ -182,6 +184,62 @@ public class ProcessDefinitionServiceTest {
Assert.assertNotEquals(sqlDependentJson,exportProcessMetaDataStr);
}
+ /**
+ * a process definition holding only a shell task must come back from the
+ * export special-param step equal (lenient json compare) to what went in
+ */
+ @Test
+ public void testAddExportTaskNodeSpecialParam() throws JSONException {
+ final String shellOnlyDefinition = "{\"globalParams\":[],\"tasks\":[{\"id\":\"tasks-9527\",\"name\":\"shell-1\"," +
+ "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
+ "\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
+ "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+ JSONAssert.assertEquals(shellOnlyDefinition,
+ processDefinitionService.addExportTaskNodeSpecialParam(shellOnlyDefinition), false);
+ }
+
+ /**
+ * import process schedule test: calls importProcessSchedule four times with
+ * different process metadata / worker group stubbing and expects 0 every time
+ * NOTE(review): 0 is presumably what the mocked schedule insert yields - confirm
+ * against the mapper mocks declared on this test class
+ */
+ @Test
+ public void testImportProcessSchedule() {
+ User loginUser = new User();
+ loginUser.setId(1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+
+ String currentProjectName = "test";
+ String processDefinitionName = "test_process";
+ Integer processDefinitionId = 1;
+ Schedule schedule = getSchedule();
+
+ // case 1: fully populated metadata from getProcessMeta()
+ ProcessMeta processMeta = getProcessMeta();
+
+ int insertFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMeta,
+ processDefinitionName, processDefinitionId);
+ Assert.assertEquals(0, insertFlag);
+
+ // case 2: metadata carrying nothing but the crontab
+ ProcessMeta processMetaCron = new ProcessMeta();
+ processMetaCron.setScheduleCrontab(schedule.getCrontab());
+
+ int insertFlagCron = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
+ processDefinitionName, processDefinitionId);
+ Assert.assertEquals(0, insertFlagCron);
+
+ // case 3: worker group name "ds-test" resolves to one worker group
+ WorkerGroup workerGroup = new WorkerGroup();
+ workerGroup.setName("ds-test-workergroup");
+ workerGroup.setId(2);
+ List<WorkerGroup> workerGroups = new ArrayList<>();
+ workerGroups.add(workerGroup);
+ Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(workerGroups);
+
+ processMetaCron.setScheduleWorkerGroupName("ds-test");
+ int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
+ processDefinitionName, processDefinitionId);
+ Assert.assertEquals(0, insertFlagWorker);
+
+ // case 4: same name, but the worker group lookup now returns null
+ Mockito.when(workerGroupMapper.queryWorkerGroupByName("ds-test")).thenReturn(null);
+ int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron,
+ processDefinitionName, processDefinitionId);
+ Assert.assertEquals(0, workerNullFlag);
+
+
+ }
+
/**
* import sub process test
*/
@@ -321,9 +379,50 @@ public class ProcessDefinitionServiceTest {
Assert.assertTrue(delete);
+ String processMetaJson = "";
+ improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
+ processMetaJson = "{\"scheduleWorkerGroupId\":-1}";
+ improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
+
+ processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}";
+ improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
+
+ processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}";
+ improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson);
+
+
+ }
+
+ /**
+ * write the given process metadata to /tmp/task.json, import that file and
+ * expect the import to be answered with DATA_IS_NULL
+ * @param file file that is deleted once the check has run
+ * @param loginUser login user
+ * @param currentProjectName current project name
+ * @param processMetaJson process meta json
+ * @throws IOException IO exception
+ */
+ private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException {
+ final String metaPath = "/tmp/task.json";
+ File metaFile = new File(metaPath);
+
+ // the metadata under test becomes the upload content
+ FileUtils.writeStringToFile(metaFile, processMetaJson);
+
+ MultipartFile upload = new MockMultipartFile(metaFile.getName(), metaFile.getName(),
+ ContentType.APPLICATION_OCTET_STREAM.toString(), new FileInputStream(metaPath));
+
+ Map<String, Object> importResult = processDefinitionService.importProcessDefinition(loginUser, upload, currentProjectName);
+ Assert.assertEquals(Status.DATA_IS_NULL, importResult.get(Constants.STATUS));
+
+ Assert.assertTrue(file.delete());
}
+
/**
* get mock datasource
* @return DataSource
@@ -382,6 +481,26 @@ public class ProcessDefinitionServiceTest {
return schedule;
}
+ /**
+ * build a mock processMeta whose schedule fields mirror getSchedule()
+ * and whose worker group name is "workgroup1"
+ * @return processMeta
+ */
+ private ProcessMeta getProcessMeta() {
+ Schedule source = getSchedule();
+ ProcessMeta meta = new ProcessMeta();
+
+ // timing
+ meta.setScheduleCrontab(source.getCrontab());
+ meta.setScheduleStartTime(DateUtils.dateToString(source.getStartTime()));
+ meta.setScheduleEndTime(DateUtils.dateToString(source.getEndTime()));
+
+ // alerting and run policy
+ meta.setScheduleWarningType(String.valueOf(source.getWarningType()));
+ meta.setScheduleWarningGroupId(source.getWarningGroupId());
+ meta.setScheduleFailureStrategy(String.valueOf(source.getFailureStrategy()));
+ meta.setScheduleReleaseState(String.valueOf(source.getReleaseState()));
+ meta.setScheduleProcessInstancePriority(String.valueOf(source.getProcessInstancePriority()));
+
+ // worker group
+ meta.setScheduleWorkerGroupId(source.getWorkerGroupId());
+ meta.setScheduleWorkerGroupName("workgroup1");
+ return meta;
+ }
+
private List<Schedule> getSchedulerList() {
List<Schedule> scheduleList = new ArrayList<>();
scheduleList.add(getSchedule());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java
index f205d39..1b05eb0 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/FileUtilsTest.java
@@ -17,21 +17,17 @@
package org.apache.dolphinscheduler.api.utils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.apache.http.entity.ContentType;
+import org.junit.*;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
+import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import static org.junit.Assert.*;
@@ -106,4 +102,23 @@ public class FileUtilsTest {
assertNull(resource1);
}
+
+ /**
+ * file2String must hand back exactly the text that was written to the uploaded file
+ */
+ @Test
+ public void testFile2String() throws IOException {
+ final String expected = "123";
+ final String path = "/tmp/task.json";
+ File source = new File(path);
+ org.apache.dolphinscheduler.common.utils.FileUtils.writeStringToFile(source, expected);
+
+ MultipartFile upload = new MockMultipartFile(source.getName(), source.getName(),
+ ContentType.APPLICATION_OCTET_STREAM.toString(), new FileInputStream(path));
+
+ Assert.assertEquals(expected, FileUtils.file2String(upload));
+
+ Assert.assertTrue(source.delete());
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
index 0a271d9..b8fcd62 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
@@ -35,25 +35,52 @@ import org.springframework.test.context.junit4.SpringRunner;
public class DataSourceParamTest {
+ /**
+ * export special param for a SQL task node: resolves the handler registered
+ * for the node's "type" and runs addExportSpecialParam on it
+ * NOTE(review): the method name says "Dependent" but the fixture is a SQL node
+ * NOTE(review): the expected value is read from taskNode after the call; if the
+ * handler returns the instance it was given, this assertion cannot fail - confirm
+ */
@Test
- public void testAddDependentSpecialParam() throws JSONException {
+ public void testAddExportDependentSpecialParam() throws JSONException {
- String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
- "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
- "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," +
- "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," +
- "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
+ String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
+ "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
+ "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" +
+ ",\"localParams\":[],\"connParams\":\"\"," +
+ "\"preStatements\":[],\"postStatements\":[]}," +
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
+ "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
+ "\"preTasks\":[\"dependent\"]}";
- JSONObject taskNode = JSONUtils.parseObject(dependentJson);
+ JSONObject taskNode = JSONUtils.parseObject(sqlJson);
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
- exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
- JSONObject dependent = addTaskParam.addSpecialParam(taskNode);
+ JSONObject sql = addTaskParam.addExportSpecialParam(taskNode);
- JSONAssert.assertEquals(taskNode.toString(),dependent.toString(),false);
+ JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
}
+ }
+
+ /**
+ * import special param for a SQL task node that carries a datasourceName:
+ * the handler registered for the node's "type" runs addImportSpecialParam and
+ * its result is compared (lenient json compare) with the task node
+ */
+ @Test
+ public void testAddImportDependentSpecialParam() throws JSONException {
+ String importedSqlNode = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"," +
+ "\"type\":\"SQL\",\"params\":{\"postStatements\":[]," +
+ "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\"," +
+ "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\"" +
+ "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1," +
+ "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\"" +
+ ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\"," +
+ "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{}," +
+ "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}";
+ JSONObject taskNode = JSONUtils.parseObject(importedSqlNode);
+
+ // nothing to check when the node carries no task type
+ String taskType = taskNode.getString("type");
+ if (!StringUtils.isNotEmpty(taskType)) {
+ return;
+ }
+
+ ProcessAddTaskParam importHandler = TaskNodeParamFactory.getByTaskType(taskType);
+ JSONObject imported = importHandler.addImportSpecialParam(taskNode);
+
+ JSONAssert.assertEquals(taskNode.toString(), imported.toString(), false);
}
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
index db81138..d21b7be 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
@@ -34,29 +34,78 @@ import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = ApiApplicationServer.class)
public class DependentParamTest {
+
+ /**
+ * export special param for a DEPENDENT task node, first with a populated
+ * "dependence" section and then with the section missing entirely
+ */
@Test
- public void testAddDependentSpecialParam() throws JSONException {
+ public void testAddExportDependentSpecialParam() throws JSONException {
+ String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
+ "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," +
+ "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," +
+ "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
+
+ JSONObject taskNode = JSONUtils.parseObject(dependentJson);
+ if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
+ String taskType = taskNode.getString("type");
+
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+
+ JSONObject dependent = addTaskParam.addExportSpecialParam(taskNode);
+
+ JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
+ }
+
+ // same node without a "dependence" section
+ String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}";
+
+ JSONObject taskEmpty = JSONUtils.parseObject(dependentEmpty);
+ if (StringUtils.isNotEmpty(taskEmpty.getString("type"))) {
+ String taskType = taskEmpty.getString("type");
+
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
- String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
- "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
- "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" +
- ",\"localParams\":[],\"connParams\":\"\"," +
- "\"preStatements\":[],\"postStatements\":[]}," +
- "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
- "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
- "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
- "\"preTasks\":[\"dependent\"]}";
+ // NOTE(review): this is the export test, yet the import method is invoked here;
+ // presumably addExportSpecialParam was intended - confirm it tolerates a node without "dependence"
+ JSONObject dependent = addTaskParam.addImportSpecialParam(taskEmpty);
+ JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false);
+ }
+
+ }
- JSONObject taskNode = JSONUtils.parseObject(sqlJson);
+ /**
+ * import special param for a DEPENDENT task node, first with a populated
+ * "dependence" section and then with the section missing entirely
+ */
+ @Test
+ public void testAddImportDependentSpecialParam() throws JSONException {
+ String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" +
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," +
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" +
+ ",\"name\":\"dependent\"," +
+ "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\"," +
+ "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\"," +
+ "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}]," +
+ "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
+
+ JSONObject taskNode = JSONUtils.parseObject(dependentJson);
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
- exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
+
+ JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode);
+
+ JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
+ }
+
+ // same node without a "dependence" section
+ String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\"" +
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false," +
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" +
+ ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
+
+ JSONObject taskNodeEmpty = JSONUtils.parseObject(dependentEmpty);
+ if (StringUtils.isNotEmpty(taskNodeEmpty.getString("type"))) {
+ String taskType = taskNodeEmpty.getString("type");
+
+ ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
- JSONObject sql = addTaskParam.addSpecialParam(taskNode);
+ // run the import on the empty node itself; feeding it the populated node makes the
+ // lenient compare below pass without the missing-"dependence" path ever being executed
+ JSONObject dependent = addTaskParam.addImportSpecialParam(taskNodeEmpty);
- JSONAssert.assertEquals(taskNode.toString(),sql.toString(),false);
+ JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false);
}
}