You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ga...@apache.org on 2021/07/29 03:36:21 UTC
[dolphinscheduler] branch dev updated: [Improvement][Api Module]
refactor DataSourceParam and DependentParam,
remove spring annotation (#5832)
This is an automated email from the ASF dual-hosted git repository.
gabrywu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 72d6804 [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)
72d6804 is described below
commit 72d68041e3fccffdfd74d1be3c7129791b24ce2e
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Thu Jul 29 11:36:10 2021 +0800
[Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)
* fix: refactor api utils class, remove spring annotation.
* fix: Optimization comments
Co-authored-by: wen-hemin <we...@apache.com>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 130 ++++++++++++---
.../api/utils/exportprocess/DataSourceParam.java | 84 ----------
.../api/utils/exportprocess/DependentParam.java | 114 -------------
.../utils/exportprocess/ProcessAddTaskParam.java | 39 -----
.../utils/exportprocess/TaskNodeParamFactory.java | 38 -----
.../api/service/ProcessDefinitionServiceTest.java | 181 +++++++++++++++++----
.../utils/exportprocess/DataSourceParamTest.java | 84 ----------
.../utils/exportprocess/DependentParamTest.java | 110 -------------
.../apache/dolphinscheduler/common/Constants.java | 13 ++
9 files changed, 274 insertions(+), 519 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 5bccb01..f3225a2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -30,8 +30,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam;
-import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -51,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StreamUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@@ -62,6 +61,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@@ -159,6 +159,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private SchedulerService schedulerService;
+ @Autowired
+ private DataSourceMapper dataSourceMapper;
+
/**
* create process definition
*
@@ -720,23 +723,54 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
- * correct task param which has datasource or dependent
+ * Injecting parameters into export process definition
+ * Because the import and export environment resource IDs may be inconsistent,So inject the resource name
+ *
+ * SQL and PROCEDURE node, inject datasourceName
+ * DEPENDENT node, inject projectName and definitionName
*
* @param processData process data
- * @return correct processDefinitionJson
*/
private void addExportTaskNodeSpecialParam(ProcessData processData) {
- List<TaskNode> taskNodeList = processData.getTasks();
- List<TaskNode> tmpNodeList = new ArrayList<>();
- for (TaskNode taskNode : taskNodeList) {
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType());
- JsonNode jsonNode = JSONUtils.toJsonNode(taskNode);
- if (null != addTaskParam) {
- addTaskParam.addExportSpecialParam(jsonNode);
+ for (TaskNode taskNode : processData.getTasks()) {
+ if (TaskType.SQL.getDesc().equals(taskNode.getType())
+ || TaskType.PROCEDURE.getDesc().equals(taskNode.getType())) {
+ ObjectNode sqlParameters = JSONUtils.parseObject(taskNode.getParams());
+
+ DataSource dataSource = dataSourceMapper.selectById(
+ sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE).asInt());
+
+ if (dataSource != null) {
+ sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE_NAME, dataSource.getName());
+ taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
+ }
+ }
+
+ if (TaskType.DEPENDENT.getDesc().equals(taskNode.getType())) {
+ ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.getDependence());
+
+ if (dependentParameters != null) {
+ ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(
+ Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+ for (int j = 0; j < dependTaskList.size(); j++) {
+ JsonNode dependentTaskModel = dependTaskList.path(j);
+ ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
+ Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+ for (int k = 0; k < dependItemList.size(); k++) {
+ ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
+ int definitionId = dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt();
+ ProcessDefinition definition = processDefinitionMapper.queryByDefineId(definitionId);
+ if (definition != null) {
+ dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_NAME, definition.getProjectName());
+ dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_NAME, definition.getName());
+ }
+ }
+ }
+
+ taskNode.setDependence(dependentParameters.toString());
+ }
}
- tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class));
}
- processData.setTasks(tmpNodeList);
}
/**
@@ -918,15 +952,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) {
ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson);
ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS);
- //add sql and dependent param
- for (int i = 0; i < jsonArray.size(); i++) {
- JsonNode taskNode = jsonArray.path(i);
- String taskType = taskNode.path("type").asText();
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
- if (null != addTaskParam) {
- addTaskParam.addImportSpecialParam(taskNode);
- }
- }
+
+ addImportTaskNodeSpecialParam(jsonArray);
//recursive sub-process parameter correction map key for old process code value for new process code
Map<Long, Long> subProcessCodeMap = new HashMap<>();
@@ -944,6 +971,65 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
+ * Replace the injecting parameters in import process definition
+ *
+ * SQL and PROCEDURE node, inject datasource by datasourceName
+ * DEPENDENT node, inject projectId and definitionId by projectName and definitionName
+ *
+ * @param jsonArray array node
+ */
+ private void addImportTaskNodeSpecialParam(ArrayNode jsonArray) {
+ // add sql and dependent param
+ for (int i = 0; i < jsonArray.size(); i++) {
+ JsonNode taskNode = jsonArray.path(i);
+ String taskType = taskNode.path("type").asText();
+
+ if (TaskType.SQL.getDesc().equals(taskType) || TaskType.PROCEDURE.getDesc().equals(taskType)) {
+ ObjectNode sqlParameters = (ObjectNode)taskNode.path(Constants.TASK_PARAMS);
+
+ List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(
+ sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
+ if (!dataSources.isEmpty()) {
+ DataSource dataSource = dataSources.get(0);
+ sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE, dataSource.getId());
+ }
+
+ ((ObjectNode)taskNode).set(Constants.TASK_PARAMS, sqlParameters);
+ }
+
+ if (TaskType.DEPENDENT.getDesc().equals(taskType)) {
+ ObjectNode dependentParameters = (ObjectNode)taskNode.path(Constants.DEPENDENCE);
+ if (dependentParameters != null) {
+ ArrayNode dependTaskList = (ArrayNode)dependentParameters.path(
+ Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+ for (int h = 0; h < dependTaskList.size(); h++) {
+ ObjectNode dependentTaskModel = (ObjectNode)dependTaskList.path(h);
+ ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
+ Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+ for (int k = 0; k < dependItemList.size(); k++) {
+ ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
+ Project dependentItemProject = projectMapper.queryByName(
+ dependentItem.path(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
+ if (dependentItemProject != null) {
+ ProcessDefinition definition = processDefinitionMapper.queryByDefineName(
+ dependentItemProject.getCode(),
+ dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
+ if (definition != null) {
+ dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_ID,
+ dependentItemProject.getId());
+ dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_ID, definition.getId());
+ }
+ }
+ }
+ }
+
+ ((ObjectNode)taskNode).set(Constants.DEPENDENCE, dependentParameters);
+ }
+ }
+ }
+ }
+
+ /**
* import process schedule
*
* @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
deleted file mode 100644
index 8572d7b..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * task node add datasource param strategy
- */
-@Service
-public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
-
- private static final String PARAMS = "params";
- @Autowired
- private DataSourceMapper dataSourceMapper;
-
- /**
- * add datasource params
- * @param taskNode task node json object
- * @return task node json object
- */
- @Override
- public JsonNode addExportSpecialParam(JsonNode taskNode) {
- // add sqlParameters
- ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
- DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt());
- if (null != dataSource) {
- sqlParameters.put("datasourceName", dataSource.getName());
- }
- ((ObjectNode)taskNode).set(PARAMS, sqlParameters);
-
- return taskNode;
- }
-
- /**
- * import process add datasource params
- * @param taskNode task node json object
- * @return task node json object
- */
- @Override
- public JsonNode addImportSpecialParam(JsonNode taskNode) {
- ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
- List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText());
- if (!dataSources.isEmpty()) {
- DataSource dataSource = dataSources.get(0);
- sqlParameters.put("datasource", dataSource.getId());
- }
- ((ObjectNode)taskNode).set(PARAMS, sqlParameters);
- return taskNode;
- }
-
-
- /**
- * put datasource strategy
- */
- @Override
- public void afterPropertiesSet() {
- TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this);
- TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this);
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
deleted file mode 100644
index 29746f8..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * task node add dependent param strategy
- */
-@Service
-public class DependentParam implements ProcessAddTaskParam, InitializingBean {
-
- private static final String DEPENDENCE = "dependence";
-
- @Autowired
- ProcessDefinitionMapper processDefineMapper;
-
- @Autowired
- ProjectMapper projectMapper;
-
- /**
- * add dependent param
- * @param taskNode task node json object
- * @return task node json object
- */
- @Override
- public JsonNode addExportSpecialParam(JsonNode taskNode) {
- // add dependent param
- ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
-
- if (null != dependentParameters) {
- ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList");
- for (int j = 0; j < dependTaskList.size(); j++) {
- JsonNode dependentTaskModel = dependTaskList.path(j);
- ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size(); k++) {
- ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
- int definitionId = dependentItem.path("definitionId").asInt();
- ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
- if (null != definition) {
- dependentItem.put("projectName", definition.getProjectName());
- dependentItem.put("definitionName", definition.getName());
- }
- }
- }
- ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
- }
-
- return taskNode;
- }
-
- /**
- * import process add dependent param
- * @param taskNode task node json object
- * @return
- */
- @Override
- public JsonNode addImportSpecialParam(JsonNode taskNode) {
- ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
- if(dependentParameters != null){
- ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList");
- for (int h = 0; h < dependTaskList.size(); h++) {
- ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h);
- ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
- for (int k = 0; k < dependItemList.size(); k++) {
- ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
- Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText());
- if(dependentItemProject != null){
- ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getCode(),dependentItem.path("definitionName").asText());
- if(definition != null){
- dependentItem.put("projectId",dependentItemProject.getId());
- dependentItem.put("definitionId",definition.getId());
- }
- }
- }
- }
- ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
- }
- return taskNode;
- }
-
- /**
- * put dependent strategy
- */
- @Override
- public void afterPropertiesSet() {
- TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this);
- }
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
deleted file mode 100644
index 8e40855..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * ProcessAddTaskParam
- */
-public interface ProcessAddTaskParam {
-
- /**
- * add export task special param: sql task dependent task
- * @param taskNode task node json object
- * @return task node json object
- */
- JsonNode addExportSpecialParam(JsonNode taskNode);
-
- /**
- * add task special param: sql task dependent task
- * @param taskNode task node json object
- * @return task node json object
- */
- JsonNode addImportSpecialParam(JsonNode taskNode);
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
deleted file mode 100644
index b8f7b03..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * task node param factory
- */
-public class TaskNodeParamFactory {
-
- private static Map<String, ProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
-
- public static ProcessAddTaskParam getByTaskType(String taskType){
- return taskServices.get(taskType);
- }
-
- static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){
- if (null != taskType) {
- taskServices.put(taskType, addSpecialTaskParam);
- }
- }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 8ee5b9a..fc0b42a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -59,6 +60,7 @@ import org.apache.http.entity.ContentType;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -82,6 +84,9 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* process definition service test
@@ -129,6 +134,7 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
+
private static final String CYCLE_SHELL_JSON = "{\n"
+ " \"globalParams\": [\n"
+ " \n"
@@ -231,25 +237,37 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
+
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
+
@Mock
- private ProcessDefinitionMapper processDefineMapper;
+ private ProcessDefinitionMapper processDefinitionMapper;
+
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
+
@Mock
private ProjectMapper projectMapper;
+
@Mock
private ProjectServiceImpl projectService;
+
@Mock
private ScheduleMapper scheduleMapper;
+
@Mock
private ProcessService processService;
+
@Mock
private ProcessInstanceService processInstanceService;
+
@Mock
private TaskInstanceMapper taskInstanceMapper;
+ @Mock
+ private DataSourceMapper dataSourceMapper;
+
@Test
public void testQueryProcessDefinitionList() {
String projectName = "project_test1";
@@ -273,7 +291,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
List<ProcessDefinition> resourceList = new ArrayList<>();
resourceList.add(getProcessDefinition());
- Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
+ Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Map<String, Object> checkSuccessRes = processDefinitionService.queryProcessDefinitionList(loginUser, "project_test1");
Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
}
@@ -303,7 +321,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Page<ProcessDefinition> page = new Page<>(1, 10);
page.setTotal(30);
- Mockito.when(processDefineMapper.queryDefineListPaging(
+ Mockito.when(processDefinitionMapper.queryDefineListPaging(
Mockito.any(IPage.class)
, Mockito.eq("")
, Mockito.eq(loginUser.getId())
@@ -339,7 +357,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
- Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
+ Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
@@ -356,7 +374,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@@ -385,7 +403,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
- Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
+ Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
@@ -394,7 +412,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
- Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
+ Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@@ -448,7 +466,7 @@ public class ProcessDefinitionServiceTest {
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setConnects("[]");
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(definition);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
@@ -517,7 +535,7 @@ public class ProcessDefinitionServiceTest {
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
- Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
+ Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
Map<String, Object> instanceNotexitRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 1);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
@@ -525,7 +543,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
//user no auth
loginUser.setUserType(UserType.GENERAL_USER);
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
@@ -533,14 +551,14 @@ public class ProcessDefinitionServiceTest {
//process definition online
loginUser.setUserType(UserType.ADMIN_USER);
processDefinition.setReleaseState(ReleaseState.ONLINE);
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
List<Schedule> schedules = new ArrayList<>();
schedules.add(getSchedule());
schedules.add(getSchedule());
@@ -564,13 +582,13 @@ public class ProcessDefinitionServiceTest {
schedule.setReleaseState(ReleaseState.OFFLINE);
schedules.add(schedule);
Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
- Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
+ Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
//delete success
- Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
+ Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
"project_test1", 46);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
@@ -597,7 +615,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectName);
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
@@ -605,7 +623,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
ProcessDefinition processDefinition1 = getProcessDefinition();
processDefinition1.setResourceIds("1,2");
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition1);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition1);
Mockito.when(processService.getUserById(1)).thenReturn(loginUser);
Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
loginUser, "project_test1", 46, ReleaseState.ONLINE);
@@ -638,13 +656,13 @@ public class ProcessDefinitionServiceTest {
//project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectName);
- Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
+ Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
- Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
+ Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
"project_test1", "test_pdf");
Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
@@ -684,19 +702,19 @@ public class ProcessDefinitionServiceTest {
@Test
public void testGetTaskNodeListByDefinitionId() {
//process definition not exist
- Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
+ Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//process data null
ProcessDefinition processDefinition = getProcessDefinition();
- Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
- Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@@ -707,7 +725,7 @@ public class ProcessDefinitionServiceTest {
String defineCodeList = "46";
Long[] codeArray = {46L};
List<Long> codeList = Arrays.asList(codeArray);
- Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(null);
+ Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
@@ -715,7 +733,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
- Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
+ Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
@@ -748,7 +766,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(processDefinition);
Project test = getProject("test");
Mockito.when(projectMapper.selectById(projectId)).thenReturn(test);
- Mockito.when(processDefineMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
+ Mockito.when(processDefinitionMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionAllByProjectId(projectId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@@ -758,7 +776,7 @@ public class ProcessDefinitionServiceTest {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
@@ -782,7 +800,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setHost("192.168.xx.xx");
//task instance not exist
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@@ -817,7 +835,7 @@ public class ProcessDefinitionServiceTest {
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setHost("192.168.xx.xx");
taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
- Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@@ -1035,6 +1053,7 @@ public class ProcessDefinitionServiceTest {
+ " \"tenantId\": 1,\n"
+ " \"timeout\": 0\n"
+ "}";
+
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test",
sqlDependentJson, "", "", "");
@@ -1075,7 +1094,7 @@ public class ProcessDefinitionServiceTest {
checkResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkResult);
- Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
+ Mockito.when(processDefinitionMapper.queryByDefineId(1)).thenReturn(processDefinition);
HttpServletResponse response = mock(HttpServletResponse.class);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@@ -1218,4 +1237,110 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(0, processDefinitionService.importProcessSchedule(loginUser, projectName, processMeta, processDefinitionName, processDefinitionId));
}
+ @Test
+ public void testAddExportTaskNodeSpecialParam() {
+ String sqlDependentJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":"
+ + "\"sql\",\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\",\"udfs\":\"\""
+ + ",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\","
+ + "\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},\"description\":"
+ + "\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+ + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+ + "\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":"
+ + "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectId\":"
+ + "2,\"definitionId\":46,\"depTasks\":\"ALL\",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},"
+ + "\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},"
+ + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,"
+ + "\"timeout\":0}";
+
+ ProcessData processData = JSONUtils.parseObject(sqlDependentJson, ProcessData.class);
+
+ DataSource dataSource = new DataSource();
+ dataSource.setName("testDataSource");
+ when(dataSourceMapper.selectById(1)).thenReturn(dataSource);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setProjectName("testProjectName");
+ processDefinition.setName("testDefinitionName");
+ when(processDefinitionMapper.queryByDefineId(46)).thenReturn(processDefinition);
+
+ try {
+ Class clazz = ProcessDefinitionServiceImpl.class;
+ Method method = clazz.getDeclaredMethod("addExportTaskNodeSpecialParam", ProcessData.class);
+ method.setAccessible(true);
+ method.invoke(processDefinitionService, processData);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+
+ List<TaskNode> taskNodeList = processData.getTasks();
+ ObjectNode sqlParameters = JSONUtils.parseObject(taskNodeList.get(0).getParams());
+ Assert.assertEquals("testDataSource", sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
+
+ ObjectNode dependentParameters = JSONUtils.parseObject(taskNodeList.get(1).getDependence());
+ ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+ JsonNode dependentTaskModel = dependTaskList.path(0);
+ ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+ ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
+ Assert.assertEquals("testProjectName", dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
+ Assert.assertEquals("testDefinitionName", dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
+ }
+
+ @Test
+ public void testAddImportTaskNodeSpecialParam() {
+ String definitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
+ + "\"params\":{\"type\":\"MYSQL\",\"datasourceName\":\"testDataSource\",\"sql\":\"select * from test\","
+ + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":"
+ + "\"TABLE\",\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},"
+ + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":"
+ + "\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":"
+ + "\"tasks-33787\",\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+ + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":"
+ + "[{\"projectName\":\"testProjectName\",\"definitionName\":\"testDefinitionName\",\"depTasks\":\"ALL\""
+ + ",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\""
+ + "timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\""
+ + ":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+ ObjectNode jsonObject = JSONUtils.parseObject(definitionJson);
+ ArrayNode jsonArray = (ArrayNode) jsonObject.get("tasks");
+
+ List<DataSource> dataSources = new ArrayList<>();
+ DataSource dataSource = new DataSource();
+ dataSource.setId(1);
+ dataSources.add(dataSource);
+ when(dataSourceMapper.queryDataSourceByName("testDataSource")).thenReturn(dataSources);
+
+ Project project = new Project();
+ project.setId(1);
+ project.setCode(1L);
+ when(projectMapper.queryByName("testProjectName")).thenReturn(project);
+
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setId(1);
+ when(processDefinitionMapper.queryByDefineName(1L, "testDefinitionName")).thenReturn(processDefinition);
+
+ try {
+ Class clazz = ProcessDefinitionServiceImpl.class;
+ Method method = clazz.getDeclaredMethod("addImportTaskNodeSpecialParam", ArrayNode.class);
+ method.setAccessible(true);
+ method.invoke(processDefinitionService, jsonArray);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+
+ ObjectNode sqlParameters = (ObjectNode)jsonArray.path(0).path(Constants.TASK_PARAMS);
+ Assert.assertEquals(1, sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE).asInt());
+
+ ObjectNode dependentParameters = (ObjectNode)jsonArray.path(1).path(Constants.DEPENDENCE);
+ ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+ JsonNode dependentTaskModel = dependTaskList.path(0);
+ ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+ ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
+ Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_ID).asInt());
+ Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt());
+ }
+
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
deleted file mode 100644
index ceee22f..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import org.json.JSONException;
-import org.junit.Test;
-import org.skyscreamer.jsonassert.JSONAssert;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * DataSourceParamTest
- */
-public class DataSourceParamTest extends AbstractControllerTest {
-
- @Test
- public void testAddExportDependentSpecialParam() throws JSONException {
-
- String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
- + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\","
- + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
- + ",\"localParams\":[],\"connParams\":\"\","
- + "\"preStatements\":[],\"postStatements\":[]},"
- + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
- + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\","
- + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
- + "\"preTasks\":[\"dependent\"]}";
-
- ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
- if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
- String taskType = taskNode.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode sql = addTaskParam.addExportSpecialParam(taskNode);
-
- JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
- }
- }
-
- @Test
- public void testAddImportDependentSpecialParam() throws JSONException {
- String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\","
- + "\"type\":\"SQL\",\"params\":{\"postStatements\":[],"
- + "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\","
- + "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\""
- + "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1,"
- + "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\""
- + ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\","
- + "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{},"
- + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}";
-
- ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
- if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
- String taskType = taskNode.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode sql = addTaskParam.addImportSpecialParam(taskNode);
-
- JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
- }
- }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
deleted file mode 100644
index 531856c..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import org.json.JSONException;
-import org.junit.Test;
-import org.skyscreamer.jsonassert.JSONAssert;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * DependentParamTest
- */
-public class DependentParamTest extends AbstractControllerTest {
-
- @Test
- public void testAddExportDependentSpecialParam() throws JSONException {
- String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
- + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
- + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
- + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
- + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
-
- ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
- if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
- String taskType = taskNode.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode);
-
- JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
- }
-
- String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
- + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}";
-
- ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty);
- if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) {
- String taskType = taskEmpty.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty);
-
- JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false);
- }
-
- }
-
- @Test
- public void testAddImportDependentSpecialParam() throws JSONException {
- String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
- + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
- + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
- + ",\"name\":\"dependent\","
- + "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\","
- + "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\","
- + "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}],"
- + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
-
- ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
- if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
- String taskType = taskNode.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
-
- JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
- }
-
- String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
- + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
- + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
- + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
-
- JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty);
- if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) {
- String taskType = taskNodeEmpty.path("type").asText();
-
- ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
- JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
-
- JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false);
- }
-
- }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 225dc23..53645b7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1091,4 +1091,17 @@ public final class Constants {
public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER"));
public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
+ /**
+ * task parameter keys
+ */
+ public static final String TASK_PARAMS = "params";
+ public static final String TASK_PARAMS_DATASOURCE = "datasource";
+ public static final String TASK_PARAMS_DATASOURCE_NAME = "datasourceName";
+ public static final String TASK_DEPENDENCE = "dependence";
+ public static final String TASK_DEPENDENCE_DEPEND_TASK_LIST = "dependTaskList";
+ public static final String TASK_DEPENDENCE_DEPEND_ITEM_LIST = "dependItemList";
+ public static final String TASK_DEPENDENCE_PROJECT_ID = "projectId";
+ public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
+ public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
+ public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName";
}