You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ga...@apache.org on 2021/07/29 03:36:21 UTC

[dolphinscheduler] branch dev updated: [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)

This is an automated email from the ASF dual-hosted git repository.

gabrywu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72d6804  [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)
72d6804 is described below

commit 72d68041e3fccffdfd74d1be3c7129791b24ce2e
Author: wen-hemin <39...@users.noreply.github.com>
AuthorDate: Thu Jul 29 11:36:10 2021 +0800

    [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)
    
    * fix: refactor api utils class, remove spring annotation.
    
    * fix: Optimization comments
    
    Co-authored-by: wen-hemin <we...@apache.com>
---
 .../service/impl/ProcessDefinitionServiceImpl.java | 130 ++++++++++++---
 .../api/utils/exportprocess/DataSourceParam.java   |  84 ----------
 .../api/utils/exportprocess/DependentParam.java    | 114 -------------
 .../utils/exportprocess/ProcessAddTaskParam.java   |  39 -----
 .../utils/exportprocess/TaskNodeParamFactory.java  |  38 -----
 .../api/service/ProcessDefinitionServiceTest.java  | 181 +++++++++++++++++----
 .../utils/exportprocess/DataSourceParamTest.java   |  84 ----------
 .../utils/exportprocess/DependentParamTest.java    | 110 -------------
 .../apache/dolphinscheduler/common/Constants.java  |  13 ++
 9 files changed, 274 insertions(+), 519 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 5bccb01..f3225a2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -30,8 +30,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.FileUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
-import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam;
-import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -51,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
 import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
 import org.apache.dolphinscheduler.common.utils.StreamUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessData;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
@@ -62,6 +61,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@@ -159,6 +159,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     @Autowired
     private SchedulerService schedulerService;
 
+    @Autowired
+    private DataSourceMapper dataSourceMapper;
+
     /**
      * create process definition
      *
@@ -720,23 +723,54 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     /**
-     * correct task param which has datasource or dependent
+     * Injecting parameters into export process definition
+     * Because the import and export environment resource IDs may be inconsistent,So inject the resource name
+     *
+     * SQL and PROCEDURE node, inject datasourceName
+     * DEPENDENT node, inject projectName and definitionName
      *
      * @param processData process data
-     * @return correct processDefinitionJson
      */
     private void addExportTaskNodeSpecialParam(ProcessData processData) {
-        List<TaskNode> taskNodeList = processData.getTasks();
-        List<TaskNode> tmpNodeList = new ArrayList<>();
-        for (TaskNode taskNode : taskNodeList) {
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType());
-            JsonNode jsonNode = JSONUtils.toJsonNode(taskNode);
-            if (null != addTaskParam) {
-                addTaskParam.addExportSpecialParam(jsonNode);
+        for (TaskNode taskNode : processData.getTasks()) {
+            if (TaskType.SQL.getDesc().equals(taskNode.getType())
+                || TaskType.PROCEDURE.getDesc().equals(taskNode.getType())) {
+                ObjectNode sqlParameters = JSONUtils.parseObject(taskNode.getParams());
+
+                DataSource dataSource = dataSourceMapper.selectById(
+                        sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE).asInt());
+
+                if (dataSource != null) {
+                    sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE_NAME, dataSource.getName());
+                    taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
+                }
+            }
+
+            if (TaskType.DEPENDENT.getDesc().equals(taskNode.getType())) {
+                ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.getDependence());
+
+                if (dependentParameters != null) {
+                    ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(
+                        Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+                    for (int j = 0; j < dependTaskList.size(); j++) {
+                        JsonNode dependentTaskModel = dependTaskList.path(j);
+                        ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
+                            Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+                        for (int k = 0; k < dependItemList.size(); k++) {
+                            ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
+                            int definitionId = dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt();
+                            ProcessDefinition definition = processDefinitionMapper.queryByDefineId(definitionId);
+                            if (definition != null) {
+                                dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_NAME, definition.getProjectName());
+                                dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_NAME, definition.getName());
+                            }
+                        }
+                    }
+
+                    taskNode.setDependence(dependentParameters.toString());
+                }
             }
-            tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class));
         }
-        processData.setTasks(tmpNodeList);
     }
 
     /**
@@ -918,15 +952,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) {
         ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson);
         ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS);
-        //add sql and dependent param
-        for (int i = 0; i < jsonArray.size(); i++) {
-            JsonNode taskNode = jsonArray.path(i);
-            String taskType = taskNode.path("type").asText();
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-            if (null != addTaskParam) {
-                addTaskParam.addImportSpecialParam(taskNode);
-            }
-        }
+
+        addImportTaskNodeSpecialParam(jsonArray);
 
         //recursive sub-process parameter correction map key for old process code value for new process code
         Map<Long, Long> subProcessCodeMap = new HashMap<>();
@@ -944,6 +971,65 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     }
 
     /**
+     * Replace the injecting parameters in import process definition
+     *
+     * SQL and PROCEDURE node, inject datasource by datasourceName
+     * DEPENDENT node, inject projectId and definitionId by projectName and definitionName
+     *
+     * @param jsonArray array node
+     */
+    private void addImportTaskNodeSpecialParam(ArrayNode jsonArray) {
+        // add sql and dependent param
+        for (int i = 0; i < jsonArray.size(); i++) {
+            JsonNode taskNode = jsonArray.path(i);
+            String taskType = taskNode.path("type").asText();
+
+            if (TaskType.SQL.getDesc().equals(taskType) || TaskType.PROCEDURE.getDesc().equals(taskType)) {
+                ObjectNode sqlParameters = (ObjectNode)taskNode.path(Constants.TASK_PARAMS);
+
+                List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(
+                    sqlParameters.path(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
+                if (!dataSources.isEmpty()) {
+                    DataSource dataSource = dataSources.get(0);
+                    sqlParameters.put(Constants.TASK_PARAMS_DATASOURCE, dataSource.getId());
+                }
+
+                ((ObjectNode)taskNode).set(Constants.TASK_PARAMS, sqlParameters);
+            }
+
+            if (TaskType.DEPENDENT.getDesc().equals(taskType)) {
+                ObjectNode dependentParameters = (ObjectNode)taskNode.path(Constants.DEPENDENCE);
+                if (dependentParameters != null) {
+                    ArrayNode dependTaskList = (ArrayNode)dependentParameters.path(
+                        Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+                    for (int h = 0; h < dependTaskList.size(); h++) {
+                        ObjectNode dependentTaskModel = (ObjectNode)dependTaskList.path(h);
+                        ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(
+                            Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+                        for (int k = 0; k < dependItemList.size(); k++) {
+                            ObjectNode dependentItem = (ObjectNode)dependItemList.path(k);
+                            Project dependentItemProject = projectMapper.queryByName(
+                                dependentItem.path(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
+                            if (dependentItemProject != null) {
+                                ProcessDefinition definition = processDefinitionMapper.queryByDefineName(
+                                    dependentItemProject.getCode(),
+                                    dependentItem.path(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
+                                if (definition != null) {
+                                    dependentItem.put(Constants.TASK_DEPENDENCE_PROJECT_ID,
+                                        dependentItemProject.getId());
+                                    dependentItem.put(Constants.TASK_DEPENDENCE_DEFINITION_ID, definition.getId());
+                                }
+                            }
+                        }
+                    }
+
+                    ((ObjectNode)taskNode).set(Constants.DEPENDENCE, dependentParameters);
+                }
+            }
+        }
+    }
+
+    /**
      * import process schedule
      *
      * @param loginUser login user
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
deleted file mode 100644
index 8572d7b..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
-import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * task node add datasource param strategy
- */
-@Service
-public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
-
-    private static final String PARAMS = "params";
-    @Autowired
-    private DataSourceMapper dataSourceMapper;
-
-    /**
-     * add datasource params
-     * @param taskNode task node json object
-     * @return task node json object
-     */
-    @Override
-    public JsonNode addExportSpecialParam(JsonNode taskNode) {
-        // add sqlParameters
-        ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
-        DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt());
-        if (null != dataSource) {
-            sqlParameters.put("datasourceName", dataSource.getName());
-        }
-        ((ObjectNode)taskNode).set(PARAMS, sqlParameters);
-
-        return taskNode;
-    }
-
-    /**
-     * import process add datasource params
-     * @param taskNode task node json object
-     * @return task node json object
-     */
-    @Override
-    public JsonNode addImportSpecialParam(JsonNode taskNode) {
-        ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
-        List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText());
-        if (!dataSources.isEmpty()) {
-            DataSource dataSource = dataSources.get(0);
-            sqlParameters.put("datasource", dataSource.getId());
-        }
-        ((ObjectNode)taskNode).set(PARAMS, sqlParameters);
-        return taskNode;
-    }
-
-
-    /**
-     * put datasource strategy
-     */
-    @Override
-    public void afterPropertiesSet() {
-        TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this);
-        TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this);
-    }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
deleted file mode 100644
index 29746f8..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.Project;
-import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
-import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * task node add dependent param strategy
- */
-@Service
-public class DependentParam implements ProcessAddTaskParam, InitializingBean {
-
-    private static final String DEPENDENCE = "dependence";
-
-    @Autowired
-    ProcessDefinitionMapper processDefineMapper;
-
-    @Autowired
-    ProjectMapper projectMapper;
-
-    /**
-     * add dependent param
-     * @param taskNode task node json object
-     * @return task node json object
-     */
-    @Override
-    public JsonNode addExportSpecialParam(JsonNode taskNode) {
-        // add dependent param
-        ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
-
-        if (null != dependentParameters) {
-            ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList");
-            for (int j = 0; j < dependTaskList.size(); j++) {
-                JsonNode dependentTaskModel = dependTaskList.path(j);
-                ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
-                for (int k = 0; k < dependItemList.size(); k++) {
-                    ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
-                    int definitionId = dependentItem.path("definitionId").asInt();
-                    ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
-                    if (null != definition) {
-                        dependentItem.put("projectName", definition.getProjectName());
-                        dependentItem.put("definitionName", definition.getName());
-                    }
-                }
-            }
-            ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
-        }
-
-        return taskNode;
-    }
-
-    /**
-     * import process add dependent param
-     * @param taskNode task node json object
-     * @return
-     */
-    @Override
-    public JsonNode addImportSpecialParam(JsonNode taskNode) {
-        ObjectNode dependentParameters =  JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
-        if(dependentParameters != null){
-            ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList");
-            for (int h = 0; h < dependTaskList.size(); h++) {
-                ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h);
-                ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
-                for (int k = 0; k < dependItemList.size(); k++) {
-                    ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
-                    Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText());
-                    if(dependentItemProject != null){
-                        ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getCode(),dependentItem.path("definitionName").asText());
-                        if(definition != null){
-                            dependentItem.put("projectId",dependentItemProject.getId());
-                            dependentItem.put("definitionId",definition.getId());
-                        }
-                    }
-                }
-            }
-            ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
-        }
-        return taskNode;
-    }
-
-    /**
-     * put dependent strategy
-     */
-    @Override
-    public void afterPropertiesSet() {
-        TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this);
-    }
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
deleted file mode 100644
index 8e40855..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import com.fasterxml.jackson.databind.JsonNode;
-
-/**
- * ProcessAddTaskParam
- */
-public interface ProcessAddTaskParam {
-
-    /**
-     * add export task special param: sql task dependent task
-     * @param taskNode task node json object
-     * @return task node json object
-     */
-    JsonNode addExportSpecialParam(JsonNode taskNode);
-
-    /**
-     * add task special param: sql task dependent task
-     * @param taskNode task node json object
-     * @return task node json object
-     */
-    JsonNode addImportSpecialParam(JsonNode taskNode);
-}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
deleted file mode 100644
index b8f7b03..0000000
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * task node param factory
- */
-public class TaskNodeParamFactory {
-
-    private static Map<String, ProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
-
-    public static ProcessAddTaskParam getByTaskType(String taskType){
-        return taskServices.get(taskType);
-    }
-
-    static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){
-        if (null != taskType) {
-            taskServices.put(taskType, addSpecialTaskParam);
-        }
-    }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 8ee5b9a..fc0b42a 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@@ -59,6 +60,7 @@ import org.apache.http.entity.ContentType;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -82,6 +84,9 @@ import org.springframework.web.multipart.MultipartFile;
 
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * process definition service test
@@ -129,6 +134,7 @@ public class ProcessDefinitionServiceTest {
             + "    \"tenantId\": 1,\n"
             + "    \"timeout\": 0\n"
             + "}";
+
     private static final String CYCLE_SHELL_JSON = "{\n"
             + "    \"globalParams\": [\n"
             + "        \n"
@@ -231,25 +237,37 @@ public class ProcessDefinitionServiceTest {
             + "    \"tenantId\": 1,\n"
             + "    \"timeout\": 0\n"
             + "}";
+
     @InjectMocks
     private ProcessDefinitionServiceImpl processDefinitionService;
+
     @Mock
-    private ProcessDefinitionMapper processDefineMapper;
+    private ProcessDefinitionMapper processDefinitionMapper;
+
     @Mock
     private ProcessTaskRelationMapper processTaskRelationMapper;
+
     @Mock
     private ProjectMapper projectMapper;
+
     @Mock
     private ProjectServiceImpl projectService;
+
     @Mock
     private ScheduleMapper scheduleMapper;
+
     @Mock
     private ProcessService processService;
+
     @Mock
     private ProcessInstanceService processInstanceService;
+
     @Mock
     private TaskInstanceMapper taskInstanceMapper;
 
+    @Mock
+    private DataSourceMapper dataSourceMapper;
+
     @Test
     public void testQueryProcessDefinitionList() {
         String projectName = "project_test1";
@@ -273,7 +291,7 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
         List<ProcessDefinition> resourceList = new ArrayList<>();
         resourceList.add(getProcessDefinition());
-        Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
+        Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
         Map<String, Object> checkSuccessRes = processDefinitionService.queryProcessDefinitionList(loginUser, "project_test1");
         Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
     }
@@ -303,7 +321,7 @@ public class ProcessDefinitionServiceTest {
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
         Page<ProcessDefinition> page = new Page<>(1, 10);
         page.setTotal(30);
-        Mockito.when(processDefineMapper.queryDefineListPaging(
+        Mockito.when(processDefinitionMapper.queryDefineListPaging(
                 Mockito.any(IPage.class)
                 , Mockito.eq("")
                 , Mockito.eq(loginUser.getId())
@@ -339,7 +357,7 @@ public class ProcessDefinitionServiceTest {
         //project check auth success, instance not exist
         putMsg(result, Status.SUCCESS, projectName);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
-        Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
 
         String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
                 + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
@@ -356,7 +374,7 @@ public class ProcessDefinitionServiceTest {
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
 
         //instance exit
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
         Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionById(loginUser,
                 "project_test1", 46);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@@ -385,7 +403,7 @@ public class ProcessDefinitionServiceTest {
         //project check auth success, instance not exist
         putMsg(result, Status.SUCCESS, projectName);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
-        Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
 
         ProcessData processData = getProcessData();
         Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
@@ -394,7 +412,7 @@ public class ProcessDefinitionServiceTest {
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
 
         //instance exit
-        Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
         Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
                 "project_test1", "test");
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@@ -448,7 +466,7 @@ public class ProcessDefinitionServiceTest {
         definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
         definition.setConnects("[]");
 
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(definition);
         Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
 
         Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
@@ -517,7 +535,7 @@ public class ProcessDefinitionServiceTest {
         //project check auth success, instance not exist
         putMsg(result, Status.SUCCESS, projectName);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
-        Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.selectById(1)).thenReturn(null);
         Map<String, Object> instanceNotexitRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
                 "project_test1", 1);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
@@ -525,7 +543,7 @@ public class ProcessDefinitionServiceTest {
         ProcessDefinition processDefinition = getProcessDefinition();
         //user no auth
         loginUser.setUserType(UserType.GENERAL_USER);
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
         Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
                 "project_test1", 46);
         Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
@@ -533,14 +551,14 @@ public class ProcessDefinitionServiceTest {
         //process definition online
         loginUser.setUserType(UserType.ADMIN_USER);
         processDefinition.setReleaseState(ReleaseState.ONLINE);
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
         Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser,
                 "project_test1", 46);
         Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
 
         //scheduler list elements > 1
         processDefinition.setReleaseState(ReleaseState.OFFLINE);
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
         List<Schedule> schedules = new ArrayList<>();
         schedules.add(getSchedule());
         schedules.add(getSchedule());
@@ -564,13 +582,13 @@ public class ProcessDefinitionServiceTest {
         schedule.setReleaseState(ReleaseState.OFFLINE);
         schedules.add(schedule);
         Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules);
-        Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
+        Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(0);
         Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser,
                 "project_test1", 46);
         Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
 
         //delete success
-        Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
+        Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
         Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser,
                 "project_test1", 46);
         Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
@@ -597,7 +615,7 @@ public class ProcessDefinitionServiceTest {
 
         // project check auth success, processs definition online
         putMsg(result, Status.SUCCESS, projectName);
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(getProcessDefinition());
         Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
                 loginUser, "project_test1", 46, ReleaseState.ONLINE);
         Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
@@ -605,7 +623,7 @@ public class ProcessDefinitionServiceTest {
         // project check auth success, processs definition online
         ProcessDefinition processDefinition1 = getProcessDefinition();
         processDefinition1.setResourceIds("1,2");
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition1);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition1);
         Mockito.when(processService.getUserById(1)).thenReturn(loginUser);
         Map<String, Object> onlineWithResourceRes = processDefinitionService.releaseProcessDefinition(
                 loginUser, "project_test1", 46, ReleaseState.ONLINE);
@@ -638,13 +656,13 @@ public class ProcessDefinitionServiceTest {
 
         //project check auth success, process not exist
         putMsg(result, Status.SUCCESS, projectName);
-        Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
         Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
                 "project_test1", "test_pdf");
         Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
 
         //process exist
-        Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
+        Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
         Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
                 "project_test1", "test_pdf");
         Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
@@ -684,19 +702,19 @@ public class ProcessDefinitionServiceTest {
     @Test
     public void testGetTaskNodeListByDefinitionId() {
         //process definition not exist
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null);
         Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
 
         //process data null
         ProcessDefinition processDefinition = getProcessDefinition();
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
         Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
         Assert.assertEquals(Status.DATA_IS_NOT_VALID, successRes.get(Constants.STATUS));
 
         //success
         Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
-        Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
         Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(46L);
         Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
     }
@@ -707,7 +725,7 @@ public class ProcessDefinitionServiceTest {
         String defineCodeList = "46";
         Long[] codeArray = {46L};
         List<Long> codeList = Arrays.asList(codeArray);
-        Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(null);
         Map<String, Object> processNotExistRes = processDefinitionService.getTaskNodeListByDefinitionCodeList(defineCodeList);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
 
@@ -715,7 +733,7 @@ public class ProcessDefinitionServiceTest {
         ProcessDefinition processDefinition = getProcessDefinition();
         List<ProcessDefinition> processDefinitionList = new ArrayList<>();
         processDefinitionList.add(processDefinition);
-        Mockito.when(processDefineMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
+        Mockito.when(processDefinitionMapper.queryByCodes(codeList)).thenReturn(processDefinitionList);
         ProcessData processData = getProcessData();
         Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
 
@@ -748,7 +766,7 @@ public class ProcessDefinitionServiceTest {
         processDefinitionList.add(processDefinition);
         Project test = getProject("test");
         Mockito.when(projectMapper.selectById(projectId)).thenReturn(test);
-        Mockito.when(processDefineMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
+        Mockito.when(processDefinitionMapper.queryAllDefinitionList(test.getCode())).thenReturn(processDefinitionList);
         Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionAllByProjectId(projectId);
         Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
     }
@@ -758,7 +776,7 @@ public class ProcessDefinitionServiceTest {
         //process definition not exist
         ProcessDefinition processDefinition = getProcessDefinition();
         processDefinition.setProcessDefinitionJson(SHELL_JSON);
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(null);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(null);
         Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
         Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
 
@@ -782,7 +800,7 @@ public class ProcessDefinitionServiceTest {
         taskInstance.setHost("192.168.xx.xx");
 
         //task instance not exist
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
         Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
         Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
         Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@@ -817,7 +835,7 @@ public class ProcessDefinitionServiceTest {
         taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         taskInstance.setHost("192.168.xx.xx");
         taskInstance.setTaskParams("\"processDefinitionId\": \"222\",\n");
-        Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.selectById(46)).thenReturn(processDefinition);
         Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
         Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
         Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@@ -1035,6 +1053,7 @@ public class ProcessDefinitionServiceTest {
                 + "    \"tenantId\": 1,\n"
                 + "    \"timeout\": 0\n"
                 + "}";
+
         Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test",
                 sqlDependentJson, "", "", "");
 
@@ -1075,7 +1094,7 @@ public class ProcessDefinitionServiceTest {
         checkResult.put(Constants.STATUS, Status.SUCCESS);
         Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkResult);
-        Mockito.when(processDefineMapper.queryByDefineId(1)).thenReturn(processDefinition);
+        Mockito.when(processDefinitionMapper.queryByDefineId(1)).thenReturn(processDefinition);
         HttpServletResponse response = mock(HttpServletResponse.class);
 
         ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
@@ -1218,4 +1237,110 @@ public class ProcessDefinitionServiceTest {
         Assert.assertEquals(0, processDefinitionService.importProcessSchedule(loginUser, projectName, processMeta, processDefinitionName, processDefinitionId));
     }
 
+    @Test
+    public void testAddExportTaskNodeSpecialParam() {
+        String sqlDependentJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":"
+                + "\"sql\",\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\",\"udfs\":\"\""
+                + ",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\","
+                + "\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},\"description\":"
+                + "\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+                + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+                + "\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+                + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":"
+                + "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectId\":"
+                + "2,\"definitionId\":46,\"depTasks\":\"ALL\",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},"
+                + "\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},"
+                + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,"
+                + "\"timeout\":0}";
+
+        ProcessData processData = JSONUtils.parseObject(sqlDependentJson, ProcessData.class);
+
+        DataSource dataSource = new DataSource();
+        dataSource.setName("testDataSource");
+        when(dataSourceMapper.selectById(1)).thenReturn(dataSource);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setProjectName("testProjectName");
+        processDefinition.setName("testDefinitionName");
+        when(processDefinitionMapper.queryByDefineId(46)).thenReturn(processDefinition);
+
+        try {
+            Class clazz = ProcessDefinitionServiceImpl.class;
+            Method method = clazz.getDeclaredMethod("addExportTaskNodeSpecialParam", ProcessData.class);
+            method.setAccessible(true);
+            method.invoke(processDefinitionService, processData);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        }
+
+        List<TaskNode> taskNodeList = processData.getTasks();
+        ObjectNode sqlParameters = JSONUtils.parseObject(taskNodeList.get(0).getParams());
+        Assert.assertEquals("testDataSource", sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE_NAME).asText());
+
+        ObjectNode dependentParameters = JSONUtils.parseObject(taskNodeList.get(1).getDependence());
+        ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+        JsonNode dependentTaskModel = dependTaskList.path(0);
+        ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+        ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
+        Assert.assertEquals("testProjectName", dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_NAME).asText());
+        Assert.assertEquals("testDefinitionName", dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_NAME).asText());
+    }
+
+    @Test
+    public void testAddImportTaskNodeSpecialParam() {
+        String definitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
+                + "\"params\":{\"type\":\"MYSQL\",\"datasourceName\":\"testDataSource\",\"sql\":\"select * from test\","
+                + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":"
+                + "\"TABLE\",\"localParams\":[],\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[]},"
+                + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+                + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":"
+                + "\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":"
+                + "\"tasks-33787\",\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+                + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":"
+                + "[{\"projectName\":\"testProjectName\",\"definitionName\":\"testDefinitionName\",\"depTasks\":\"ALL\""
+                + ",\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\""
+                + "timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\""
+                + ":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
+
+        ObjectNode jsonObject = JSONUtils.parseObject(definitionJson);
+        ArrayNode jsonArray = (ArrayNode) jsonObject.get("tasks");
+
+        List<DataSource> dataSources = new ArrayList<>();
+        DataSource dataSource = new DataSource();
+        dataSource.setId(1);
+        dataSources.add(dataSource);
+        when(dataSourceMapper.queryDataSourceByName("testDataSource")).thenReturn(dataSources);
+
+        Project project = new Project();
+        project.setId(1);
+        project.setCode(1L);
+        when(projectMapper.queryByName("testProjectName")).thenReturn(project);
+
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setId(1);
+        when(processDefinitionMapper.queryByDefineName(1L, "testDefinitionName")).thenReturn(processDefinition);
+
+        try {
+            Class clazz = ProcessDefinitionServiceImpl.class;
+            Method method = clazz.getDeclaredMethod("addImportTaskNodeSpecialParam", ArrayNode.class);
+            method.setAccessible(true);
+            method.invoke(processDefinitionService, jsonArray);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail(e.toString());
+        }
+
+        ObjectNode sqlParameters = (ObjectNode)jsonArray.path(0).path(Constants.TASK_PARAMS);
+        Assert.assertEquals(1, sqlParameters.get(Constants.TASK_PARAMS_DATASOURCE).asInt());
+
+        ObjectNode dependentParameters = (ObjectNode)jsonArray.path(1).path(Constants.DEPENDENCE);
+        ArrayNode dependTaskList = (ArrayNode)dependentParameters.get(Constants.TASK_DEPENDENCE_DEPEND_TASK_LIST);
+        JsonNode dependentTaskModel = dependTaskList.path(0);
+        ArrayNode dependItemList = (ArrayNode)dependentTaskModel.get(Constants.TASK_DEPENDENCE_DEPEND_ITEM_LIST);
+        ObjectNode dependentItem = (ObjectNode)dependItemList.path(0);
+        Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_PROJECT_ID).asInt());
+        Assert.assertEquals(1, dependentItem.get(Constants.TASK_DEPENDENCE_DEFINITION_ID).asInt());
+    }
+
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
deleted file mode 100644
index ceee22f..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import org.json.JSONException;
-import org.junit.Test;
-import org.skyscreamer.jsonassert.JSONAssert;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * DataSourceParamTest
- */
-public class DataSourceParamTest extends AbstractControllerTest {
-
-    @Test
-    public void testAddExportDependentSpecialParam() throws JSONException {
-
-        String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
-                + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\","
-                + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
-                + ",\"localParams\":[],\"connParams\":\"\","
-                + "\"preStatements\":[],\"postStatements\":[]},"
-                + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
-                + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\","
-                + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
-                + "\"preTasks\":[\"dependent\"]}";
-
-        ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
-        if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
-            String taskType = taskNode.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode sql = addTaskParam.addExportSpecialParam(taskNode);
-
-            JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
-        }
-    }
-
-    @Test
-    public void testAddImportDependentSpecialParam() throws JSONException {
-        String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\","
-                + "\"type\":\"SQL\",\"params\":{\"postStatements\":[],"
-                + "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\","
-                + "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\""
-                + "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1,"
-                + "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\""
-                + ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\","
-                + "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{},"
-                + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}";
-
-        ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
-        if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
-            String taskType = taskNode.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode sql = addTaskParam.addImportSpecialParam(taskNode);
-
-            JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
-        }
-    }
-}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
deleted file mode 100644
index 531856c..0000000
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.api.utils.exportprocess;
-
-import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-
-import org.json.JSONException;
-import org.junit.Test;
-import org.skyscreamer.jsonassert.JSONAssert;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * DependentParamTest
- */
-public class DependentParamTest extends AbstractControllerTest {
-
-    @Test
-    public void testAddExportDependentSpecialParam() throws JSONException {
-        String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
-                + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
-                + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
-                + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
-                + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
-
-        ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
-        if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
-            String taskType = taskNode.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode);
-
-            JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
-        }
-
-        String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
-                + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}";
-
-        ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty);
-        if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) {
-            String taskType = taskEmpty.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty);
-
-            JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false);
-        }
-
-    }
-
-    @Test
-    public void testAddImportDependentSpecialParam() throws JSONException {
-        String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
-                + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
-                + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
-                + ",\"name\":\"dependent\","
-                + "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\","
-                + "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\","
-                + "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}],"
-                + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
-
-        ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
-        if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
-            String taskType = taskNode.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
-
-            JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
-        }
-
-        String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
-                + ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
-                + "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
-                + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
-
-        JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty);
-        if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) {
-            String taskType = taskNodeEmpty.path("type").asText();
-
-            ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
-
-            JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
-
-            JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false);
-        }
-
-    }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 225dc23..53645b7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -1091,4 +1091,17 @@ public final class Constants {
     public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER"));
     public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
 
+    /**
+     * task parameter keys
+     */
+    public static final String TASK_PARAMS = "params";
+    public static final String TASK_PARAMS_DATASOURCE = "datasource";
+    public static final String TASK_PARAMS_DATASOURCE_NAME = "datasourceName";
+    public static final String TASK_DEPENDENCE = "dependence";
+    public static final String TASK_DEPENDENCE_DEPEND_TASK_LIST = "dependTaskList";
+    public static final String TASK_DEPENDENCE_DEPEND_ITEM_LIST = "dependItemList";
+    public static final String TASK_DEPENDENCE_PROJECT_ID = "projectId";
+    public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
+    public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
+    public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName";
 }