You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zh...@apache.org on 2022/06/23 15:00:11 UTC
[dolphinscheduler] branch dev updated: [python] Support using full-name definitions for resources (#10551)
This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7970573365 [python] Support using full-name definitions for resources (#10551)
7970573365 is described below
commit 797057336529d3077c42ec689416c08c2e8d14cf
Author: 陈家名 <13...@163.com>
AuthorDate: Thu Jun 23 22:59:59 2022 +0800
[python] Support using full-name definitions for resources (#10551)
---
.../dolphinscheduler/api/python/PythonGateway.java | 23 +++++++++++
.../api/python/PythonGatewayTest.java | 44 ++++++++++++++++++++++
.../src/pydolphinscheduler/constants.py | 6 +++
.../pydolphinscheduler/core/process_definition.py | 1 +
.../src/pydolphinscheduler/core/task.py | 26 ++++++++++++-
.../pydolphinscheduler/tests/core/test_task.py | 39 ++++++++++++++++++-
6 files changed, 136 insertions(+), 3 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index e6400e79fa..eaca6707d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@@ -557,6 +558,28 @@ public class PythonGateway {
return result;
}
+    /**
+     * Get the FILE resource with the given full name, queried as the given user, and return its id and full name.
+     * Used by the Python API when a task definition references resources by their full name.
+     *
+     * @param userName user who queries the resource
+     * @param fullName full name of the resource
+     * @return map with key {@code id} holding the resource id and key {@code name} holding the resource full name
+     * @throws IllegalArgumentException if no user named {@code userName} is found, or if no valid resource is
+     *                                  found for {@code fullName}
+     */
+    public Map<String, Object> queryResourcesFileInfo(String userName, String fullName) {
+        User user = usersService.queryUser(userName);
+        if (user == null) {
+            // Fail fast with a clear message instead of handing a null user to the resource service.
+            String msg = String.format("Can not find valid user by name %s", userName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        Result<Object> resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE);
+        Object data = resourceResponse.getData();
+        // Guard the cast below: a non-success code or an unexpected payload both mean no usable resource.
+        if (resourceResponse.getCode() != Status.SUCCESS.getCode() || !(data instanceof Resource)) {
+            String msg = String.format("Can not find valid resource by name %s", fullName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        Resource resource = (Resource) data;
+        Map<String, Object> result = new HashMap<>();
+        result.put("id", resource.getId());
+        result.put("name", resource.getFullName());
+        return result;
+    }
+
@PostConstruct
public void init() {
if (pythonGatewayConfiguration.getEnabled()) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
index 7d8b6efabc..34ff2b6753 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java
@@ -17,13 +17,20 @@
package org.apache.dolphinscheduler.api.python;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
+import org.apache.dolphinscheduler.api.service.UsersService;
+import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,6 +60,12 @@ public class PythonGatewayTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
+ @Mock
+ private ResourcesService resourcesService;
+
+ @Mock
+ private UsersService usersService;
+
@Test
public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
Project project = getTestProject();
@@ -83,6 +96,37 @@ public class PythonGatewayTest {
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
}
+
+    /**
+     * Verifies that a successfully queried FILE resource is mapped to its id and full name.
+     */
+    @Test
+    public void testQueryResourcesFileInfo() {
+        User user = getTestUser();
+        Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user);
+
+        Result<Object> mockResult = new Result<>();
+        mockResult.setCode(Status.SUCCESS.getCode());
+        Resource resource = getTestResource();
+        mockResult.setData(resource);
+        Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE)).thenReturn(mockResult);
+
+        Map<String, Object> result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
+        // JUnit convention: expected value first, actual value second.
+        Assert.assertEquals(resource.getId(), (int) result.get("id"));
+        Assert.assertEquals(resource.getFullName(), result.get("name"));
+    }
+
+    /**
+     * Build a FILE resource fixture with id 1 and full name "/dev/test.py".
+     */
+    private Resource getTestResource() {
+        Resource testResource = new Resource();
+        testResource.setFullName("/dev/test.py");
+        testResource.setType(ResourceType.FILE);
+        testResource.setId(1);
+        return testResource;
+    }
+
+    /**
+     * Build a user fixture with id 1 and user name "ut-user".
+     */
+    private User getTestUser() {
+        User testUser = new User();
+        testUser.setUserName("ut-user");
+        testUser.setId(1);
+        return testUser;
+    }
+
private Project getTestProject() {
Project project = new Project();
project.setName("ut-project");
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 262469c88f..a5089ac165 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -100,3 +100,9 @@ class Time(str):
FMT_STD_TIME = "%H:%M:%S"
FMT_NO_COLON_TIME = "%H%M%S"
+
+
+class ResourceKey(str):
+ """Constants for key of resource."""
+
+ ID = "id"
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index cef01706df..5f34b6db60 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -405,6 +405,7 @@ class ProcessDefinition(Base):
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
None,
+ None,
)
return self._process_definition_code
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
index 08a1cdf7e8..90c0e89087 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py
@@ -22,6 +22,7 @@ from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler.constants import (
Delimiter,
+ ResourceKey,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
@@ -155,7 +156,7 @@ class Task(Base):
# Attribute for task param
self.local_params = local_params or []
- self.resource_list = resource_list or []
+ self._resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@@ -170,6 +171,22 @@ class Task(Base):
"""Set attribute process_definition."""
self._process_definition = process_definition
+ @property
+ def resource_list(self) -> List:
+ """Get task define attribute `resource_list`."""
+ resources = set()
+ for resource in self._resource_list:
+ if type(resource) == str:
+ resources.add(self.query_resource(resource).get(ResourceKey.ID))
+ elif type(resource) == dict and resource.get(ResourceKey.ID) is not None:
+ logger.warning(
+ """`resource_list` should be defined using List[str] with resource paths,
+ the use of ids to define resources will be remove in version 3.2.0.
+ """
+ )
+ resources.add(resource.get(ResourceKey.ID))
+ return [{ResourceKey.ID: r} for r in resources]
+
@property
def condition_result(self) -> Dict:
"""Get attribute condition_result."""
@@ -278,3 +295,10 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
+
+ def query_resource(self, full_name):
+ """Get resource info from java gateway, contains resource id, name."""
+ gateway = launch_gateway()
+ return gateway.entry_point.queryResourcesFileInfo(
+ self.process_definition.user.name, full_name
+ )
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
index 7d4bbebdd3..65555c1eb5 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py
@@ -54,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0
},
{
"localParams": ["foo", "bar"],
- "resourceList": ["foo", "bar"],
+ "resourceList": [{"id": 1}],
"dependence": {"foo", "bar"},
"waitStartTimeout": {"foo", "bar"},
"conditionResult": {"foo": ["bar"]},
@@ -62,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0
),
],
)
-def test_property_task_params(attr, expect):
+@patch(
+ "pydolphinscheduler.core.task.Task.query_resource",
+ return_value=({"id": 1, "name": "foo"}),
+)
+def test_property_task_params(mock_resource, attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
@@ -241,3 +245,34 @@ def test_add_duplicate(caplog):
re.findall("already in process definition", caplog.text),
]
)
+
+
+@pytest.mark.parametrize(
+ "resources, expect",
+ [
+ (
+ ["/dev/test.py"],
+ [{"id": 1}],
+ ),
+ (
+ ["/dev/test.py", {"id": 2}],
+ [{"id": 1}, {"id": 2}],
+ ),
+ ],
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.gen_code_and_version",
+ return_value=(123, 1),
+)
+@patch(
+ "pydolphinscheduler.core.task.Task.query_resource",
+ return_value=({"id": 1, "name": "/dev/test.py"}),
+)
+def test_python_resource_list(mock_code_version, mock_resource, resources, expect):
+ """Test python task resource list."""
+ task = Task(
+ name="python_resource_list.",
+ task_type="PYTHON",
+ resource_list=resources,
+ )
+ assert task.resource_list == expect