You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/07/14 14:06:16 UTC

[dolphinscheduler] branch dev updated: [python] Fix tasks with multiple upstream and workflow query error (#10941)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 81930e5420 [python] Fix tasks with multiple upstream and workflow query error (#10941)
81930e5420 is described below

commit 81930e54208f7c0f305c0b9f8846149bfc38f428
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Thu Jul 14 22:06:07 2022 +0800

    [python] Fix tasks with multiple upstream and workflow query error (#10941)
    
    * When a task has more than one upstream, the
       TaskDefinitionMapper method queryByName returns
       more than one record, which breaks the mybatis result
       type, so we add `limit 1` to the query
    * add multiple runs of example in integrate test
    * Change from subprocess.Popen to subprocess.check_call
      in the integration test, which raises an error on failure
---
 .github/workflows/py-ci.yml                        |  4 +--
 .../dolphinscheduler/api/python/PythonGateway.java |  7 +++---
 .../api/service/impl/QueueServiceImpl.java         | 12 ++++-----
 .../api/service/impl/TenantServiceImpl.java        |  4 +--
 .../api/service/QueueServiceTest.java              | 15 +++++++++++
 .../api/service/TenantServiceTest.java             | 29 ++++++++++++++++++++++
 .../dao/mapper/TaskDefinitionMapper.xml            |  1 +
 .../pydolphinscheduler/core/process_definition.py  |  4 +--
 .../tests/integration/test_submit_examples.py      | 22 ++++++++++++----
 9 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/.github/workflows/py-ci.yml b/.github/workflows/py-ci.yml
index 0b1304d0ef..fdc55bac10 100644
--- a/.github/workflows/py-ci.yml
+++ b/.github/workflows/py-ci.yml
@@ -51,7 +51,7 @@ jobs:
             not-docs:
               - '!(docs/**)'
             py-change:
-              - 'dolphinscheduler-python/pydolphinscheduler'
+              - 'dolphinscheduler-python/pydolphinscheduler/**'
   lint:
     name: Lint
     if: ${{ (needs.paths-filter.outputs.py-change == 'true') || (github.event_name == 'push') }}
@@ -165,7 +165,7 @@ jobs:
       - name: Install Dependences
         run: |
           python -m pip install --upgrade ${{ env.DEPENDENCES }}
-      - name: Run Tests Build Docs
+      - name: Run Integrate Tests
         run: |
           python -m tox -vv -e integrate-test
   result:
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index ca8a107d9d..e3aecec9be 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -171,10 +171,11 @@ public class PythonGateway {
         }
 
         ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+        // In the case project exists, but current process definition still not created, we should also return the init version of it
         if (processDefinition == null) {
-            String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
-            logger.error(msg);
-            throw new IllegalArgumentException(msg);
+            result.put("code", CodeGenerateUtils.getInstance().genCode());
+            result.put("version", 0L);
+            return result;
         }
 
         TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index b8e47a4145..9a1d1c1978 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -282,14 +282,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
      */
     @Override
     public Queue createQueueIfNotExists(String queue, String queueName) {
-        Queue queueObj = new Queue(queueName, queue);
-        createQueueValid(queueObj);
         Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
-        if (Objects.isNull(existsQueue)) {
-            queueMapper.insert(queueObj);
-            return queueObj;
+        if (!Objects.isNull(existsQueue)) {
+            return existsQueue;
         }
-        return existsQueue;
+        Queue queueObj = new Queue(queueName, queue);
+        createQueueValid(queueObj);
+        queueMapper.insert(queueObj);
+        return queueObj;
     }
 
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 6e7cfecb83..f07966af76 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -366,8 +366,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
             return tenantMapper.queryByTenantCode(tenantCode);
         }
 
-        Queue newQueue = queueService.createQueueIfNotExists(queue, queueName);
-        Tenant tenant = new Tenant(tenantCode, desc, newQueue.getId());
+        Queue queueObj = queueService.createQueueIfNotExists(queue, queueName);
+        Tenant tenant = new Tenant(tenantCode, desc, queueObj.getId());
         createTenantValid(tenant);
         tenantMapper.insert(tenant);
         return tenant;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index f2001f314f..945a30626c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -215,6 +215,21 @@ public class QueueServiceTest {
         Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
     }
 
+    @Test
+    public void testCreateQueueIfNotExists() {
+        Queue queue;
+
+        // queue exists
+        Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
+        queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
+        Assert.assertEquals(getQUEUE(), queue);
+
+        // queue not exists
+        Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
+        queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
+        Assert.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue);
+    }
+
     /**
      * create admin user
      */
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index 0229db3c02..62fa91d2c5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -77,6 +78,9 @@ public class TenantServiceTest {
     @InjectMocks
     private TenantServiceImpl tenantService;
 
+    @Mock
+    private QueueService queueService;
+
     @Mock
     private TenantMapper tenantMapper;
 
@@ -94,6 +98,8 @@ public class TenantServiceTest {
 
     private static final String tenantCode = "hayden";
     private static final String tenantDesc = "This is the tenant desc";
+    private static final String queue = "queue";
+    private static final String queueName = "queue_name";
 
     @Test
     public void testCreateTenant() throws Exception {
@@ -229,6 +235,23 @@ public class TenantServiceTest {
         Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
     }
 
+    @Test
+    public void testCreateTenantIfNotExists() {
+        Tenant tenant;
+
+        // Tenant exists
+        Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(true);
+        Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant());
+        tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
+        Assert.assertEquals(getTenant(), tenant);
+
+        // Tenant not exists
+        Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(false);
+        Mockito.when(queueService.createQueueIfNotExists(queue, queueName)).thenReturn(getQueue());
+        tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
+        Assert.assertEquals(new Tenant(tenantCode, tenantDesc, getQueue().getId()), tenant);
+    }
+
     /**
      * get user
      */
@@ -284,4 +307,10 @@ public class TenantServiceTest {
         return processDefinitions;
     }
 
+    private Queue getQueue() {
+        Queue queue = new Queue();
+        queue.setId(1);
+        return queue;
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index b8c49faa3a..4417e6e463 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -41,6 +41,7 @@
         and td.name = #{name}
         and ptr.process_definition_code = #{processCode}
         and td.code = ptr.post_task_code
+        limit 1
     </select>
     <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
         select
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index a8cf875785..dd18dafa0f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -64,8 +64,8 @@ class ProcessDefinition(Base):
         ``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
         to ``user``, will grant `project` to ``user`` automatically.
     :param resource_list: Resource files required by the current process definition.You can create and modify
-        resource files from this field. When the process definition is submitted, these resource files are also
-        submitted along with it.
+        resource files from this field. When the process definition is submitted, these resource files are
+        also submitted along with it.
     """
 
     # key attribute for identify ProcessDefinition object
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 218fa4a55c..393b0cc99a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -17,8 +17,8 @@
 
 """Test whether success submit examples DAG to PythonGatewayService."""
 
+import subprocess
 from pathlib import Path
-from subprocess import Popen
 
 import pytest
 
@@ -38,7 +38,19 @@ def test_exec_white_list_example(example_path: Path):
     """Test execute examples and submit DAG to PythonGatewayService."""
     try:
         # Because our task decorator used module ``inspect`` to get the source, and it will
-        # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.Popen``
-        Popen(["python", str(example_path)])
-    except Exception:
-        raise Exception("Run example %s failed.", example_path.stem)
+        # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
+        subprocess.check_call(["python", str(example_path)])
+    except subprocess.CalledProcessError:
+        raise RuntimeError("Run example %s failed.", example_path.stem)
+
+
+def test_exec_multiple_times():
+    """Test whether process definition can be executed more than one times."""
+    tutorial_path = path_example.joinpath("tutorial.py")
+    time = 0
+    while time < 3:
+        try:
+            subprocess.check_call(["python", str(tutorial_path)])
+        except subprocess.CalledProcessError:
+            raise RuntimeError("Run example %s failed.", tutorial_path.stem)
+        time += 1