You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/07/14 14:06:16 UTC
[dolphinscheduler] branch dev updated: [python] Fix tasks with multiple upstream and workflow query error (#10941)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 81930e5420 [python] Fix tasks with multiple upstream and workflow query error (#10941)
81930e5420 is described below
commit 81930e54208f7c0f305c0b9f8846149bfc38f428
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Thu Jul 14 22:06:07 2022 +0800
[python] Fix tasks with multiple upstream and workflow query error (#10941)
* when a task has more than one upstream, the mapper
TaskDefinitionMapper method queryByName will return
more than one record and fail the mybatis result
type, so we have to add `limit 1` to it to return a single record
* add multiple runs of an example in the integration test
* Change from subprocess.Popen to subprocess.check_call
in the integration test, which will raise an error when the run fails
---
.github/workflows/py-ci.yml | 4 +--
.../dolphinscheduler/api/python/PythonGateway.java | 7 +++---
.../api/service/impl/QueueServiceImpl.java | 12 ++++-----
.../api/service/impl/TenantServiceImpl.java | 4 +--
.../api/service/QueueServiceTest.java | 15 +++++++++++
.../api/service/TenantServiceTest.java | 29 ++++++++++++++++++++++
.../dao/mapper/TaskDefinitionMapper.xml | 1 +
.../pydolphinscheduler/core/process_definition.py | 4 +--
.../tests/integration/test_submit_examples.py | 22 ++++++++++++----
9 files changed, 78 insertions(+), 20 deletions(-)
diff --git a/.github/workflows/py-ci.yml b/.github/workflows/py-ci.yml
index 0b1304d0ef..fdc55bac10 100644
--- a/.github/workflows/py-ci.yml
+++ b/.github/workflows/py-ci.yml
@@ -51,7 +51,7 @@ jobs:
not-docs:
- '!(docs/**)'
py-change:
- - 'dolphinscheduler-python/pydolphinscheduler'
+ - 'dolphinscheduler-python/pydolphinscheduler/**'
lint:
name: Lint
if: ${{ (needs.paths-filter.outputs.py-change == 'true') || (github.event_name == 'push') }}
@@ -165,7 +165,7 @@ jobs:
- name: Install Dependences
run: |
python -m pip install --upgrade ${{ env.DEPENDENCES }}
- - name: Run Tests Build Docs
+ - name: Run Integrate Tests
run: |
python -m tox -vv -e integrate-test
result:
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index ca8a107d9d..e3aecec9be 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -171,10 +171,11 @@ public class PythonGateway {
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
+ // In the case project exists, but current process definition still not created, we should also return the init version of it
if (processDefinition == null) {
- String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
- logger.error(msg);
- throw new IllegalArgumentException(msg);
+ result.put("code", CodeGenerateUtils.getInstance().genCode());
+ result.put("version", 0L);
+ return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index b8e47a4145..9a1d1c1978 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -282,14 +282,14 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
*/
@Override
public Queue createQueueIfNotExists(String queue, String queueName) {
- Queue queueObj = new Queue(queueName, queue);
- createQueueValid(queueObj);
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
- if (Objects.isNull(existsQueue)) {
- queueMapper.insert(queueObj);
- return queueObj;
+ if (!Objects.isNull(existsQueue)) {
+ return existsQueue;
}
- return existsQueue;
+ Queue queueObj = new Queue(queueName, queue);
+ createQueueValid(queueObj);
+ queueMapper.insert(queueObj);
+ return queueObj;
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 6e7cfecb83..f07966af76 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -366,8 +366,8 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
return tenantMapper.queryByTenantCode(tenantCode);
}
- Queue newQueue = queueService.createQueueIfNotExists(queue, queueName);
- Tenant tenant = new Tenant(tenantCode, desc, newQueue.getId());
+ Queue queueObj = queueService.createQueueIfNotExists(queue, queueName);
+ Tenant tenant = new Tenant(tenantCode, desc, queueObj.getId());
createTenantValid(tenant);
tenantMapper.insert(tenant);
return tenant;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index f2001f314f..945a30626c 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -215,6 +215,21 @@ public class QueueServiceTest {
Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
}
+ @Test
+ public void testCreateQueueIfNotExists() {
+ Queue queue;
+
+ // queue exists
+ Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
+ queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
+ Assert.assertEquals(getQUEUE(), queue);
+
+ // queue not exists
+ Mockito.when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
+ queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
+ Assert.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue);
+ }
+
/**
* create admin user
*/
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index 0229db3c02..62fa91d2c5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
@@ -77,6 +78,9 @@ public class TenantServiceTest {
@InjectMocks
private TenantServiceImpl tenantService;
+ @Mock
+ private QueueService queueService;
+
@Mock
private TenantMapper tenantMapper;
@@ -94,6 +98,8 @@ public class TenantServiceTest {
private static final String tenantCode = "hayden";
private static final String tenantDesc = "This is the tenant desc";
+ private static final String queue = "queue";
+ private static final String queueName = "queue_name";
@Test
public void testCreateTenant() throws Exception {
@@ -229,6 +235,23 @@ public class TenantServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
}
+ @Test
+ public void testCreateTenantIfNotExists() {
+ Tenant tenant;
+
+ // Tenant exists
+ Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(true);
+ Mockito.when(tenantMapper.queryByTenantCode(tenantCode)).thenReturn(getTenant());
+ tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
+ Assert.assertEquals(getTenant(), tenant);
+
+ // Tenant not exists
+ Mockito.when(tenantMapper.existTenant(tenantCode)).thenReturn(false);
+ Mockito.when(queueService.createQueueIfNotExists(queue, queueName)).thenReturn(getQueue());
+ tenant = tenantService.createTenantIfNotExists(tenantCode, tenantDesc, queue, queueName);
+ Assert.assertEquals(new Tenant(tenantCode, tenantDesc, getQueue().getId()), tenant);
+ }
+
/**
* get user
*/
@@ -284,4 +307,10 @@ public class TenantServiceTest {
return processDefinitions;
}
+ private Queue getQueue() {
+ Queue queue = new Queue();
+ queue.setId(1);
+ return queue;
+ }
+
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index b8c49faa3a..4417e6e463 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -41,6 +41,7 @@
and td.name = #{name}
and ptr.process_definition_code = #{processCode}
and td.code = ptr.post_task_code
+ limit 1
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index a8cf875785..dd18dafa0f 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -64,8 +64,8 @@ class ProcessDefinition(Base):
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
:param resource_list: Resource files required by the current process definition.You can create and modify
- resource files from this field. When the process definition is submitted, these resource files are also
- submitted along with it.
+ resource files from this field. When the process definition is submitted, these resource files are
+ also submitted along with it.
"""
# key attribute for identify ProcessDefinition object
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 218fa4a55c..393b0cc99a 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -17,8 +17,8 @@
"""Test whether success submit examples DAG to PythonGatewayService."""
+import subprocess
from pathlib import Path
-from subprocess import Popen
import pytest
@@ -38,7 +38,19 @@ def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayService."""
try:
# Because our task decorator used module ``inspect`` to get the source, and it will
- # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.Popen``
- Popen(["python", str(example_path)])
- except Exception:
- raise Exception("Run example %s failed.", example_path.stem)
+ # raise IOError when call it by built-in function ``exec``, so we change to ``subprocess.check_call``
+ subprocess.check_call(["python", str(example_path)])
+ except subprocess.CalledProcessError:
+ raise RuntimeError("Run example %s failed.", example_path.stem)
+
+
+def test_exec_multiple_times():
+ """Test whether process definition can be executed more than one times."""
+ tutorial_path = path_example.joinpath("tutorial.py")
+ time = 0
+ while time < 3:
+ try:
+ subprocess.check_call(["python", str(tutorial_path)])
+ except subprocess.CalledProcessError:
+ raise RuntimeError("Run example %s failed.", tutorial_path.stem)
+ time += 1