You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/03/24 07:24:09 UTC
[dolphinscheduler] branch dev updated: [python] Fix change exists pd attribute user error (#9140)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 5289b09 [python] Fix change exists pd attribute user error (#9140)
5289b09 is described below
commit 5289b09817396046489b28857ad54a85c5ffddb2
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Thu Mar 24 15:24:02 2022 +0800
[python] Fix change exists pd attribute user error (#9140)
* [python] Fix change exists pd attribute user error
* Remove attribute `queue` from class ProcessDefinition, because it belongs to
object User instead of object ProcessDefinition.
* Grant project to user if attribute user changes for an existing ProcessDefinition
close: #8751
* Add py.test conftest.py for package integration
---
.../pydolphinscheduler/UPDATING.md | 1 +
.../pydolphinscheduler/pytest.ini | 5 +--
.../pydolphinscheduler/core/process_definition.py | 22 ++++------
.../src/pydolphinscheduler/side/project.py | 2 +-
.../src/pydolphinscheduler/side/user.py | 21 ++++++---
.../tests/core/test_process_definition.py | 3 --
.../{test_submit_examples.py => conftest.py} | 36 ++++++----------
.../tests/{ => integration}/test_java_gateway.py | 7 +--
.../tests/integration/test_process_definition.py | 50 ++++++++++++++++++++++
.../tests/integration/test_submit_examples.py | 19 +-------
.../server/PythonGatewayServer.java | 42 ++++++++++++++++--
11 files changed, 133 insertions(+), 75 deletions(-)
diff --git a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
index b5d69cd..cf45c09 100644
--- a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
+++ b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
@@ -24,5 +24,6 @@ It started after version 2.0.5 released
## dev
+* Remove parameter `queue` from class `ProcessDefinition` to avoid confusing users when it is changed but does not take effect
* Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic.
* Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.
diff --git a/dolphinscheduler-python/pydolphinscheduler/pytest.ini b/dolphinscheduler-python/pydolphinscheduler/pytest.ini
index 9ed8ccf..b1aa850 100644
--- a/dolphinscheduler-python/pydolphinscheduler/pytest.ini
+++ b/dolphinscheduler-python/pydolphinscheduler/pytest.ini
@@ -14,11 +14,8 @@
# limitations under the License.
[pytest]
-# Do not test test_java_gateway.py due to we can not mock java gateway for now
-addopts = --ignore=tests/test_java_gateway.py
-
# add path here to skip pytest scan it
norecursedirs =
tests/testing
- # Integration test run seperated which do not calculate coverage
+ # Integration tests run separately and do not count toward coverage, run them with `tox -e integrate-test`
tests/integration
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index dd2b83a..7615226 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -55,6 +55,14 @@ class ProcessDefinition(Base):
"""process definition object, will define process definition attribute, task, relation.
TODO: maybe we should rename this class, currently use DS object name.
+
+ :param user: The user for current process definition. Will create a new one if it does not exist. If your
+ parameter ``project`` already exists but the project's creator is not ``user``, will grant
+ ``project`` to ``user`` automatically.
+ :param project: The project for current process definition. You could see the workflow in this project
+ through Web UI after it :func:`submit` or :func:`run`. It will create a new project belonging to
+ ``user`` if it does not exist. And when ``project`` exists but the project's creator is not
+ ``user``, will grant ``project`` to ``user`` automatically.
"""
# key attribute for identify ProcessDefinition object
@@ -91,7 +99,6 @@ class ProcessDefinition(Base):
user: Optional[str] = configuration.WORKFLOW_USER,
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
- queue: Optional[str] = configuration.WORKFLOW_QUEUE,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
@@ -105,7 +112,6 @@ class ProcessDefinition(Base):
self._user = user
self._project = project
self._tenant = tenant
- self._queue = queue
self.worker_group = worker_group
self.timeout = timeout
self.release_state = release_state
@@ -148,15 +154,7 @@ class ProcessDefinition(Base):
For now we just get from python side but not from java gateway side, so it may not correct.
"""
- return User(
- self._user,
- configuration.USER_PASSWORD,
- configuration.USER_EMAIL,
- configuration.USER_PHONE,
- self._tenant,
- self._queue,
- configuration.USER_STATE,
- )
+ return User(name=self._user, tenant=self._tenant)
@staticmethod
def _parse_datetime(val: Any) -> Any:
@@ -331,8 +329,6 @@ class ProcessDefinition(Base):
:class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
"""
# TODO used metaclass for more pythonic
- self.tenant.create_if_not_exists(self._queue)
- # model User have to create after Tenant created
self.user.create_if_not_exists()
# Project model need User object exists
self.project.create_if_not_exists(self._user)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
index b568cb4..750e3b8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
@@ -37,6 +37,6 @@ class Project(BaseSide):
def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Project if not exists."""
gateway = launch_gateway()
- gateway.entry_point.createProject(user, self.name, self.description)
+ gateway.entry_point.createOrGrantProject(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
index cd0145a..510e3a8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
@@ -19,8 +19,10 @@
from typing import Optional
+from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side.tenant import Tenant
class User(BaseSide):
@@ -39,12 +41,12 @@ class User(BaseSide):
def __init__(
self,
name: str,
- password: str,
- email: str,
- phone: str,
- tenant: str,
- queue: Optional[str] = None,
- status: Optional[int] = 1,
+ password: Optional[str] = configuration.USER_PASSWORD,
+ email: Optional[str] = configuration.USER_EMAIL,
+ phone: Optional[str] = configuration.USER_PHONE,
+ tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+ queue: Optional[str] = configuration.WORKFLOW_QUEUE,
+ status: Optional[int] = configuration.USER_STATE,
):
super().__init__(name)
self.password = password
@@ -54,8 +56,15 @@ class User(BaseSide):
self.queue = queue
self.status = status
+ def create_tenant_if_not_exists(self) -> None:
+ """Create tenant object."""
+ tenant = Tenant(name=self.tenant, queue=self.queue)
+ tenant.create_if_not_exists(self.queue)
+
def create_if_not_exists(self, **kwargs):
"""Create User if not exists."""
+ # Should make sure queue already exists.
+ self.create_tenant_if_not_exists()
gateway = launch_gateway()
gateway.entry_point.createUser(
self.name,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 655b7fd..e311be2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -309,15 +309,12 @@ def test_process_definition_simple_separate():
"user_attrs",
[
{"tenant": "tenant_specific"},
- {"queue": "queue_specific"},
- {"tenant": "tenant_specific", "queue": "queue_specific"},
],
)
def test_set_process_definition_user_attr(user_attrs):
"""Test user with correct attributes if we specific assigned to process definition object."""
default_value = {
"tenant": configuration.WORKFLOW_TENANT,
- "queue": configuration.WORKFLOW_QUEUE,
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
user = pd.user
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
similarity index 62%
copy from dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
index 423c6c3..a9cd352 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
@@ -15,20 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-"""Test whether success submit examples DAG to PythonGatewayServer."""
-
-from pathlib import Path
+"""py.test conftest.py file for package integration test."""
import pytest
-from tests.testing.constants import ignore_exec_examples
from tests.testing.docker_wrapper import DockerWrapper
-from tests.testing.path import path_example
-@pytest.fixture(scope="module")
-def setup_docker():
- """Set up and teardown docker env for fixture."""
+@pytest.fixture(scope="package", autouse=True)
+def docker_setup_teardown():
+ """Fixture for whole package tests, Set up and teardown docker env.
+
+ A fixture in ``conftest.py`` with ``scope=package`` is automatically available in the
+ whole package, and with attribute ``autouse=True`` it is used by every test case automatically.
+
+ .. seealso::
+ For more information about conftest.py see:
+ https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
+ """
docker_wrapper = DockerWrapper(
image="apache/dolphinscheduler-standalone-server:ci",
container_name="ci-dolphinscheduler-standalone-server",
@@ -40,19 +44,3 @@ def setup_docker():
assert container is not None
yield
docker_wrapper.remove_container()
-
-
-@pytest.mark.parametrize(
- "example_path",
- [
- path
- for path in path_example.iterdir()
- if path.is_file() and path.stem not in ignore_exec_examples
- ],
-)
-def test_exec_white_list_example(setup_docker, example_path: Path):
- """Test execute examples and submit DAG to PythonGatewayServer."""
- try:
- exec(example_path.read_text())
- except Exception:
- raise Exception("Run example %s failed.", example_path.stem)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
similarity index 90%
rename from dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
rename to dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
index 3c8831e..8b7c5ff 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
@@ -31,9 +31,10 @@ def test_gateway_connect():
def test_jvm_simple():
"""Test use JVM build-in object and operator from java gateway."""
gateway = JavaGateway()
- smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
- bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
- assert bigger > smaller
+ smallest = gateway.jvm.java.lang.Integer.MIN_VALUE
+ biggest = gateway.jvm.java.lang.Integer.MAX_VALUE
+ assert smallest is not None and biggest is not None
+ assert biggest > smallest
def test_python_client_java_import_single():
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py
new file mode 100644
index 0000000..1672bde
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test process definition in integration."""
+
+from typing import Dict
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+
+PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
+TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
+
+
+@pytest.mark.parametrize(
+ "pre, post",
+ [
+ (
+ {
+ "user": "pre_user",
+ },
+ {
+ "user": "post_user",
+ },
+ )
+ ],
+)
+def test_change_process_definition_attr(pre: Dict, post: Dict):
+ """Test whether process definition submits successfully when a specific attribute changes."""
+ assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
+ for attrs in [pre, post]:
+ with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
+ Shell(name=TASK_NAME, command="echo 1")
+ pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 423c6c3..0964e1b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -22,26 +22,9 @@ from pathlib import Path
import pytest
from tests.testing.constants import ignore_exec_examples
-from tests.testing.docker_wrapper import DockerWrapper
from tests.testing.path import path_example
-@pytest.fixture(scope="module")
-def setup_docker():
- """Set up and teardown docker env for fixture."""
- docker_wrapper = DockerWrapper(
- image="apache/dolphinscheduler-standalone-server:ci",
- container_name="ci-dolphinscheduler-standalone-server",
- )
- ports = {"25333/tcp": 25333}
- container = docker_wrapper.run_until_log(
- log="Started StandaloneServer in", tty=True, ports=ports
- )
- assert container is not None
- yield
- docker_wrapper.remove_container()
-
-
@pytest.mark.parametrize(
"example_path",
[
@@ -50,7 +33,7 @@ def setup_docker():
if path.is_file() and path.stem not in ignore_exec_examples
],
)
-def test_exec_white_list_example(setup_docker, example_path: Path):
+def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayServer."""
try:
exec(example_path.read_text())
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 9fa1366..218cf61 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.server.config.PythonGatewayConfig;
@@ -59,6 +61,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -142,6 +145,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
@Autowired
private PythonGatewayConfig pythonGatewayConfig;
+ @Autowired
+ private ProjectUserMapper projectUserMapper;
+
@Value("${spring.jackson.time-zone:UTC}")
private String timezone;
@@ -231,8 +237,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
User user = usersService.queryUser(userName);
- Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
+ Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode();
+
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
long processDefinitionCode;
// create or update process definition
@@ -349,9 +356,38 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
}
// side object
- public Map<String, Object> createProject(String userName, String name, String desc) {
+ /*
+ Grant the project's permission to the user. Used when the project was not created by the current
+ user but the Python API uses that project to change a process definition.
+ */
+ private Integer grantProjectToUser(Project project, User user) {
+ Date now = new Date();
+ ProjectUser projectUser = new ProjectUser();
+ projectUser.setUserId(user.getId());
+ projectUser.setProjectId(project.getId());
+ projectUser.setPerm(Constants.AUTHORIZE_WRITABLE_PERM);
+ projectUser.setCreateTime(now);
+ projectUser.setUpdateTime(now);
+ return projectUserMapper.insert(projectUser);
+ }
+
+ /*
+ Grant or create project. Create a new project if the project does not exist, and grant the project
+ permission to the user if the project exists but the user has no permission on it.
+ */
+ public void createOrGrantProject(String userName, String name, String desc) {
User user = usersService.queryUser(userName);
- return projectService.createProject(user, name, desc);
+
+ Project project;
+ project = projectMapper.queryByName(name);
+ if (project == null) {
+ projectService.createProject(user, name, desc);
+ } else if (project.getUserId() != user.getId()) {
+ ProjectUser projectUser = projectUserMapper.queryProjectRelation(project.getId(), user.getId());
+ if (projectUser == null) {
+ grantProjectToUser(project, user);
+ }
+ }
}
public Map<String, Object> createQueue(String name, String queueName) {