You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/03/24 07:24:09 UTC

[dolphinscheduler] branch dev updated: [python] Fix change exists pd attribute user error (#9140)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5289b09  [python] Fix change exists pd attribute user error (#9140)
5289b09 is described below

commit 5289b09817396046489b28857ad54a85c5ffddb2
Author: Jiajie Zhong <zh...@hotmail.com>
AuthorDate: Thu Mar 24 15:24:02 2022 +0800

    [python] Fix change exists pd attribute user error (#9140)
    
    * [python] Fix change exists pd attribute user error
    
    * Remove attribute `queue` from class ProcessDefinition, because it belongs to
      object User instead of object ProcessDefinition.
    * Grant project to user if attribute `user` changes for an existing ProcessDefinition
    
    close: #8751
    
    * Add py.test conftest.py for package integration
---
 .../pydolphinscheduler/UPDATING.md                 |  1 +
 .../pydolphinscheduler/pytest.ini                  |  5 +--
 .../pydolphinscheduler/core/process_definition.py  | 22 ++++------
 .../src/pydolphinscheduler/side/project.py         |  2 +-
 .../src/pydolphinscheduler/side/user.py            | 21 ++++++---
 .../tests/core/test_process_definition.py          |  3 --
 .../{test_submit_examples.py => conftest.py}       | 36 ++++++----------
 .../tests/{ => integration}/test_java_gateway.py   |  7 +--
 .../tests/integration/test_process_definition.py   | 50 ++++++++++++++++++++++
 .../tests/integration/test_submit_examples.py      | 19 +-------
 .../server/PythonGatewayServer.java                | 42 ++++++++++++++++--
 11 files changed, 133 insertions(+), 75 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
index b5d69cd..cf45c09 100644
--- a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
+++ b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md
@@ -24,5 +24,6 @@ It started after version 2.0.5 released
 
 ## dev
 
+* Remove parameter `queue` from class `ProcessDefinition` to avoid confusing users when it is changed but does not take effect
 * Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic.
 * Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.
diff --git a/dolphinscheduler-python/pydolphinscheduler/pytest.ini b/dolphinscheduler-python/pydolphinscheduler/pytest.ini
index 9ed8ccf..b1aa850 100644
--- a/dolphinscheduler-python/pydolphinscheduler/pytest.ini
+++ b/dolphinscheduler-python/pydolphinscheduler/pytest.ini
@@ -14,11 +14,8 @@
 # limitations under the License.
 
 [pytest]
-# Do not test test_java_gateway.py due to we can not mock java gateway for now
-addopts = --ignore=tests/test_java_gateway.py
-
 # add path here to skip pytest scan it
 norecursedirs =
     tests/testing
-    # Integration test run seperated which do not calculate coverage
+    # Integration tests run separately and do not calculate coverage; they run via `tox -e integrate-test`
     tests/integration
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
index dd2b83a..7615226 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
@@ -55,6 +55,14 @@ class ProcessDefinition(Base):
     """process definition object, will define process definition attribute, task, relation.
 
     TODO: maybe we should rename this class, currently use DS object name.
+
+    :param user: The user for current process definition. A new one is created if it does not exist. If
+        parameter ``project`` already exists but its creator is not ``user``, ``project`` is granted
+        to ``user`` automatically.
+    :param project: The project for current process definition. You can see the workflow in this project
+        through the Web UI after :func:`submit` or :func:`run`. A new project belonging to ``user`` is
+        created if it does not exist. When ``project`` exists but its creator is not ``user``,
+        ``project`` is granted to ``user`` automatically.
     """
 
     # key attribute for identify ProcessDefinition object
@@ -91,7 +99,6 @@ class ProcessDefinition(Base):
         user: Optional[str] = configuration.WORKFLOW_USER,
         project: Optional[str] = configuration.WORKFLOW_PROJECT,
         tenant: Optional[str] = configuration.WORKFLOW_TENANT,
-        queue: Optional[str] = configuration.WORKFLOW_QUEUE,
         worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
         timeout: Optional[int] = 0,
         release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
@@ -105,7 +112,6 @@ class ProcessDefinition(Base):
         self._user = user
         self._project = project
         self._tenant = tenant
-        self._queue = queue
         self.worker_group = worker_group
         self.timeout = timeout
         self.release_state = release_state
@@ -148,15 +154,7 @@ class ProcessDefinition(Base):
 
         For now we just get from python side but not from java gateway side, so it may not correct.
         """
-        return User(
-            self._user,
-            configuration.USER_PASSWORD,
-            configuration.USER_EMAIL,
-            configuration.USER_PHONE,
-            self._tenant,
-            self._queue,
-            configuration.USER_STATE,
-        )
+        return User(name=self._user, tenant=self._tenant)
 
     @staticmethod
     def _parse_datetime(val: Any) -> Any:
@@ -331,8 +329,6 @@ class ProcessDefinition(Base):
         :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
         """
         # TODO used metaclass for more pythonic
-        self.tenant.create_if_not_exists(self._queue)
-        # model User have to create after Tenant created
         self.user.create_if_not_exists()
         # Project model need User object exists
         self.project.create_if_not_exists(self._user)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
index b568cb4..750e3b8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
@@ -37,6 +37,6 @@ class Project(BaseSide):
     def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
         """Create Project if not exists."""
         gateway = launch_gateway()
-        gateway.entry_point.createProject(user, self.name, self.description)
+        gateway.entry_point.createOrGrantProject(user, self.name, self.description)
         # TODO recover result checker
         # gateway_result_checker(result, None)
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
index cd0145a..510e3a8 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
@@ -19,8 +19,10 @@
 
 from typing import Optional
 
+from pydolphinscheduler.core import configuration
 from pydolphinscheduler.core.base_side import BaseSide
 from pydolphinscheduler.java_gateway import launch_gateway
+from pydolphinscheduler.side.tenant import Tenant
 
 
 class User(BaseSide):
@@ -39,12 +41,12 @@ class User(BaseSide):
     def __init__(
         self,
         name: str,
-        password: str,
-        email: str,
-        phone: str,
-        tenant: str,
-        queue: Optional[str] = None,
-        status: Optional[int] = 1,
+        password: Optional[str] = configuration.USER_PASSWORD,
+        email: Optional[str] = configuration.USER_EMAIL,
+        phone: Optional[str] = configuration.USER_PHONE,
+        tenant: Optional[str] = configuration.WORKFLOW_TENANT,
+        queue: Optional[str] = configuration.WORKFLOW_QUEUE,
+        status: Optional[int] = configuration.USER_STATE,
     ):
         super().__init__(name)
         self.password = password
@@ -54,8 +56,15 @@ class User(BaseSide):
         self.queue = queue
         self.status = status
 
+    def create_tenant_if_not_exists(self) -> None:
+        """Create tenant object."""
+        tenant = Tenant(name=self.tenant, queue=self.queue)
+        tenant.create_if_not_exists(self.queue)
+
     def create_if_not_exists(self, **kwargs):
         """Create User if not exists."""
+        # Should make sure queue already exists.
+        self.create_tenant_if_not_exists()
         gateway = launch_gateway()
         gateway.entry_point.createUser(
             self.name,
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
index 655b7fd..e311be2 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
@@ -309,15 +309,12 @@ def test_process_definition_simple_separate():
     "user_attrs",
     [
         {"tenant": "tenant_specific"},
-        {"queue": "queue_specific"},
-        {"tenant": "tenant_specific", "queue": "queue_specific"},
     ],
 )
 def test_set_process_definition_user_attr(user_attrs):
     """Test user with correct attributes if we specific assigned to process definition object."""
     default_value = {
         "tenant": configuration.WORKFLOW_TENANT,
-        "queue": configuration.WORKFLOW_QUEUE,
     }
     with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
         user = pd.user
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
similarity index 62%
copy from dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
copy to dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
index 423c6c3..a9cd352 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
@@ -15,20 +15,24 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Test whether success submit examples DAG to PythonGatewayServer."""
-
-from pathlib import Path
+"""py.test conftest.py file for package integration test."""
 
 import pytest
 
-from tests.testing.constants import ignore_exec_examples
 from tests.testing.docker_wrapper import DockerWrapper
-from tests.testing.path import path_example
 
 
-@pytest.fixture(scope="module")
-def setup_docker():
-    """Set up and teardown docker env for  fixture."""
+@pytest.fixture(scope="package", autouse=True)
+def docker_setup_teardown():
+    """Fixture for whole package tests, Set up and teardown docker env.
+
+    A fixture in a file named ``conftest.py`` with ``scope=package`` is automatically available in
+    the whole package, and with attribute ``autouse=True`` it is used automatically by each test case.
+
+    .. seealso::
+        For more information about conftest.py see:
+        https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
+    """
     docker_wrapper = DockerWrapper(
         image="apache/dolphinscheduler-standalone-server:ci",
         container_name="ci-dolphinscheduler-standalone-server",
@@ -40,19 +44,3 @@ def setup_docker():
     assert container is not None
     yield
     docker_wrapper.remove_container()
-
-
-@pytest.mark.parametrize(
-    "example_path",
-    [
-        path
-        for path in path_example.iterdir()
-        if path.is_file() and path.stem not in ignore_exec_examples
-    ],
-)
-def test_exec_white_list_example(setup_docker, example_path: Path):
-    """Test execute examples and submit DAG to PythonGatewayServer."""
-    try:
-        exec(example_path.read_text())
-    except Exception:
-        raise Exception("Run example %s failed.", example_path.stem)
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
similarity index 90%
rename from dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
rename to dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
index 3c8831e..8b7c5ff 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
@@ -31,9 +31,10 @@ def test_gateway_connect():
 def test_jvm_simple():
     """Test use JVM build-in object and operator from java gateway."""
     gateway = JavaGateway()
-    smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
-    bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
-    assert bigger > smaller
+    smallest = gateway.jvm.java.lang.Integer.MIN_VALUE
+    biggest = gateway.jvm.java.lang.Integer.MAX_VALUE
+    assert smallest is not None and biggest is not None
+    assert biggest > smallest
 
 
 def test_python_client_java_import_single():
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py
new file mode 100644
index 0000000..1672bde
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test process definition in integration."""
+
+from typing import Dict
+
+import pytest
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.shell import Shell
+
+PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
+TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
+
+
+@pytest.mark.parametrize(
+    "pre, post",
+    [
+        (
+            {
+                "user": "pre_user",
+            },
+            {
+                "user": "post_user",
+            },
+        )
+    ],
+)
+def test_change_process_definition_attr(pre: Dict, post: Dict):
+    """Test whether process definition success when specific attribute change."""
+    assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
+    for attrs in [pre, post]:
+        with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
+            Shell(name=TASK_NAME, command="echo 1")
+            pd.submit()
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
index 423c6c3..0964e1b 100644
--- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
@@ -22,26 +22,9 @@ from pathlib import Path
 import pytest
 
 from tests.testing.constants import ignore_exec_examples
-from tests.testing.docker_wrapper import DockerWrapper
 from tests.testing.path import path_example
 
 
-@pytest.fixture(scope="module")
-def setup_docker():
-    """Set up and teardown docker env for  fixture."""
-    docker_wrapper = DockerWrapper(
-        image="apache/dolphinscheduler-standalone-server:ci",
-        container_name="ci-dolphinscheduler-standalone-server",
-    )
-    ports = {"25333/tcp": 25333}
-    container = docker_wrapper.run_until_log(
-        log="Started StandaloneServer in", tty=True, ports=ports
-    )
-    assert container is not None
-    yield
-    docker_wrapper.remove_container()
-
-
 @pytest.mark.parametrize(
     "example_path",
     [
@@ -50,7 +33,7 @@ def setup_docker():
         if path.is_file() and path.stem not in ignore_exec_examples
     ],
 )
-def test_exec_white_list_example(setup_docker, example_path: Path):
+def test_exec_white_list_example(example_path: Path):
     """Test execute examples and submit DAG to PythonGatewayServer."""
     try:
         exec(example_path.read_text())
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index 9fa1366..218cf61 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.server.config.PythonGatewayConfig;
@@ -59,6 +61,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -142,6 +145,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     @Autowired
     private PythonGatewayConfig pythonGatewayConfig;
 
+    @Autowired
+    private ProjectUserMapper projectUserMapper;
+
     @Value("${spring.jackson.time-zone:UTC}")
     private String timezone;
 
@@ -231,8 +237,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
                                                 String taskDefinitionJson,
                                                 ProcessExecutionTypeEnum executionType) {
         User user = usersService.queryUser(userName);
-        Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
+        Project project = projectMapper.queryByName(projectName);
         long projectCode = project.getCode();
+
         ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
         long processDefinitionCode;
         // create or update process definition
@@ -349,9 +356,38 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     }
 
     // side object
-    public Map<String, Object> createProject(String userName, String name, String desc) {
+    /*
+      Grant project's permission to user. Used when the project was created by a different user
+      but the Python API uses the current user to change the process definition.
+     */
+    private Integer grantProjectToUser(Project project, User user) {
+        Date now = new Date();
+        ProjectUser projectUser = new ProjectUser();
+        projectUser.setUserId(user.getId());
+        projectUser.setProjectId(project.getId());
+        projectUser.setPerm(Constants.AUTHORIZE_WRITABLE_PERM);
+        projectUser.setCreateTime(now);
+        projectUser.setUpdateTime(now);
+        return projectUserMapper.insert(projectUser);
+    }
+
+    /*
+      Grant or create project. Create a new project if the project does not exist, and grant the
+      project permission to the user if the project exists but this user has no permission on it.
+     */
+    public void createOrGrantProject(String userName, String name, String desc) {
         User user = usersService.queryUser(userName);
-        return projectService.createProject(user, name, desc);
+
+        Project project;
+        project = projectMapper.queryByName(name);
+        if (project == null) {
+            projectService.createProject(user, name, desc);
+        } else if (project.getUserId() != user.getId()) {
+            ProjectUser projectUser = projectUserMapper.queryProjectRelation(project.getId(), user.getId());
+            if (projectUser == null) {
+                grantProjectToUser(project, user);
+            }
+        }
     }
 
     public Map<String, Object> createQueue(String name, String queueName) {