You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/23 15:00:11 UTC

[dolphinscheduler] branch dev updated: [python] Support using full-name definitions for resources (#10551)

This is an automated email from the ASF dual-hosted git repository.

zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7970573365 [python] Support using full-name definitions for resources (#10551)
7970573365 is described below

commit 797057336529d3077c42ec689416c08c2e8d14cf
Author: 陈家名 <13...@163.com>
AuthorDate: Thu Jun 23 22:59:59 2022 +0800

    [python] Support using full-name definitions for resources (#10551)
---
 .../dolphinscheduler/api/python/PythonGateway.java | 23 +++++++++++
 .../api/python/PythonGatewayTest.java              | 44 ++++++++++++++++++++++
 .../src/pydolphinscheduler/constants.py            |  6 +++
 .../pydolphinscheduler/core/process_definition.py  |  1 +
 .../src/pydolphinscheduler/core/task.py            | 26 ++++++++++++-
 .../pydolphinscheduler/tests/core/test_task.py     | 39 ++++++++++++++++++-
 6 files changed, 136 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index e6400e79fa..eaca6707d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.Tenant;
@@ -557,6 +558,28 @@ public class PythonGateway {
         return result;
     }
 
+    /**
+     * Get FILE resource by full name for the Python API, returning a map of its {@code id} and full {@code name}.
+     *
+     * @param userName user who queries the resource
+     * @param fullName full name of the resource
+     * @throws IllegalArgumentException if no valid resource can be found by {@code fullName}
+     */
+    public Map<String, Object> queryResourcesFileInfo(String userName, String fullName) {
+        Map<String, Object> result = new HashMap<>();
+        User user = usersService.queryUser(userName);
+        Result<Object> resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE);
+        if (resourceResponse.getCode() != Status.SUCCESS.getCode() || !(resourceResponse.getData() instanceof Resource)) {
+            String msg = String.format("Can not find valid resource by name %s", fullName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        Resource resource = (Resource) resourceResponse.getData();
+        result.put("id", resource.getId());
+        result.put("name", resource.getFullName());
+        return result;
+    }
+
     @PostConstruct
     public void init() {
         if (pythonGatewayConfiguration.getEnabled()) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
index 7d8b6efabc..34ff2b6753 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -17,13 +17,20 @@
 
 package org.apache.dolphinscheduler.api.python;
 
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Resource;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,6 +60,12 @@ public class PythonGatewayTest {
     @Mock
     private TaskDefinitionMapper taskDefinitionMapper;
 
+    @Mock
+    private ResourcesService resourcesService;
+
+    @Mock
+    private UsersService usersService;
+
     @Test
     public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
         Project project = getTestProject();
@@ -83,6 +96,37 @@ public class PythonGatewayTest {
         Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
     }
 
+    @Test
+    public void testQueryResourcesFileInfo() {
+        User user = getTestUser();
+        Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user);
+
+        Result<Object> mockResult = new Result<>();
+        mockResult.setCode(Status.SUCCESS.getCode());
+        Resource resource = getTestResource();
+        mockResult.setData(resource);
+        Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult);
+
+        Map<String, Object> result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
+        Assert.assertEquals(resource.getId(), (int) result.get("id"));
+        Assert.assertEquals(resource.getFullName(), result.get("name"));
+    }
+
+    private Resource getTestResource() {
+        Resource testResource = new Resource();
+        testResource.setFullName("/dev/test.py");
+        testResource.setType(ResourceType.FILE);
+        testResource.setId(1);
+        return testResource;
+    }
+
+    private User getTestUser() {
+        User testUser = new User();
+        testUser.setUserName("ut-user");
+        testUser.setId(1);
+        return testUser;
+    }
+
     private Project getTestProject() {
         Project project = new Project();
         project.setName("ut-project");
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 262469c88f..a5089ac165 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -100,3 +100,9 @@ class Time(str):
 
     FMT_STD_TIME = "%H:%M:%S"
     FMT_NO_COLON_TIME = "%H%M%S"
+
+
+class ResourceKey(str):
+    """Constants for the keys of resource info dicts, e.g. the ``id`` of a resource."""
+
+    ID = "id"
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index cef01706df..5f34b6db60 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -405,6 +405,7 @@ class ProcessDefinition(Base):
             json.dumps(self.task_relation_json),
             json.dumps(self.task_definition_json),
             None,
+            None,
         )
         return self._process_definition_code
 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 08a1cdf7e8..90c0e89087 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -22,6 +22,7 @@ from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
 
 from pydolphinscheduler.constants import (
     Delimiter,
+    ResourceKey,
     TaskFlag,
     TaskPriority,
     TaskTimeoutFlag,
@@ -155,7 +156,7 @@ class Task(Base):
 
         # Attribute for task param
         self.local_params = local_params or []
-        self.resource_list = resource_list or []
+        self._resource_list = resource_list or []
         self.dependence = dependence or {}
         self.wait_start_timeout = wait_start_timeout or {}
         self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@@ -170,6 +171,22 @@ class Task(Base):
         """Set attribute process_definition."""
         self._process_definition = process_definition
 
+    @property
+    def resource_list(self) -> List[Dict]:
+        """Get task define attribute `resource_list`, resolving resource paths to ids."""
+        # Use dict keys rather than a set: deduplicates while keeping definition order.
+        resource_ids = {}
+        for resource in self._resource_list:
+            if isinstance(resource, str):
+                resource_ids[self.query_resource(resource).get(ResourceKey.ID)] = None
+            elif isinstance(resource, dict) and resource.get(ResourceKey.ID) is not None:
+                logger.warning(
+                    "`resource_list` should be defined using List[str] with resource paths, "
+                    "the use of ids to define resources will be removed in version 3.2.0."
+                )
+                resource_ids[resource.get(ResourceKey.ID)] = None
+        return [{ResourceKey.ID: r} for r in resource_ids]
+
     @property
     def condition_result(self) -> Dict:
         """Get attribute condition_result."""
@@ -278,3 +295,10 @@ class Task(Base):
         # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
         # gateway_result_checker(result)
         return result.get("code"), result.get("version")
+
+    def query_resource(self, full_name):
+        """Query resource info (id and name) by resource full name through the java gateway."""
+        entry_point = launch_gateway().entry_point
+        user_name = self.process_definition.user.name
+        resource_info = entry_point.queryResourcesFileInfo(user_name, full_name)
+        return resource_info
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 7d4bbebdd3..65555c1eb5 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -54,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0
             },
             {
                 "localParams": ["foo", "bar"],
-                "resourceList": ["foo", "bar"],
+                "resourceList": [{"id": 1}],
                 "dependence": {"foo", "bar"},
                 "waitStartTimeout": {"foo", "bar"},
                 "conditionResult": {"foo": ["bar"]},
@@ -62,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0
         ),
     ],
 )
-def test_property_task_params(attr, expect):
+@patch(
+    "pydolphinscheduler.core.task.Task.query_resource",
+    return_value=({"id": 1, "name": "foo"}),
+)
+def test_property_task_params(mock_resource, attr, expect):
     """Test class task property."""
     task = testTask(
         "test-property-task-params",
@@ -241,3 +245,34 @@ def test_add_duplicate(caplog):
                 re.findall("already in process definition", caplog.text),
             ]
         )
+
+
+@pytest.mark.parametrize(
+    "resources, expect",
+    [
+        (
+            ["/dev/test.py"],
+            [{"id": 1}],
+        ),
+        (
+            ["/dev/test.py", {"id": 2}],
+            [{"id": 1}, {"id": 2}],
+        ),
+    ],
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.gen_code_and_version",
+    return_value=(123, 1),
+)
+@patch(
+    "pydolphinscheduler.core.task.Task.query_resource",
+    return_value=({"id": 1, "name": "/dev/test.py"}),
+)
+def test_python_resource_list(mock_resource, mock_code_version, resources, expect):
+    """Test python task resource list: resource paths and deprecated ids both map to ids."""
+    task = Task(
+        name="python_resource_list.",
+        task_type="PYTHON",
+        resource_list=resources,
+    )
+    assert task.resource_list == expect