You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2022/06/18 13:53:48 UTC

[dolphinscheduler] 11/16: [fix][Python] Support same task name in project defferent process definition (#10428)

This is an automated email from the ASF dual-hosted git repository.

liudongkai pushed a commit to branch 3.0.0-beta-2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 476f2395dc2841f2f148cb40de0dbfe10e22536a
Author: 陈家名 <13...@163.com>
AuthorDate: Tue Jun 14 13:53:18 2022 +0800

    [fix][Python] Support same task name in project defferent process definition (#10428)
    
    close: #10431
    (cherry picked from commit b86dc53ad10cb5c4d76f5e85c38e5a5659a349fd)
---
 .../dolphinscheduler/api/python/PythonGateway.java |  14 ++-
 .../api/service/TaskDefinitionService.java         |   2 +
 .../service/impl/TaskDefinitionServiceImpl.java    |   5 +-
 .../api/python/PythonGatewayTest.java              | 123 +++++++++++++++++++++
 .../api/service/TaskDefinitionServiceImplTest.java |   6 +-
 .../dao/mapper/TaskDefinitionMapper.java           |   2 +
 .../dao/mapper/TaskDefinitionMapper.xml            |  20 +++-
 .../dao/mapper/TaskDefinitionMapperTest.java       |  25 ++++-
 .../src/pydolphinscheduler/core/task.py            |   2 +-
 9 files changed, 185 insertions(+), 14 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index 817f411854..54962b523d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -166,7 +166,7 @@ public class PythonGateway {
         return taskDefinitionService.genTaskCodeList(genNum);
     }
 
-    public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
+    public Map<String, Long> getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
         Project project = projectMapper.queryByName(projectName);
         Map<String, Long> result = new HashMap<>();
         // project do not exists, mean task not exists too, so we should directly return init value
@@ -175,7 +175,15 @@ public class PythonGateway {
             result.put("version", 0L);
             return result;
         }
-        TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+
+        ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        if (processDefinition == null) {
+            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
         if (taskDefinition == null) {
             result.put("code", CodeGenerateUtils.getInstance().genCode());
             result.put("version", 0L);
@@ -520,7 +528,7 @@ public class PythonGateway {
         result.put("processDefinitionCode", processDefinition.getCode());
 
         if (taskName != null) {
-            TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName);
+            TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName);
             result.put("taskDefinitionCode", taskDefinition.getCode());
         }
         return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
index 5d2ba85b02..a715b68e97 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
@@ -60,10 +60,12 @@ public interface TaskDefinitionService {
      *
      * @param loginUser login user
      * @param projectCode project code
+     * @param processCode process code
      * @param taskName task name
      */
     Map<String, Object> queryTaskDefinitionByName(User loginUser,
                                                   long projectCode,
+                                                  long processCode,
                                                   String taskName);
 
     /**
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index cd617958e0..15105acfe8 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -270,10 +270,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
      *
      * @param loginUser login user
      * @param projectCode project code
+     * @param processCode process code
      * @param taskName task name
      */
     @Override
-    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) {
+    public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) {
         Project project = projectMapper.queryByCode(projectCode);
         //check user access for project
         Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@@ -281,7 +282,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
             return result;
         }
 
-        TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
+        TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
         if (taskDefinition == null) {
             putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
         } else {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
new file mode 100644
index 0000000000..7d8b6efabc
--- /dev/null
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.python;
+
+import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * python gate test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PythonGatewayTest {
+
+    @InjectMocks
+    private PythonGateway pythonGateway;
+
+    @Mock
+    private ProjectMapper projectMapper;
+
+    @Mock
+    private ProcessDefinitionMapper processDefinitionMapper;
+
+    @Mock
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Test
+    public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
+        Project project = getTestProject();
+        Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+        ProcessDefinition processDefinition = getTestProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
+
+        TaskDefinition taskDefinition = getTestTaskDefinition();
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
+
+        Map<String, Long> result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName());
+        Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode());
+    }
+
+    @Test
+    public void testGetDependentInfo() {
+        Project project = getTestProject();
+        Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
+
+        ProcessDefinition processDefinition = getTestProcessDefinition();
+        Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
+
+        TaskDefinition taskDefinition = getTestTaskDefinition();
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
+
+        Map<String, Object> result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName());
+        Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
+    }
+
+    private Project getTestProject() {
+        Project project = new Project();
+        project.setName("ut-project");
+        project.setUserId(111);
+        project.setCode(1L);
+        project.setCreateTime(new Date());
+        project.setUpdateTime(new Date());
+        return project;
+    }
+
+    private ProcessDefinition getTestProcessDefinition() {
+        ProcessDefinition processDefinition = new ProcessDefinition();
+        processDefinition.setCode(1L);
+        processDefinition.setName("ut-process-definition");
+        processDefinition.setProjectCode(1L);
+        processDefinition.setUserId(111);
+        processDefinition.setUpdateTime(new Date());
+        processDefinition.setCreateTime(new Date());
+        return processDefinition;
+    }
+
+    private TaskDefinition getTestTaskDefinition() {
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setCode(888888L);
+        taskDefinition.setName("ut-task-definition");
+        taskDefinition.setProjectCode(1L);
+        taskDefinition.setTaskType("SHELL");
+        taskDefinition.setUserId(111);
+        taskDefinition.setResourceIds("1");
+        taskDefinition.setWorkerGroup("default");
+        taskDefinition.setEnvironmentCode(1L);
+        taskDefinition.setVersion(1);
+        taskDefinition.setCreateTime(new Date());
+        taskDefinition.setUpdateTime(new Date());
+        return taskDefinition;
+    }
+
+}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index 9aade5555c..676688a3ec 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -145,7 +145,7 @@ public class TaskDefinitionServiceImplTest {
     public void queryTaskDefinitionByName() {
         String taskName = "task";
         long projectCode = 1L;
-
+        long processCode = 1L;
         Project project = getProject(projectCode);
         Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
 
@@ -157,11 +157,11 @@ public class TaskDefinitionServiceImplTest {
         putMsg(result, Status.SUCCESS, projectCode);
         Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
 
-        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName))
+        Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName))
             .thenReturn(new TaskDefinition());
 
         Map<String, Object> relation = taskDefinitionService
-            .queryTaskDefinitionByName(loginUser, projectCode, taskName);
+            .queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName);
 
         Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
     }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
index 304e623a0a..c426da6b04 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
@@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
      * query task definition by name
      *
      * @param projectCode projectCode
+     * @param processCode processCode
      * @param name name
      * @return task definition
      */
     TaskDefinition queryByName(@Param("projectCode") long projectCode,
+                               @Param("processCode") long processCode,
                                @Param("name") String name);
 
     /**
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index 5c889d1b07..255be38b2f 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -23,12 +23,24 @@
         worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
         resource_ids, create_time, update_time, task_group_id,task_group_priority
     </sql>
+    <sql id="baseSqlV2">
+        ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
+        ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
+        ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
+        ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
+        ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
+    </sql>
     <select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
-        <include refid="baseSql"/>
-        from t_ds_task_definition
-        WHERE project_code = #{projectCode}
-        and name = #{name}
+        <include refid="baseSqlV2">
+            <property name="alias" value="td"/>
+        </include>
+        from t_ds_task_definition td
+        join t_ds_process_task_relation ptr on ptr.project_code = td.project_code
+        where td.project_code = #{projectCode}
+        and td.name = #{name}
+        and ptr.process_definition_code = #{processCode}
+        and td.code = ptr.post_task_code
     </select>
     <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
index 757f751dd2..3d16e2e44d 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.BaseDaoTest;
 import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.User;
 
@@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
     @Autowired
     private TaskDefinitionMapper taskDefinitionMapper;
 
+    @Autowired
+    private ProcessTaskRelationMapper processTaskRelationMapper;
+
     @Autowired
     private UserMapper userMapper;
 
@@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
         return taskDefinition;
     }
 
+    /**
+     * insert
+     *
+     * @return ProcessDefinition
+     */
+    private ProcessTaskRelation insertTaskRelation(long postTaskCode) {
+        ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
+        processTaskRelation.setName("def 1");
+        processTaskRelation.setProjectCode(1L);
+        processTaskRelation.setProcessDefinitionCode(1L);
+        processTaskRelation.setPostTaskCode(postTaskCode);
+        processTaskRelation.setPreTaskCode(0L);
+        processTaskRelation.setUpdateTime(new Date());
+        processTaskRelation.setCreateTime(new Date());
+        processTaskRelationMapper.insert(processTaskRelation);
+        return processTaskRelation;
+    }
+
     @Test
     public void testInsert() {
         TaskDefinition taskDefinition = insertOne();
@@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
     @Test
     public void testQueryByDefinitionName() {
         TaskDefinition taskDefinition = insertOne();
-        TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
+        ProcessTaskRelation processTaskRelation = insertTaskRelation(taskDefinition.getCode());
+        TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), processTaskRelation.getProcessDefinitionCode()
                 , taskDefinition.getName());
 
         Assert.assertNotNull(result);
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 599b979369..08a1cdf7e8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -273,7 +273,7 @@ class Task(Base):
         # TODO get code from specific project process definition and task name
         gateway = launch_gateway()
         result = gateway.entry_point.getCodeAndVersion(
-            self.process_definition._project, self.name
+            self.process_definition._project, self.process_definition.name, self.name
         )
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)