You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/01/05 09:55:12 UTC
[dolphinscheduler] branch dev updated: [DS-7016][feat] Auto create workflow while import sql script with specific hint (#7214)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4c2f77e [DS-7016][feat] Auto create workflow while import sql script with specific hint (#7214)
4c2f77e is described below
commit 4c2f77ee9cbd599edfb38e4bf82755f74e96e7c6
Author: ououtt <73...@qq.com>
AuthorDate: Wed Jan 5 17:55:08 2022 +0800
[DS-7016][feat] Auto create workflow while import sql script with specific hint (#7214)
* [DS-7016][feat] Auto create workflow while import sql script with specific hint
* datasource : datasource name
* name: task name
* upstream: pre task names
* [DS-7016][feat] Auto create workflow while import sql script with specific hint
* remove excess blank lines
* [DS-7016][feat] Auto create workflow while import sql script with specific hint
* datasource : datasource name
* name: task name
* upstream: pre task names
* [DS-7016][feat] Auto create workflow while import sql script with specific hint
* datasource : datasource name
* name: task name
* upstream: pre task names
* Code optimization
* for Zip Bomb Attack
Co-authored-by: eye <ey...@aloudata.com>
---
.../controller/ProcessDefinitionController.java | 7 +-
.../api/service/ProcessDefinitionService.java | 12 ++
.../service/impl/ProcessDefinitionServiceImpl.java | 211 +++++++++++++++++++++
.../api/service/ProcessDefinitionServiceTest.java | 46 +++++
.../dao/mapper/DataSourceMapper.java | 9 +-
.../dao/mapper/DataSourceMapper.xml | 13 ++
6 files changed, 296 insertions(+), 2 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index d9516d4..5dba775 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -696,7 +696,12 @@ public class ProcessDefinitionController extends BaseController {
public Result importProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("file") MultipartFile file) {
- Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file);
+ Map<String, Object> result;
+ // Dispatch on the content type declared by the upload: a zip archive is imported as a set of
+ // SQL scripts, anything else goes through the existing process definition import.
+ // NOTE(review): only the exact value "application/zip" selects the SQL import; variants such as
+ // a type with parameters or a different zip MIME type fall through to the else branch - confirm
+ // that this matches what clients actually send.
+ if ("application/zip".equals(file.getContentType())) {
+ result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, file);
+ } else {
+ result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file);
+ }
return returnDataList(result);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index 64a0fbe..99a6d95 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -243,6 +243,18 @@ public interface ProcessDefinitionService {
MultipartFile file);
/**
+ * Import a zip archive of SQL scripts as a new process definition.
+ * <p>
+ * In the implementation added by this change, every non-directory entry of the archive becomes
+ * one SQL task, and comment lines of the form "-- name: ...", "-- upstream: ..." and
+ * "-- datasource: ..." inside a script are read as hints for the task name, the names of its
+ * upstream tasks and the name of the datasource to use.
+ *
+ * @param loginUser login user performing the import
+ * @param projectCode code of the project the process definition is created in
+ * @param file uploaded zip archive containing the SQL scripts
+ * @return result map of the import, carrying the status of the operation
+ */
+ Map<String, Object> importSqlProcessDefinition(User loginUser,
+ long projectCode,
+ MultipartFile file);
+
+ /**
* check the process task relation json
*
* @param processTaskRelationJson process task relation json
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index eb6aa5a..20d109a 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
@@ -34,22 +35,29 @@ import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
+import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DagData;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -62,6 +70,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
@@ -79,11 +88,14 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
@@ -93,6 +105,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
@@ -167,6 +181,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private TenantMapper tenantMapper;
+ @Autowired
+ private DataSourceMapper dataSourceMapper;
+
/**
* create process definition
*
@@ -867,6 +884,200 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
+    /**
+     * Import a zip archive of SQL scripts as one new process definition.
+     * <p>
+     * Every non-directory entry of the archive becomes one SQL task. A line containing a comment of the
+     * form {@code -- key: value} is treated as a hint when the key is {@code name} (task name),
+     * {@code upstream} (comma separated names of the tasks this task depends on) or {@code datasource}
+     * (name of the datasource the task runs against); the hint comment is cut from the line before the
+     * line is added to the SQL text. Without a {@code name} hint the task is named after the entry
+     * file name, without its first directory and its extension.
+     *
+     * @param loginUser login user, used to look up datasources and recorded on the process definition
+     * @param projectCode code of the project the process definition is created in
+     * @param file uploaded zip archive
+     * @return the result of {@code createDagDefine} when the archive could be parsed; otherwise a map
+     *         carrying {@code DATASOURCE_NAME_ILLEGAL} (datasource not found for the user) or
+     *         {@code IMPORT_PROCESS_DEFINE_ERROR} (unreadable archive, zip bomb guard tripped, empty
+     *         script or unknown upstream task)
+     */
+    @Override
+    @Transactional(rollbackFor = RuntimeException.class)
+    public Map<String, Object> importSqlProcessDefinition(User loginUser, long projectCode, MultipartFile file) {
+        Map<String, Object> result = new HashMap<>();
+        String processDefinitionName = file.getOriginalFilename() == null ? file.getName() : file.getOriginalFilename();
+        int index = processDefinitionName.lastIndexOf(".");
+        if (index > 0) {
+            processDefinitionName = processDefinitionName.substring(0, index);
+        }
+        processDefinitionName = processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp();
+
+        ProcessDefinition processDefinition;
+        List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>();
+        List<ProcessTaskRelationLog> processTaskRelationList = new ArrayList<>();
+
+        // Zip bomb guards. Sizes are tracked as long so that the counters cannot overflow
+        // before the 1 GB limit is reached.
+        final int thresholdEntries = 10000;
+        final long thresholdSize = 1000000000L; // 1 GB
+        final long thresholdRatio = 10;
+        int totalEntryArchive = 0;
+        // uncompressed bytes read from the whole archive so far
+        long totalSizeArchive = 0;
+        // In most cases, there will be only one data source
+        Map<String, DataSource> dataSourceCache = new HashMap<>(1);
+        Map<String, Long> taskNameToCode = new HashMap<>(16);
+        Map<String, List<String>> taskNameToUpstream = new HashMap<>(16);
+        // Decode explicitly as UTF-8: the size accounting below counts UTF-8 bytes, and the result
+        // must not depend on the platform default charset.
+        try (ZipInputStream zIn = new ZipInputStream(file.getInputStream());
+             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(zIn, StandardCharsets.UTF_8))) {
+            // build process definition
+            processDefinition = new ProcessDefinition(projectCode,
+                processDefinitionName,
+                CodeGenerateUtils.getInstance().genCode(),
+                "",
+                "[]", null,
+                0, loginUser.getId(), loginUser.getTenantId());
+
+            ZipEntry entry;
+            while ((entry = zIn.getNextEntry()) != null) {
+                totalEntryArchive++;
+                // reject oversized archives before spending any work on the entry
+                if (totalEntryArchive > thresholdEntries) {
+                    throw new IllegalStateException("too much entries in this archive, can lead to inodes exhaustion of the system");
+                }
+                if (entry.isDirectory()) {
+                    continue;
+                }
+
+                // uncompressed bytes read from the current entry only
+                long totalSizeEntry = 0;
+                StringBuilder sql = new StringBuilder();
+                String taskName = null;
+                String datasourceName = null;
+                List<String> upstreams = Collections.emptyList();
+                String line;
+                while ((line = bufferedReader.readLine()) != null) {
+                    int nBytes = line.getBytes(StandardCharsets.UTF_8).length;
+                    totalSizeEntry += nBytes;
+                    totalSizeArchive += nBytes;
+                    // The compressed size is -1 while a streamed entry is still being read, so the
+                    // ratio can only be evaluated when the size is actually known (and non-zero).
+                    long compressedSize = entry.getCompressedSize();
+                    if (compressedSize > 0 && totalSizeEntry / compressedSize > thresholdRatio) {
+                        throw new IllegalStateException("ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack");
+                    }
+                    // checked while reading so that an oversized archive is abandoned early
+                    if (totalSizeArchive > thresholdSize) {
+                        throw new IllegalStateException("the uncompressed data size is too much for the application resource capacity");
+                    }
+                    int commentIndex = line.indexOf("-- ");
+                    if (commentIndex >= 0) {
+                        int colonIndex = line.indexOf(":", commentIndex);
+                        if (colonIndex > 0) {
+                            String key = line.substring(commentIndex + 3, colonIndex).trim().toLowerCase();
+                            String value = line.substring(colonIndex + 1).trim();
+                            switch (key) {
+                                case "name":
+                                    taskName = value;
+                                    line = line.substring(0, commentIndex);
+                                    break;
+                                case "upstream":
+                                    upstreams = Arrays.stream(value.split(",")).map(String::trim)
+                                        .filter(s -> !"".equals(s)).collect(Collectors.toList());
+                                    line = line.substring(0, commentIndex);
+                                    break;
+                                case "datasource":
+                                    datasourceName = value;
+                                    line = line.substring(0, commentIndex);
+                                    break;
+                                default:
+                                    // not a hint, keep the comment as part of the SQL text
+                                    break;
+                            }
+                        }
+                    }
+                    if (!"".equals(line)) {
+                        sql.append(line).append("\n");
+                    }
+                }
+                // import/sql1.sql -> sql1
+                if (taskName == null) {
+                    taskName = entry.getName();
+                    index = taskName.indexOf("/");
+                    if (index > 0) {
+                        taskName = taskName.substring(index + 1);
+                    }
+                    index = taskName.lastIndexOf(".");
+                    if (index > 0) {
+                        taskName = taskName.substring(0, index);
+                    }
+                }
+                DataSource dataSource = dataSourceCache.get(datasourceName);
+                if (dataSource == null) {
+                    dataSource = queryDatasourceByNameAndUser(datasourceName, loginUser);
+                }
+                if (dataSource == null) {
+                    putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
+                    return result;
+                }
+                dataSourceCache.put(datasourceName, dataSource);
+
+                // A script without any SQL line cannot become a task; fail with a clear message
+                // instead of an index error from the substring below.
+                if (sql.length() == 0) {
+                    throw new IllegalStateException("sql script is empty: " + entry.getName());
+                }
+                // drop the newline appended after the last line
+                TaskDefinitionLog taskDefinition = buildNormalSqlTaskDefinition(taskName, dataSource, sql.substring(0, sql.length() - 1));
+
+                taskDefinitionList.add(taskDefinition);
+                taskNameToCode.put(taskDefinition.getName(), taskDefinition.getCode());
+                taskNameToUpstream.put(taskDefinition.getName(), upstreams);
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
+            return result;
+        }
+
+        // build task relation
+        for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) {
+            List<String> upstreams = taskNameToUpstream.get(entry.getKey());
+            // no upstream, or only the pseudo upstream "root" that is not itself a task: start node
+            if (CollectionUtils.isEmpty(upstreams)
+                || (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) {
+                ProcessTaskRelationLog processTaskRelation = buildNormalTaskRelation(0, entry.getValue());
+                processTaskRelationList.add(processTaskRelation);
+                continue;
+            }
+            for (String upstream : upstreams) {
+                Long upstreamCode = taskNameToCode.get(upstream);
+                // An upstream hint naming a task that is not part of the archive would otherwise
+                // fail with a NullPointerException when the code is unboxed.
+                if (upstreamCode == null) {
+                    logger.error("task {} declares unknown upstream task {}", entry.getKey(), upstream);
+                    putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
+                    return result;
+                }
+                ProcessTaskRelationLog processTaskRelation = buildNormalTaskRelation(upstreamCode, entry.getValue());
+                processTaskRelationList.add(processTaskRelation);
+            }
+        }
+
+        return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList);
+    }
+
+    /**
+     * Create an unconditional, unnamed relation between two tasks, both at version 0.
+     *
+     * @param preTaskCode code of the upstream task; the import passes 0 for a task without upstream
+     * @param postTaskCode code of the downstream task
+     * @return the populated relation
+     */
+    private ProcessTaskRelationLog buildNormalTaskRelation(long preTaskCode, long postTaskCode) {
+        ProcessTaskRelationLog relation = new ProcessTaskRelationLog();
+        relation.setName("");
+        relation.setConditionType(ConditionType.NONE);
+        // upstream side
+        relation.setPreTaskCode(preTaskCode);
+        relation.setPreTaskVersion(0);
+        // downstream side
+        relation.setPostTaskCode(postTaskCode);
+        relation.setPostTaskVersion(0);
+        return relation;
+    }
+
+    /**
+     * Look up a datasource by name on behalf of a user.
+     *
+     * @param datasourceName name of the datasource
+     * @param loginUser user the lookup is performed for
+     * @return for an admin the first datasource with that name, for any other user whatever the
+     *         name-and-user query returns; {@code null} when the admin query finds nothing
+     */
+    private DataSource queryDatasourceByNameAndUser(String datasourceName, User loginUser) {
+        if (!isAdmin(loginUser)) {
+            return dataSourceMapper.queryDataSourceByNameAndUserId(loginUser.getId(), datasourceName);
+        }
+        // admins are matched by name only
+        List<DataSource> candidates = dataSourceMapper.queryDataSourceByName(datasourceName);
+        return CollectionUtils.isNotEmpty(candidates) ? candidates.get(0) : null;
+    }
+
+    /**
+     * Build the definition of a plain SQL task with default settings: no retries, timeout disabled,
+     * default worker group, medium priority, no environment, no delay, version 0 and no resources.
+     *
+     * @param taskName name of the task
+     * @param dataSource datasource the SQL runs against; its type and id are stored in the task params
+     * @param sql SQL text of the task; one trailing ';' is removed if present
+     * @return the populated task definition with a freshly generated code
+     * @throws CodeGenerateException if no task code can be generated
+     */
+    private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSource dataSource, String sql) throws CodeGenerateException {
+        // Only strip the last character when it is a ';'. Chopping it unconditionally truncated
+        // statements that do not end with a terminator and failed on an empty string.
+        // NOTE(review): presumably the original chop was meant to remove the trailing ';' - confirm.
+        String statement = sql.endsWith(";") ? sql.substring(0, sql.length() - 1) : sql;
+
+        SqlParameters sqlParameters = new SqlParameters();
+        sqlParameters.setType(dataSource.getType().name());
+        sqlParameters.setDatasource(dataSource.getId());
+        sqlParameters.setSql(statement);
+        // it may be a query type, but it can only be determined by parsing SQL
+        sqlParameters.setSqlType(SqlType.NON_QUERY.ordinal());
+        sqlParameters.setLocalParams(Collections.emptyList());
+
+        TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
+        taskDefinition.setName(taskName);
+        taskDefinition.setFlag(Flag.YES);
+        taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters));
+        taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
+        taskDefinition.setTaskType(TaskType.SQL.getDesc());
+        taskDefinition.setFailRetryTimes(0);
+        taskDefinition.setFailRetryInterval(0);
+        taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
+        taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
+        taskDefinition.setTaskPriority(Priority.MEDIUM);
+        taskDefinition.setEnvironmentCode(-1);
+        taskDefinition.setTimeout(0);
+        taskDefinition.setDelayTime(0);
+        taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
+        taskDefinition.setVersion(0);
+        taskDefinition.setResourceIds("");
+        return taskDefinition;
+    }
+
/**
* check and import
*/
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 607341d..8bdcc1d 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.dao.entity.DagData;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
@@ -46,9 +47,12 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang.StringUtils;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -58,6 +62,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
import javax.servlet.http.HttpServletResponse;
@@ -69,6 +75,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.mock.web.MockMultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -656,6 +663,45 @@ public class ProcessDefinitionServiceTest {
Assert.assertNotNull(processDefinitionService.exportProcessDagData(processDefinition));
}
+    /**
+     * Imports a zip with two SQL scripts, one of them declaring the other as its upstream through
+     * hints, and expects the import to succeed for a general user who can see the datasource.
+     */
+    @Test
+    public void testImportSqlProcessDefinition() throws Exception {
+        int userId = 10;
+        User loginUser = Mockito.mock(User.class);
+        Mockito.when(loginUser.getId()).thenReturn(userId);
+        Mockito.when(loginUser.getTenantId()).thenReturn(2);
+        Mockito.when(loginUser.getUserType()).thenReturn(UserType.GENERAL_USER);
+
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        // try-with-resources finishes the archive and releases the stream even if a write fails
+        try (ZipOutputStream outputStream = new ZipOutputStream(byteArrayOutputStream)) {
+            outputStream.putNextEntry(new ZipEntry("import_sql/"));
+
+            // a.sql has no name hint, so it is named after the file and depends on start_auto_dag
+            outputStream.putNextEntry(new ZipEntry("import_sql/a.sql"));
+            outputStream.write("-- upstream: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
+
+            // b.sql is renamed to start_auto_dag through its name hint
+            outputStream.putNextEntry(new ZipEntry("import_sql/b.sql"));
+            outputStream.write("-- name: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
+        }
+
+        MockMultipartFile mockMultipartFile = new MockMultipartFile("import_sql.zip", byteArrayOutputStream.toByteArray());
+
+        DataSource dataSource = Mockito.mock(DataSource.class);
+        Mockito.when(dataSource.getId()).thenReturn(1);
+        Mockito.when(dataSource.getType()).thenReturn(DbType.MYSQL);
+
+        Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId, "mysql_1")).thenReturn(dataSource);
+
+        long projectCode = 1001;
+        Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull())).thenReturn(2);
+        Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull())).thenReturn(1);
+        Mockito.when(processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(),
+            Mockito.eq(1), Mockito.notNull(), Mockito.notNull())).thenReturn(0);
+
+        Map<String, Object> result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, mockMultipartFile);
+
+        // expected value first, so that a failure message reports the two sides correctly
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+    }
+
/**
* get mock processDefinition
*
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
index 0c3238a..bfb0640 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
@@ -87,5 +87,12 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
-
+ /**
+ * Query a datasource by name, restricted to datasources related to the given user.
+ * <p>
+ * The mapped statement joins t_ds_datasource with t_ds_relation_datasource_user and selects only
+ * the id, name and type columns, so the other fields of the returned entity are not populated.
+ *
+ * @param userId id of the user the datasource must be related to
+ * @param name datasource name
+ * @return If the name does not exist or the user does not have permission, it will return null
+ */
+ DataSource queryDataSourceByNameAndUserId(@Param("userId") int userId, @Param("name") String name);
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
index acf310e..2241608 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
@@ -98,4 +98,17 @@
</foreach>
</if>
</select>
+
+ <!--
+ Looks up a datasource by name among the datasources related to a user through
+ t_ds_relation_datasource_user. Only id, name and type are selected.
+ NOTE(review): the mapper method returns a single DataSource; confirm that a name can match
+ at most one related datasource per user.
+ -->
+ <select id="queryDataSourceByNameAndUserId" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
+ select
+ ds.id as id,
+ ds.name as name,
+ ds.type as type
+ from t_ds_datasource ds
+ join t_ds_relation_datasource_user rel
+ on ds.id = rel.datasource_id
+ where ds.name = #{name,jdbcType=VARCHAR}
+ and rel.user_id = #{userId}
+ </select>
+
</mapper>