You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2022/06/18 13:53:48 UTC
[dolphinscheduler] 11/16: [fix][Python] Support same task name in project defferent process definition (#10428)
This is an automated email from the ASF dual-hosted git repository.
liudongkai pushed a commit to branch 3.0.0-beta-2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit 476f2395dc2841f2f148cb40de0dbfe10e22536a
Author: 陈家名 <13...@163.com>
AuthorDate: Tue Jun 14 13:53:18 2022 +0800
[fix][Python] Support same task name in project defferent process definition (#10428)
close: #10431
(cherry picked from commit b86dc53ad10cb5c4d76f5e85c38e5a5659a349fd)
---
.../dolphinscheduler/api/python/PythonGateway.java | 14 ++-
.../api/service/TaskDefinitionService.java | 2 +
.../service/impl/TaskDefinitionServiceImpl.java | 5 +-
.../api/python/PythonGatewayTest.java | 123 +++++++++++++++++++++
.../api/service/TaskDefinitionServiceImplTest.java | 6 +-
.../dao/mapper/TaskDefinitionMapper.java | 2 +
.../dao/mapper/TaskDefinitionMapper.xml | 20 +++-
.../dao/mapper/TaskDefinitionMapperTest.java | 25 ++++-
.../src/pydolphinscheduler/core/task.py | 2 +-
9 files changed, 185 insertions(+), 14 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 817f411854..54962b523d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -166,7 +166,7 @@ public class PythonGateway {
return taskDefinitionService.genTaskCodeList(genNum);
}
- public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
+ public Map<String, Long> getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
Project project = projectMapper.queryByName(projectName);
Map<String, Long> result = new HashMap<>();
// project do not exists, mean task not exists too, so we should directly return init value
@@ -175,7 +175,15 @@ public class PythonGateway {
result.put("version", 0L);
return result;
}
- TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+
+ ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+ if (processDefinition == null) {
+ String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+ logger.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
if (taskDefinition == null) {
result.put("code", CodeGenerateUtils.getInstance().genCode());
result.put("version", 0L);
@@ -520,7 +528,7 @@ public class PythonGateway {
result.put("processDefinitionCode", processDefinition.getCode());
if (taskName != null) {
- TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName);
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName);
result.put("taskDefinitionCode", taskDefinition.getCode());
}
return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 5d2ba85b02..a715b68e97 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -60,10 +60,12 @@ public interface TaskDefinitionService {
*
* @param loginUser login user
* @param projectCode project code
+ * @param processCode process code
* @param taskName task name
*/
Map<String, Object> queryTaskDefinitionByName(User loginUser,
long projectCode,
+ long processCode,
String taskName);
/**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index cd617958e0..15105acfe8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -270,10 +270,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
*
* @param loginUser login user
* @param projectCode project code
+ * @param processCode process code
* @param taskName task name
*/
@Override
- public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) {
+ public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -281,7 +282,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
- TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+ TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
} else {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
new file mode 100644
index 0000000000..7d8b6efabc
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.python;
+
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * python gate test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PythonGatewayTest {
+
+ @InjectMocks
+ private PythonGateway pythonGateway;
+
+ @Mock
+ private ProjectMapper projectMapper;
+
+ @Mock
+ private ProcessDefinitionMapper processDefinitionMapper;
+
+ @Mock
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Test
+ public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
+ Project project = getTestProject();
+ Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+ ProcessDefinition processDefinition = getTestProcessDefinition();
+ Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
+
+ TaskDefinition taskDefinition = getTestTaskDefinition();
+ Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
+
+ Map<String, Long> result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName());
+ Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode());
+ }
+
+ @Test
+ public void testGetDependentInfo() {
+ Project project = getTestProject();
+ Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+ ProcessDefinition processDefinition = getTestProcessDefinition();
+ Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
+
+ TaskDefinition taskDefinition = getTestTaskDefinition();
+ Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
+
+ Map<String, Object> result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName());
+ Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
+ }
+
+ private Project getTestProject() {
+ Project project = new Project();
+ project.setName("ut-project");
+ project.setUserId(111);
+ project.setCode(1L);
+ project.setCreateTime(new Date());
+ project.setUpdateTime(new Date());
+ return project;
+ }
+
+ private ProcessDefinition getTestProcessDefinition() {
+ ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setCode(1L);
+ processDefinition.setName("ut-process-definition");
+ processDefinition.setProjectCode(1L);
+ processDefinition.setUserId(111);
+ processDefinition.setUpdateTime(new Date());
+ processDefinition.setCreateTime(new Date());
+ return processDefinition;
+ }
+
+ private TaskDefinition getTestTaskDefinition() {
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setCode(888888L);
+ taskDefinition.setName("ut-task-definition");
+ taskDefinition.setProjectCode(1L);
+ taskDefinition.setTaskType("SHELL");
+ taskDefinition.setUserId(111);
+ taskDefinition.setResourceIds("1");
+ taskDefinition.setWorkerGroup("default");
+ taskDefinition.setEnvironmentCode(1L);
+ taskDefinition.setVersion(1);
+ taskDefinition.setCreateTime(new Date());
+ taskDefinition.setUpdateTime(new Date());
+ return taskDefinition;
+ }
+
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 9aade5555c..676688a3ec 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -145,7 +145,7 @@ public class TaskDefinitionServiceImplTest {
public void queryTaskDefinitionByName() {
String taskName = "task";
long projectCode = 1L;
-
+ long processCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@@ -157,11 +157,11 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
- Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName))
+ Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
- .queryTaskDefinitionByName(loginUser, projectCode, taskName);
+ .queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
index 304e623a0a..c426da6b04 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
@@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* query task definition by name
*
* @param projectCode projectCode
+ * @param processCode processCode
* @param name name
* @return task definition
*/
TaskDefinition queryByName(@Param("projectCode") long projectCode,
+ @Param("processCode") long processCode,
@Param("name") String name);
/**
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 5c889d1b07..255be38b2f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -23,12 +23,24 @@
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time, task_group_id,task_group_priority
</sql>
+ <sql id="baseSqlV2">
+ ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
+ ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
+ ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
+ ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
+ ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
+ </sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
- <include refid="baseSql"/>
- from t_ds_task_definition
- WHERE project_code = #{projectCode}
- and name = #{name}
+ <include refid="baseSqlV2">
+ <property name="alias" value="td"/>
+ </include>
+ from t_ds_task_definition td
+ join t_ds_process_task_relation ptr on ptr.project_code = td.project_code
+ where td.project_code = #{projectCode}
+ and td.name = #{name}
+ and ptr.process_definition_code = #{processCode}
+ and td.code = ptr.post_task_code
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
index 757f751dd2..3d16e2e44d 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
+ @Autowired
+ private ProcessTaskRelationMapper processTaskRelationMapper;
+
@Autowired
private UserMapper userMapper;
@@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
return taskDefinition;
}
+ /**
+ * insert
+ *
+ * @return ProcessDefinition
+ */
+ private ProcessTaskRelation insertTaskRelation(long postTaskCode) {
+ ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+ processTaskRelation.setName("def 1");
+ processTaskRelation.setProjectCode(1L);
+ processTaskRelation.setProcessDefinitionCode(1L);
+ processTaskRelation.setPostTaskCode(postTaskCode);
+ processTaskRelation.setPreTaskCode(0L);
+ processTaskRelation.setUpdateTime(new Date());
+ processTaskRelation.setCreateTime(new Date());
+ processTaskRelationMapper.insert(processTaskRelation);
+ return processTaskRelation;
+ }
+
@Test
public void testInsert() {
TaskDefinition taskDefinition = insertOne();
@@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
@Test
public void testQueryByDefinitionName() {
TaskDefinition taskDefinition = insertOne();
- TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
+ ProcessTaskRelation processTaskRelation = insertTaskRelation(taskDefinition.getCode());
+ TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), processTaskRelation.getProcessDefinitionCode()
, taskDefinition.getName());
Assert.assertNotNull(result);
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 599b979369..08a1cdf7e8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -273,7 +273,7 @@ class Task(Base):
# TODO get code from specific project process definition and task name
gateway = launch_gateway()
result = gateway.entry_point.getCodeAndVersion(
- self.process_definition._project, self.name
+ self.process_definition._project, self.process_definition.name, self.name
)
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)